from time import time
from typing import Any
from multiprocessing import context, connection, get_context
import sys
from .pickle import dumps, loads
class ConnectionWrapper:  # pragma: no cover
    """
    Thin wrapper around a ``multiprocessing.connection.Connection``
    that adds timeout support to the receiving side.
    """

    def __init__(self, conn):
        self.conn = conn

    def send_bytes(self, bytes_):
        """Send raw bytes over the underlying connection pipe."""
        self.conn.send_bytes(bytes_)

    def recv_bytes(self, timeout=None):
        """
        Receive raw bytes from the underlying connection pipe.

        Args:
            timeout: Seconds to wait for data; ``None`` blocks forever.

        Raises:
            TimeoutError: if no data arrives within ``timeout`` seconds.
        """
        if not self.conn.poll(timeout=timeout):
            raise TimeoutError("Timeout")
        return self.conn.recv_bytes()

    def __getattr__(self, name):
        # Delegate every other attribute to the wrapped connection.
        # The "conn" guard prevents infinite recursion on a
        # half-initialized (e.g. mid-unpickling) instance.
        if "conn" in self.__dict__:
            return getattr(self.conn, name)
        raise AttributeError(f"'{type(self).__name__}' object has no attribute 'conn'")
class SimpleQueue:  # pragma: no cover
    """
    A simple single direction queue for inter-process communications.

    There could be multiple receivers and multiple senders on each side.
    """

    def __init__(self, *, ctx=None, copy_tensor=False):
        """
        Args:
            ctx: Multiprocessing context, you can get this using ``get_context``.
            copy_tensor: Set the queue to send a fully serialized tensor
                if ``True``, and only a stub of reference if ``False``.

        See Also:
            :func:`.dump_tensor`
        """
        if ctx is None:
            # Fall back to the platform's default start method.
            ctx = get_context()
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._reader = ConnectionWrapper(self._reader)
        self._writer = ConnectionWrapper(self._writer)
        # _rlock will be used by _help_stuff_finish() of multiprocessing.Pool
        self._rlock = ctx.Lock()
        self._copy_tensor = copy_tensor
        if sys.platform == "win32":
            # Writes to a message oriented win32 pipe are atomic,
            # so no write lock is needed there.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def empty(self):
        """
        Returns:
            Whether the queue is empty or not.
        """
        return not self._reader.poll()

    def close(self):
        """Close both ends of the underlying pipe."""
        self._reader.close()
        self._writer.close()

    def __getstate__(self):
        # Only picklable while a child process is being spawned.
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock, self._copy_tensor)

    def __setstate__(self, state):
        (
            self._reader,
            self._writer,
            self._rlock,
            self._wlock,
            self._copy_tensor,
        ) = state

    def get(self, timeout=None):
        """
        Get an object from the queue. This api is required by
        ``multiprocessing.pool`` to perform inter-process
        communication.

        Note:
            This api is used by sub-processes in pool to get tasks
            and work.

        Returns:
            Any object.
        """
        with self._rlock:
            res = self._reader.recv_bytes(timeout)
        # deserialize the data after having released the lock
        return loads(res)

    def put(self, obj: Any):
        """
        Put an object into the queue. This api is required by
        ``multiprocessing.pool`` to perform inter-process
        communication.

        Note:
            This api is used by sub-processes in pool to put results
            and respond.

        Args:
            obj: Any object.
        """
        # serialize the data before acquiring the lock
        obj = dumps(obj, copy_tensor=self._copy_tensor)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)

    def quick_get(self, timeout=None):
        """
        Get an object from the queue.

        Note:
            this api is used by the result manager (``Pool._result_handler``)
            thread to get results from the queue, since it is single threaded,
            there is no need to use locks, and therefore quicker.

        Returns:
            Any object.
        """
        res = self._reader.recv_bytes(timeout)
        return loads(res)

    def quick_put(self, obj: Any):
        """
        Put an object into the queue.

        Note: this api is used by the pool manager (``Pool._task_handler``)
        thread to put tasks into the queue, since it is single threaded,
        there is no need to use locks, and therefore quicker.

        Args:
            obj: Any object.
        """
        obj = dumps(obj, copy_tensor=self._copy_tensor)
        self._writer.send_bytes(obj)

    def __del__(self):
        # Guard the implicit close: __del__ may run on a partially
        # constructed instance (no _reader/_writer yet) or during
        # interpreter shutdown when the pipe is already gone.
        try:
            self.close()
        except (AttributeError, OSError):
            pass
