-
Notifications
You must be signed in to change notification settings - Fork 198
/
buffering.py
53 lines (42 loc) · 2 KB
/
buffering.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import multiprocessing as mp
import Queue
import threading
def buffered_gen_mp(source_gen, buffer_size=2):
    """
    Generator that runs a slow source generator in a separate process.

    source_gen: the iterable to consume in the worker process. Its items are
        sent through a multiprocessing queue, so they have to be picklable.
    buffer_size: the maximal number of items to pre-generate (length of the buffer)

    Yields the items of source_gen in order. None is a legal item and does not
    end the iteration.

    Raises RuntimeError on the first iteration if buffer_size < 2 (this is a
    generator function, so nothing runs before the first next() call), and
    RuntimeError carrying the worker's formatted traceback if source_gen
    raises an exception in the worker process.
    """
    # Local import: only needed to report worker failures.
    import traceback

    if buffer_size < 2:
        raise RuntimeError("Minimal buffer size is 2!")

    # The worker target is a closure and source_gen is usually a generator;
    # neither can be pickled, so the child has to be forked. Ask for the
    # "fork" start method explicitly where the API exists, because it is not
    # the default on every platform / Python version.
    context = mp
    get_context = getattr(mp, "get_context", None)
    if get_context is not None:
        try:
            context = get_context("fork")
        except ValueError:
            # "fork" is not available here; keep the default start method.
            pass

    # the effective buffer size is one less, because the generation process
    # will generate one extra element and block until there is room in the buffer.
    buffer = context.Queue(maxsize=buffer_size - 1)

    def _buffered_generation_process(source_gen, buffer):
        # Every message is a (tag, payload) pair, so that None (or any other
        # value) coming from source_gen can never be mistaken for the end marker.
        try:
            for data in source_gen:
                buffer.put(("item", data), block=True)
        except Exception:
            # Forward the failure as text: a string is always picklable,
            # an arbitrary exception object is not.
            buffer.put(("error", traceback.format_exc()), block=True)
        else:
            buffer.put(("done", None), block=True)
        # Closing alone does not work as an end signal: a buffer.get() that is
        # already waiting when the buffer is closed would block forever.
        buffer.close()

    process = context.Process(
        target=_buffered_generation_process, args=(source_gen, buffer)
    )
    process.start()

    finished = False
    try:
        while True:
            tag, payload = buffer.get()
            if tag == "item":
                yield payload
            elif tag == "done":
                finished = True
                return
            else:
                raise RuntimeError(
                    "source generator failed in the worker process:\n" + payload
                )
    finally:
        # If the consumer stopped early (or the worker failed), the worker may
        # still be blocked on a full buffer; stop it so it cannot hang
        # interpreter shutdown. Then reap the child in every case.
        if not finished:
            process.terminate()
        process.join()
def buffered_gen_threaded(source_gen, buffer_size=2):
    """
    Generator that runs a slow source generator in a separate thread. Beware of the GIL!

    source_gen: the iterable to consume in the worker thread.
    buffer_size: the maximal number of items to pre-generate (length of the buffer)

    Yields the items of source_gen in order. None is a legal item and does not
    end the iteration.

    Raises RuntimeError on the first iteration if buffer_size < 2 (this is a
    generator function, so nothing runs before the first next() call). An
    exception raised by source_gen in the worker thread is re-raised here, in
    the consuming thread, once the items produced before it have been yielded.

    The worker is a daemon thread: if the consumer stops iterating early, the
    worker stays blocked on the full buffer but does not keep the interpreter
    alive.
    """
    if buffer_size < 2:
        raise RuntimeError("Minimal buffer size is 2!")

    # the effective buffer size is one less, because the generation thread
    # will generate one extra element and block until there is room in the buffer.
    buffer = Queue.Queue(maxsize=buffer_size - 1)

    def _buffered_generation_thread(source_gen, buffer):
        # Every message is a (tag, payload) pair, so that None (or any other
        # value) coming from source_gen can never be mistaken for the end marker.
        try:
            for data in source_gen:
                buffer.put(("item", data), block=True)
        except Exception as exc:
            # Hand the failure to the consumer instead of dying silently,
            # which would leave the consumer waiting on buffer.get() forever.
            buffer.put(("error", exc), block=True)
        else:
            buffer.put(("done", None), block=True)

    thread = threading.Thread(
        target=_buffered_generation_thread, args=(source_gen, buffer)
    )
    thread.daemon = True
    thread.start()

    while True:
        tag, payload = buffer.get()
        if tag == "item":
            yield payload
        elif tag == "done":
            return
        else:
            raise payload