forked from sagemath/sagecell
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraw_queue.py
60 lines (51 loc) · 2.09 KB
/
raw_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
"""
Subclasses the multiprocessing queue to provide "safe" get and put methods that operate on bytes, rather than pickling objects
"""
from multiprocessing.queues import _multiprocessing, Queue, Pipe, Lock, os, sys, BoundedSemaphore, register_after_fork, threading, collections
__all__ = ['RawQueue']
class _RawQueue(Queue):
"""
RawQueue makes a single change to Queue: instead of using the underlying ``send`` and ``recv`` functions of the pipe, it uses ``send_bytes`` and ``recv_bytes`` to provide a "safe" transport mechanism
"""
def __init__(self, maxsize=0):
"""
Override the __init__ function so that *our* _after_fork function gets registered, rather than Queue's.
"""
if maxsize <= 0:
maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
self._maxsize = maxsize
self._reader, self._writer = Pipe(duplex=False)
self._rlock = Lock()
self._opid = os.getpid()
if sys.platform == 'win32':
self._wlock = None
else:
self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize)
self._after_fork()
if sys.platform != 'win32':
# this is the only line that is changed from Queue, to
# call *our* _after_fork
register_after_fork(self, _RawQueue._after_fork)
def _after_fork(self):
"""
Override the default :meth:`multiprocessing.Queue._after_fork` method
to use the ``send_bytes`` and ``recv_bytes`` methods.
"""
self._notempty = threading.Condition(threading.Lock())
self._buffer = collections.deque()
self._thread = None
self._jointhread = None
self._joincancelled = False
self._closed = False
self._close = None
# the following two lines are the only changes from Queue,
# make the functions the *_bytes ones
self._send = self._writer.send_bytes
self._recv = self._reader.recv_bytes
self._poll = self._reader.poll
def RawQueue(maxsize=0):
'''
Returns a queue object
'''
return _RawQueue(maxsize)