class SimpleP2PQueue:  # pragma: no cover
    """
    A simple single direction queue for inter-process P2P communications.

    Each end only have one process, therefore no locks are required.
    """

    def __init__(self, *, copy_tensor=False):
        """
        Args:
            copy_tensor: Set the queue to send a fully serialized tensor
                if ``True``, and only a stub of reference if ``False``.

        See Also:
            :func:`.dump_tensor`
        """
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._reader = ConnectionWrapper(self._reader)
        self._writer = ConnectionWrapper(self._writer)
        self._copy_tensor = copy_tensor

    def empty(self):
        """
        Returns:
            Used by receiver, Whether the queue is empty or not.
        """
        return not self._reader.poll()

    def close(self):
        """Close both ends of the underlying pipe."""
        self._reader.close()
        self._writer.close()

    def __getstate__(self):
        # Only picklable while a child process is being spawned.
        context.assert_spawning(self)
        return self._reader, self._writer, self._copy_tensor

    def __setstate__(self, state):
        (self._reader, self._writer, self._copy_tensor) = state

    def get(self, timeout=None):
        """
        Get an object from the queue. This api is required by
        ``multiprocessing.pool`` to perform inter-process
        communication.

        Note:
            This api is used by sub-processes in pool to get tasks
            and work.

        Returns:
            Any object.
        """
        res = self._reader.recv_bytes(timeout)
        # deserialize the data after receiving; no lock is held here
        return loads(res)

    def put(self, obj: Any):
        """
        Put an object into the queue. This api is required by
        ``multiprocessing.pool`` to perform inter-process
        communication.

        Note:
            This api is used by sub-processes in pool to put results
            and respond.

        Args:
            obj: Any object.
        """
        # serialize first; single writer, so no lock is needed
        obj = dumps(obj, copy_tensor=self._copy_tensor)
        self._writer.send_bytes(obj)

    def __del__(self):
        # Guard the implicit close: __del__ may run on a partially
        # constructed instance or during interpreter shutdown when
        # the pipe handles are already invalid.
        try:
            self.close()
        except (AttributeError, OSError):
            pass
class MultiP2PQueue:
    """
    P2P queue which connects pool result manager and worker processes directly,
    with no lock.
    """

    def __init__(self, queue_num, *, copy_tensor=False):
        """
        Args:
            queue_num: Number of internal P2P sub-queues (one per worker).
            copy_tensor: Forwarded to each :class:`SimpleP2PQueue`.
        """
        self.counter = 0
        self.queues = [
            SimpleP2PQueue(copy_tensor=copy_tensor) for _ in range(queue_num)
        ]

    def put(self, obj: Any):
        """
        Put an object into the next worker's queue.

        Note: distribution is round-robin (the original comment said
        "randomly", but the counter below cycles deterministically).
        """
        queue = self.queues[self.counter]
        self.counter = (self.counter + 1) % len(self.queues)
        queue.put(obj)

    def get(self, timeout=None):
        """
        Poll every sub-queue in turn until one yields an object.

        Args:
            timeout: Overall deadline in seconds; ``None`` waits forever.

        Raises:
            TimeoutError: if no object arrives before the deadline.

        Returns:
            Any object.
        """
        begin = time()
        while True:
            # Check the overall deadline once per sweep over all queues.
            if timeout is not None and time() - begin > timeout:
                raise TimeoutError("Timeout")
            for queue in self.queues:
                try:
                    # Short per-queue wait keeps the sweep responsive.
                    obj = queue.get(timeout=1e-3)
                    return obj
                except TimeoutError:
                    continue

    def get_sub_queue(self, index):
        """Return the sub-queue at ``index`` (e.g. to hand to a worker)."""
        return self.queues[index]

    def __del__(self):
        # Guard the implicit cleanup: self.queues may not exist if
        # __init__ raised, and close() can fail at interpreter shutdown.
        try:
            for q in self.queues:
                q.close()
        except (AttributeError, OSError):
            pass
# NOTE(review): only SimpleQueue is declared public here even though
# SimpleP2PQueue and MultiP2PQueue are also defined above -- presumably
# the P2P queues are internal to the pool implementation; confirm.
__all__ = ["SimpleQueue"]