machin.parallel
distributed
-
class machin.parallel.distributed.CollectiveGroup(group, current_relative_rank)
Bases: object
A thin wrapper of the collective communication primitives of torch.distributed. The only difference is that irecv now supports receiving from any source. Do not construct it yourself; use create_collective_group() instead.
-
all_gather(tensor_list, tensor, async_op=False)
Gathers tensors from the whole group in a list. Complex tensors are supported.
- Parameters
tensor_list (list[Tensor]) – Output list. It should contain correctly-sized tensors to be used for output of the collective.
tensor (Tensor) – Tensor to be broadcast from current process.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
Examples
>>> # All tensors below are of torch.int64 dtype.
>>> tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)]
>>> tensor_list
[tensor([0, 0]), tensor([0, 0])] # Rank 0 and 1
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1, 2]), tensor([3, 4])] # Rank 0
[tensor([1, 2]), tensor([3, 4])] # Rank 1
>>> # All tensors below are of torch.cfloat dtype.
>>> tensor_list = [torch.zeros(2, dtype=torch.cfloat) for _ in range(2)]
>>> tensor_list
[tensor([0.+0.j, 0.+0.j]), tensor([0.+0.j, 0.+0.j])] # Rank 0 and 1
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 0
[tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 1
-
all_gather_multigpu(output_tensor_lists, input_tensor_list, async_op=False)
Gathers tensors from the whole group in a list. Each tensor in input_tensor_list should reside on a separate GPU. Only the nccl backend is currently supported, and tensors should only be GPU tensors.
Complex tensors are supported.
- Parameters
output_tensor_lists (List[List[Tensor]]) –
Output lists. They should contain correctly-sized tensors on each GPU to be used for output of the collective, e.g. output_tensor_lists[i] contains the all_gather result that resides on the GPU of input_tensor_list[i].
Note that each element of output_tensor_lists has the size of world_size * len(input_tensor_list), since the function gathers the result from every single GPU in the group. To interpret each element of output_tensor_lists[i], note that input_tensor_list[j] of rank k will appear in output_tensor_lists[i][k * len(input_tensor_list) + j].
Also note that len(output_tensor_lists), and the size of each element in output_tensor_lists (each element is a list, therefore len(output_tensor_lists[i])), need to be the same for all the distributed processes calling this function.
input_tensor_list (List[Tensor]) – List of tensors (on different GPUs) to be broadcast from the current process. Note that len(input_tensor_list) needs to be the same for all the distributed processes calling this function.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
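To make the output layout of all_gather_multigpu concrete, the sketch below simulates it in plain Python, with no torch or GPUs involved. Each "tensor" is just a label such as "r1g2" (rank 1, GPU 2). The helpers simulate_all_gather_multigpu and locate are hypothetical names for illustration, and the sketch assumes rank-major ordering, in which input_tensor_list[j] of rank k ends up at flat index k * len(input_tensor_list) + j.

```python
def simulate_all_gather_multigpu(world_size, num_inputs):
    """Return (inputs, outputs) mimicking the all_gather_multigpu layout.

    inputs[k][j]  -> input_tensor_list[j] on rank k
    outputs[k][i] -> output_tensor_lists[i] on rank k (a list of labels)
    """
    inputs = [[f"r{k}g{j}" for j in range(num_inputs)] for k in range(world_size)]
    # Rank-major order: all inputs of rank 0, then all inputs of rank 1, ...
    gathered = [inputs[k][j] for k in range(world_size) for j in range(num_inputs)]
    # Every rank receives one full copy of the gathered list per local GPU i.
    outputs = [[list(gathered) for _ in range(num_inputs)] for _ in range(world_size)]
    return inputs, outputs


def locate(flat_index, num_inputs):
    """Map an index in output_tensor_lists[i] back to (source rank k, input j)."""
    return divmod(flat_index, num_inputs)


inputs, outputs = simulate_all_gather_multigpu(world_size=2, num_inputs=3)
print(len(outputs[0][0]))  # 6 == world_size * len(input_tensor_list)
print(locate(5, 3))        # (1, 2): input_tensor_list[2] of rank 1
```

Every output list is identical across ranks and GPUs, which is why only the flat index needs interpreting.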
-
all_reduce(tensor, op=<ReduceOp.SUM: 0>, async_op=False)
Reduces the tensor data across all machines in such a way that all get the final result.
After the call, tensor is going to be bitwise identical in all processes. Complex tensors are supported.
- Parameters
tensor (Tensor) – Input and output of the collective. The function operates in-place.
op (optional) – One of the values from the torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
Examples
>>> # All tensors below are of torch.int64 type.
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
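The element-wise semantics of all_reduce can be mimicked without any process group. In the sketch below, ranks are modeled as entries of a Python list and each rank's tensor as a flat list. Only four operations are modeled by name; REDUCE_OPS and simulate_all_reduce are hypothetical stand-ins, not torch.distributed.ReduceOp itself.

```python
import operator
from functools import reduce

# Hypothetical stand-ins for a few torch.distributed.ReduceOp members.
REDUCE_OPS = {
    "SUM": operator.add,
    "PRODUCT": operator.mul,
    "MIN": min,
    "MAX": max,
}


def simulate_all_reduce(per_rank_values, op="SUM"):
    """Return what every rank holds after an all_reduce.

    per_rank_values[r] is the flattened tensor on rank r (all the same length).
    The reduction runs element-wise across ranks, and every rank ends up with
    an identical copy of the result.
    """
    fn = REDUCE_OPS[op]
    reduced = [reduce(fn, column) for column in zip(*per_rank_values)]
    return [list(reduced) for _ in per_rank_values]


print(simulate_all_reduce([[1, 2], [3, 4]]))            # [[4, 6], [4, 6]]
print(simulate_all_reduce([[1, 2], [3, 4]], op="MAX"))  # [[3, 4], [3, 4]]
```

The first call reproduces the int64 example above: both ranks end with [4, 6].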
-
all_reduce_multigpu(tensor_list, op=<ReduceOp.SUM: 0>, async_op=False)
Reduces the tensor data across all machines in such a way that all get the final result. This function reduces a number of tensors on every node, while each tensor resides on a different GPU. Therefore, the input tensors in the tensor list need to be GPU tensors. Also, each tensor in the tensor list needs to reside on a different GPU.
After the call, every tensor in tensor_list is going to be bitwise identical in all processes. Complex tensors are supported.
Only the nccl and gloo backends are currently supported, and tensors should only be GPU tensors.
- Parameters
tensor_list (List[Tensor]) – List of input and output tensors of the collective. The function operates in-place and requires each tensor to be a GPU tensor on a different GPU. You also need to make sure that len(tensor_list) is the same for all the distributed processes calling this function.
op (optional) – One of the values from the torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
-
barrier(async_op=False)
Synchronizes all processes.
This collective blocks processes until the whole group enters this function, if async_op is False, or if wait() is called on the async work handle.
- Parameters
async_op (bool, optional) – Whether this op should be an async op
device_ids ([int], optional) – List of device/GPU ids. Valid only for NCCL backend.
- Returns
Async work handle, if async_op is set to True.
-
broadcast(tensor, src, async_op=False)
Broadcasts the tensor to the whole group. tensor must have the same number of elements in all processes participating in the collective.
- Parameters
tensor (Tensor) – Data to be sent if src is the rank of the current process, and tensor to be used to save received data otherwise.
src (int) – Source rank.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
-
broadcast_multigpu(tensor_list, src, async_op=False, src_tensor=0)
Broadcasts the tensor to the whole group with multiple GPU tensors per node.
tensor must have the same number of elements in all the GPUs from all processes participating in the collective. Each tensor in the list must be on a different GPU. Only the nccl and gloo backends are currently supported, and tensors should only be GPU tensors.
- Parameters
tensor_list (List[Tensor]) – Tensors that participate in the collective operation. If src is the rank of the current process, then the specified src_tensor element of tensor_list (tensor_list[src_tensor]) will be broadcast to all other tensors (on different GPUs) in the src process and to all tensors in tensor_list of the other, non-src processes. You also need to make sure that len(tensor_list) is the same for all the distributed processes calling this function.
src (int) – Source rank.
async_op (bool, optional) – Whether this op should be an async op
src_tensor (int, optional) – Source tensor rank within tensor_list
- Returns
Async work handle, if async_op is set to True.
-
gather(tensor, gather_list, dst=0, async_op=False)
Gathers a list of tensors in a single process.
- Parameters
tensor (Tensor) – Input tensor.
gather_list (list[Tensor], optional) – List of appropriately-sized tensors to use for gathered data (default is None, must be specified on the destination rank)
dst (int, optional) – Destination rank (default is 0)
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
-
irecv(tensor, src=None, tag=0)
Receives a tensor asynchronously. If src is None, receives from any process.
- Returns
An object you can call .wait() on; .wait() will return the source rank.
-
isend(tensor, dst, tag=0)
Sends a tensor asynchronously.
- Parameters
tensor (Tensor) – Tensor to send.
dst (int) – Destination rank.
tag (int, optional) – Tag to match send with remote recv
- Returns
A distributed request object.
-
recv(tensor, src=None, tag=0)
Receives a tensor synchronously.
- Parameters
tensor (Tensor) – Tensor to fill with received data.
src (int, optional) – Source rank. Will receive from any process if unspecified.
tag (int, optional) – Tag to match recv with remote send
- Returns
Sender rank
-
reduce(tensor, dst, op=<ReduceOp.SUM: 0>, async_op=False)
Reduces the tensor data across all machines.
Only the process with rank dst is going to receive the final result.
- Parameters
tensor (Tensor) – Input and output of the collective. The function operates in-place.
dst (int) – Destination rank
op (optional) – One of the values from the torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
-
reduce_multigpu(tensor_list, dst, op=<ReduceOp.SUM: 0>, async_op=False, dst_tensor=0)
Reduces the tensor data on multiple GPUs across all machines. Each tensor in tensor_list should reside on a separate GPU.
Only the GPU of tensor_list[dst_tensor] on the process with rank dst is going to receive the final result.
Only the nccl backend is currently supported, and tensors should only be GPU tensors.
- Parameters
tensor_list (List[Tensor]) – Input and output GPU tensors of the collective. The function operates in-place. You also need to make sure that len(tensor_list) is the same for all the distributed processes calling this function.
dst (int) – Destination rank
op (optional) – One of the values from the torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.
async_op (bool, optional) – Whether this op should be an async op
dst_tensor (int, optional) – Destination tensor rank within tensor_list
- Returns
Async work handle, if async_op is set to True; None otherwise.
-
scatter(tensor, scatter_list=None, src=0, async_op=False)
Scatters a list of tensors to all processes in a group.
Each process will receive exactly one tensor and store its data in the tensor argument.
- Parameters
tensor (Tensor) – Output tensor.
scatter_list (list[Tensor]) – List of tensors to scatter (default is None, must be specified on the source rank)
src (int) – Source rank (default is 0)
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
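The data movement of scatter and gather can be sketched in plain Python, assuming a group of world_size ranks in which each "tensor" is an arbitrary Python value. simulate_scatter and simulate_gather are hypothetical names; the sketch only shows that scatter_list matters on the source rank alone, and that only the destination rank receives a filled gather_list.

```python
def simulate_scatter(scatter_list, world_size):
    """Return the value stored into `tensor` on each rank after scatter.

    scatter_list only needs to exist on the source rank and must hold
    exactly one entry per rank; rank r receives scatter_list[r].
    """
    if len(scatter_list) != world_size:
        raise ValueError("scatter_list must contain one tensor per rank")
    return [scatter_list[rank] for rank in range(world_size)]


def simulate_gather(per_rank_tensors, dst=0):
    """Return each rank's gather_list after gather: filled on dst, None elsewhere."""
    world_size = len(per_rank_tensors)
    return [list(per_rank_tensors) if rank == dst else None for rank in range(world_size)]


received = simulate_scatter(["a", "b", "c"], world_size=3)
print(received)                          # ['a', 'b', 'c'] (rank r holds received[r])
print(simulate_gather(received, dst=1))  # [None, ['a', 'b', 'c'], None]
```

Gathering what was just scattered restores the original list on the destination rank, which is a quick sanity check when wiring the two together.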
-
class machin.parallel.distributed.RpcGroup(group_name, group_members, first_create=True)
Bases: object
-
get_group_members()
- Returns
A list of names of group members. Names are sorted in ascending order.
- Return type
List[str]
-
get_paired(key)
- Parameters
key (Any) – Key of the paired value, in this group.
- Returns
An RRef to the paired value.
- Raises
KeyError – If the key is not found.
-
is_member(target=None)
Check whether the target name is a group member.
- Parameters
target (str) – Name of the process to check.
- Return type
bool
-
is_paired(key)
Check whether a key has been paired to the current group.
- Parameters
key (Any) – A key which uniquely identifies this value in this group. The name only needs to be unique for this value in this group.
-
is_registered(key)
Check whether a service has been registered in the current group.
- Parameters
key (Any) – A key which uniquely identifies this service in this group. The name only needs to be unique for this service in this group.
-
registered_async(key, args=(), kwargs=None)
- Parameters
key (Any) – Key of the registered service, in this group.
args – Service arguments.
kwargs – Service keyword arguments.
- Returns
A future object you can call wait() on. wait() will block the thread until execution is completed, and will return the result returned by the service.
- Raises
KeyError – If the service is not found.
-
registered_remote(key, args=(), kwargs=None)
- Parameters
key (Any) – Key of the registered service, in this group.
args – Service arguments.
kwargs – Service keyword arguments.
- Returns
An RRef object pointing to the result returned by the service.
- Raises
KeyError – If the service is not found.
-
registered_sync(key, args=(), kwargs=None)
- Parameters
key (Any) – Key of the registered service, in this group.
args – Service arguments.
kwargs – Service keyword arguments.
- Returns
Result returned by the service.
- Raises
KeyError – If the service is not found.
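The registered_* variants differ only in how the result comes back. The single-process sketch below is a hypothetical stand-in, not machin's RpcGroup: the ToyServiceGroup class and its register method are invented for illustration, services run locally instead of over RPC, registered_remote is not modeled, and the async variant returns a concurrent.futures.Future, so you call result() on it rather than wait().

```python
from concurrent.futures import ThreadPoolExecutor


class ToyServiceGroup:
    """Hypothetical local stand-in mimicking the lookup semantics of registered_*."""

    def __init__(self):
        self._services = {}
        self._pool = ThreadPoolExecutor(max_workers=2)

    def register(self, key, service):
        # Hypothetical helper: machin's real registration API is not shown here.
        self._services[key] = service

    def is_registered(self, key):
        return key in self._services

    def registered_sync(self, key, args=(), kwargs=None):
        service = self._services[key]  # KeyError if the service is not found
        return service(*args, **(kwargs or {}))

    def registered_async(self, key, args=(), kwargs=None):
        service = self._services[key]  # KeyError is raised before anything runs
        return self._pool.submit(service, *args, **(kwargs or {}))


group = ToyServiceGroup()
group.register("add", lambda a, b: a + b)
print(group.registered_sync("add", args=(1, 2)))            # 3
print(group.registered_async("add", args=(2, 3)).result())  # 5
```

Looking the key up before submitting work means a missing service fails at the call site, matching the documented KeyError rather than surfacing later from the future.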
-
remote(to, func, timeout=-1, args=(), kwargs=None)
Make a remote call to run func on worker to and return an RRef to the result value immediately. Worker to will be the owner of the returned RRef, and the worker calling remote is a user. The owner manages the global reference count of its RRef, and the owner RRef is only destructed when globally there are no living references to it.
- Parameters
to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.
func (callable) – a callable function, such as Python callables, builtin operators (e.g. add()) and annotated TorchScript functions.
args (tuple) – the argument tuple for the func invocation.
kwargs (dict) – a dictionary of keyword arguments for the func invocation.
timeout (float, optional) – timeout in seconds for this remote call. If the creation of this RRef on worker to is not successfully processed on this worker within this timeout, then the next time there is an attempt to use the RRef (such as to_here()), a timeout will be raised indicating this failure. A value of 0 indicates an infinite timeout, i.e. a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.
- Returns
A user RRef instance to the result value. Use the blocking API torch.distributed.rpc.RRef.to_here() to retrieve the result value locally.
Warning
The remote API does not copy storages of argument tensors until sending them over the wire, which could be done by a different thread depending on the RPC backend type. The caller should make sure that the contents of those tensors stay intact until the returned RRef is confirmed by the owner, which can be checked using the torch.distributed.rpc.RRef.confirmed_by_owner() API.
Warning
Errors such as timeouts for the remote API are handled on a best-effort basis. When remote calls initiated by remote fail, for example with a timeout error, the errors are handled and set on the resulting RRef asynchronously. If the RRef has not been used by the application before this handling (such as to_here or a fork call), then future uses of the RRef will appropriately raise errors. However, it is possible that the user application will use the RRef before the errors are handled. In this case, errors may not be raised as they have not yet been handled.
- Example
Make sure that MASTER_ADDR and MASTER_PORT are set properly on both workers (see the torch.distributed init_process_group() API for more details). For example:
$ export MASTER_ADDR=localhost
$ export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
Below is an example of running a TorchScript function using RPC.
>>> # On both workers:
>>> import torch
>>> @torch.jit.script
... def my_script_add(t1, t2):
...     return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
-
rpc_async(to, func, timeout=-1, args=(), kwargs=None)
Make a non-blocking RPC call to run function func on worker to. RPC messages are sent and received in parallel to execution of Python code. This method is thread-safe. This method will immediately return a Future that can be awaited on.
- Parameters
to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.
func (callable) – a callable function, such as Python callables, builtin operators (e.g. add()) and annotated TorchScript functions.
args (tuple) – the argument tuple for the func invocation.
kwargs (dict) – a dictionary of keyword arguments for the func invocation.
timeout (float, optional) – timeout in seconds to use for this RPC. If the RPC does not complete in this amount of time, an exception indicating it has timed out will be raised. A value of 0 indicates an infinite timeout, i.e. a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.
- Returns
Returns a Future object that can be waited on. When completed, the return value of func on args and kwargs can be retrieved from the Future object.
Warning
Using GPU tensors as arguments or return values of func is not supported since we don’t support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func.
Warning
The rpc_async API does not copy storages of argument tensors until sending them over the wire, which could be done by a different thread depending on the RPC backend type. The caller should make sure that the contents of those tensors stay intact until the returned Future completes.
- Example
Make sure that MASTER_ADDR and MASTER_PORT are set properly on both workers (see the torch.distributed init_process_group() API for more details). For example:
$ export MASTER_ADDR=localhost
$ export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
Below is an example of running a TorchScript function using RPC.
>>> # On both workers:
>>> import torch
>>> @torch.jit.script
... def my_script_add(t1, t2):
...     return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
-
rpc_sync(to, func, timeout=-1, args=(), kwargs=None)
Make a blocking RPC call to run function func on worker to. RPC messages are sent and received in parallel to execution of Python code. This method is thread-safe.
- Parameters
to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.
func (callable) – a callable function, such as Python callables, builtin operators (e.g. add()) and annotated TorchScript functions.
args (tuple) – the argument tuple for the func invocation.
kwargs (dict) – a dictionary of keyword arguments for the func invocation.
timeout (float, optional) – timeout in seconds to use for this RPC. If the RPC does not complete in this amount of time, an exception indicating it has timed out will be raised. A value of 0 indicates an infinite timeout, i.e. a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.
- Returns
Returns the result of running func with args and kwargs.
- Example
Make sure that MASTER_ADDR and MASTER_PORT are set properly on both workers (see the torch.distributed init_process_group() API for more details). For example:
$ export MASTER_ADDR=localhost
$ export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
Below is an example of running a TorchScript function using RPC.
>>> # On both workers:
>>> import torch
>>> @torch.jit.script
... def my_script_add(t1, t2):
...     return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
-
server
-
class machin.parallel.server.OrderedServerBase
Bases: abc.ABC
Descendant classes of OrderedServerBase do not have to guarantee strong consistency; that is, even if OrderedServerBase.push() has returned True, it is possible that the acknowledged push is discarded.
-
abstract pull(key, version=None)
Pull a value with the specified version in key.
- Parameters
key – Key.
version – Target version. If None, then the newest version of the value of key will be pulled.
- Returns
None if the version is not found, has been auto-deleted, or the key is not found; otherwise returns the value with the specified version in key, followed by the version.
-
abstract push(key, value, version, prev_version)
Push a new version of value in key to the ordered server.
Note
If version = prev_version, then there is no order guarantee. But you may exploit this feature.
- Parameters
key – Key.
value – Value.
version – New version.
prev_version – Previous version.
- Returns
True if successful, and False if not.
-
class machin.parallel.server.OrderedServerSimple(server_name, group)
Bases: machin.parallel.server.ordered_server.OrderedServerBase
- Parameters
server_name (str) –
group (machin.parallel.distributed._world.RpcGroup) –
-
pull(key, version=None)
Pull a value with the specified version in key.
- Parameters
key – Key.
version – Target version. If None, then the newest version of the value of key will be pulled.
- Returns
None if the version is not found, has been auto-deleted, or the key is not found; otherwise returns the value with the specified version in key, followed by the version.
-
push(key, value, version, prev_version)
Push a new version of value in key to the ordered server.
Note
If version = prev_version, then there is no order guarantee. But you may exploit this feature.
- Parameters
key – Key.
value – Value.
version – New version.
prev_version – Previous version.
- Returns
True if successful, and False if not.
-
class machin.parallel.server.OrderedServerSimpleImpl(server_name, group, version_depth=1, **__)
Bases: object
A simple key-value server with strictly ordered updates.
This init function must only be invoked on the runner process, and the runner process must be a member process of group.
- Parameters
server_name (str) – Name of this server, used to register the server as a paired class of group.
group (machin.parallel.distributed._world.RpcGroup) – Rpc group where the server is located.
server_runner – Name of the process serving the ordered server. By default it is the first member of the group.
version_depth (int) – Storage depth of old versions of the same key. If version_depth = 1, then only the newest version of the key will be saved.
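The push/pull contract and version_depth can be illustrated with a small in-memory model. ToyOrderedStore is a hypothetical class written for this illustration, not machin's OrderedServerSimpleImpl; it assumes a push is accepted only when prev_version equals the newest stored version (any push is accepted for a new key), does not model the version == prev_version special case, and returns pulled values as a (value, version) tuple.

```python
from collections import OrderedDict


class ToyOrderedStore:
    """Hypothetical in-memory model of an ordered key-value server."""

    def __init__(self, version_depth=1):
        self.version_depth = version_depth
        self._store = {}  # key -> OrderedDict(version -> value), oldest first

    def push(self, key, value, version, prev_version):
        versions = self._store.setdefault(key, OrderedDict())
        if versions and next(reversed(versions)) != prev_version:
            return False  # stale writer: the version chain has moved on
        versions[version] = value
        while len(versions) > self.version_depth:
            versions.popitem(last=False)  # auto-delete the oldest version
        return True

    def pull(self, key, version=None):
        versions = self._store.get(key)
        if not versions:
            return None  # key not found
        if version is None:
            version = next(reversed(versions))  # newest version
        if version not in versions:
            return None  # never existed, or auto-deleted
        return versions[version], version


store = ToyOrderedStore(version_depth=2)
print(store.push("model", "w0", version=0, prev_version=None))  # True (new key)
print(store.push("model", "w1", version=1, prev_version=0))     # True
print(store.push("model", "w1b", version=1, prev_version=0))    # False: chain moved on
print(store.pull("model"))                                      # ('w1', 1)
```

The rejected third push is the case a second writer hits when it started from version 0 but lost the race; that writer would pull the newest value and retry.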
-
class machin.parallel.server.PushPullGradServer(server_name, group, model_name, secondary_reducers, o_server)
Bases: object
- Parameters
server_name (str) –
group (machin.parallel.distributed._world.RpcGroup) –
model_name (str) –
secondary_reducers (List[str]) –
o_server (machin.parallel.server.ordered_server.OrderedServerBase) –
-
class machin.parallel.server.PushPullGradServerImpl(server_name, group, model_name='model', primary_reducer=None, secondary_reducers=None, o_server=None, reduce_method='sum', reduce_device='cpu', reduce_batch_size=4, max_queue_size=64)
Bases: object
A simple parameter server, which synchronizes model parameters by pushing gradients and pulling back new parameters; no strict order is guaranteed.
Warning
DistributedDataParallel is not supported, since we cannot load the state dictionary after creation.
Note
You should initialize PushPullGradServer on all members of secondary_reducers, and on primary_reducer. Both of them should be members of the group.
Note
Internally, the primary reducer will push updated versions to the ordered server.
Hint
Reduction is performed in a tree fashion:
In the first step, clients push new gradients to a random secondary reducer, and the secondary reducer performs the first reduction pass; then the secondary reducers push their results to the primary reducer.
In the second step, the primary reducer reduces the results from the secondary reducers to get the final reduced gradient dictionary (which has the same structure as state_dict), assigns the gradients to its managed model, and performs the optimization.
In the final step, the primary reducer pushes the final model to the model server group, and clients can then pull the newest model.
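The first two steps of the tree reduction can be sketched in plain Python. Gradient dictionaries are modeled as {name: list_of_floats}, secondary reducers as buckets, and the primary reducer as a final element-wise sum; reduce_dicts and tree_reduce are hypothetical names. Batching by reduce_batch_size, the optimizer step and the push to the ordered server are left out. For "mean", the sketch divides by the total number of client pushes instead of averaging the secondary reducers' averages (which would be biased when buckets differ in size); whether machin does exactly this is not stated here.

```python
import random


def reduce_dicts(grad_dicts):
    """Element-wise sum of gradient dicts that share the same keys."""
    total = {}
    for grads in grad_dicts:
        for name, values in grads.items():
            if name in total:
                total[name] = [a + b for a, b in zip(total[name], values)]
            else:
                total[name] = list(values)
    return total


def tree_reduce(client_grads, num_secondary, reduce_method="sum", seed=0):
    rng = random.Random(seed)
    # Step 1: every client pushes to a randomly chosen secondary reducer,
    # which sums what it received and remembers how many pushes it saw.
    buckets = [[] for _ in range(num_secondary)]
    for grads in client_grads:
        buckets[rng.randrange(num_secondary)].append(grads)
    partials = [(reduce_dicts(bucket), len(bucket)) for bucket in buckets if bucket]
    # Step 2: the primary reducer combines the partial results.
    total = reduce_dicts([partial for partial, _ in partials])
    if reduce_method == "mean":
        count = sum(n for _, n in partials)
        total = {name: [v / count for v in values] for name, values in total.items()}
    # Step 3 (assigning gradients, optimizer step, pushing the model) is omitted.
    return total


clients = [{"w": [1.0, 2.0]}, {"w": [3.0, 4.0]}, {"w": [5.0, 6.0]}]
print(tree_reduce(clients, num_secondary=2))                        # {'w': [9.0, 12.0]}
print(tree_reduce(clients, num_secondary=2, reduce_method="mean"))  # {'w': [3.0, 4.0]}
```

Because summation is associative, the random routing of clients to secondary reducers does not change the final result.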
- Parameters
server_name (str) – Name of this server, used to register the server as a paired class of group.
group (machin.parallel.distributed._world.RpcGroup) – Server group.
model_name (str) – Name of the managed model in the ordered server, only needed if the ordered server needs such an identifier. The default ordered server does not require this.
primary_reducer (str) – Name of the process serving as the primary reducer, which collects reduced gradients from secondary reducers and performs the final reduction.
secondary_reducers (List[str]) – Names of the processes serving as secondary reducers.
o_server (machin.parallel.server.ordered_server.OrderedServerBase) – Custom ordered server accessor. By default, the ordered server is an OrderedServerSimple hosted on the primary reducer.
reduce_method (str) – “mean” or “sum”.
reduce_device (Union[torch.device, str]) – Device to perform reduction on; by default it is “cpu”.
reduce_batch_size (int) – Size of a single reduction batch; the server will wait until the number of requests in the reduction queue has reached this size.
max_queue_size (int) – Maximum reduction request queue size.
-
manage_model(model, optimizer, lr_scheduler=None)
Let the main reducer manage your model. Must be called before start.
Warning
Make sure that the managed model is different from the model you use in your algorithms, such as A3C!
- Parameters
model (torch.nn.modules.module.Module) – Model to manage.
optimizer (Any) – Optimizer of your model. You should initialize it first, e.g. optimizer = Adam(...)
lr_scheduler (Any) – Learning rate scheduler. You should initialize it first, e.g. scheduler = LambdaLR(...)
- Raises
RuntimeError – If the current rpc role is not the main reducer.
-
REDUCE_MASTER = 0
-
REDUCE_SLAVE = 1
-
class machin.parallel.server.PushPullModelServer(model_name, o_server=None)
Bases: object
Create an accessor to the services provided by PushPullModelServerImpl.
- Parameters
model_name (str) – Name of the managed model in the ordered server, only needed if the ordered server needs such an identifier. The default ordered server does not require this.
o_server (machin.parallel.server.ordered_server.OrderedServerBase) – Ordered server accessor.
-
pull(model)
Pull the newest state dict of your model and update its parameters and pp_version. Gradients will not be cleared.
- Parameters
model (torch.nn.modules.module.Module) – Model to pull.
- Returns
True if pull succeeded, else False.
-
push(model, pull_on_fail=True)
Try to push a model to the ordered server. If the push fails and pull_on_fail is True, the newest model will be automatically pulled and its parameters will be assigned to model. Gradients will not be cleared.
- Parameters
model (torch.nn.modules.module.Module) – Model to push.
pull_on_fail – Pull the newest parameters if push failed.
- Returns
True if push succeeded, else False.
-
class machin.parallel.server.PushPullModelServerImpl(server_name, group, model_name='model', o_server=None)
Bases: object
A simple parameter server, which synchronizes model parameters by pushing and pulling all parameters and maintaining a strictly ordered version chain.
Warning
Only one model is supported.
This init function must only be invoked on the runner process, and the runner process must be a member process of group.
- Parameters
server_name (str) – Name of this server, used to register the server as a paired class of group.
group (machin.parallel.distributed._world.RpcGroup) – RpcGroup of the default server OrderedServerSimple; mutually exclusive with o_server.
model_name (str) – Name of the managed model in the ordered server, only needed if the ordered server needs such an identifier. The default ordered server does not require this.
o_server (machin.parallel.server.ordered_server.OrderedServerBase) – Custom ordered server accessor.
assigner
-
class machin.parallel.assigner.ModelAssigner(models, model_connection, devices=None, model_size_multiplier=2, max_mem_ratio=0.5, cpu_weight=0, connection_weight=2, size_match_weight=0.01, complexity_match_weight=1, entropy_weight=1, iterations=500, update_rate=0.01, gpu_gpu_distance=1, cpu_gpu_distance=10, move_models=True)
Bases: object
Assigner for PyTorch modules.
Assigns models to different devices within the scope of a single process. The assigner assumes all GPUs have the same processing power.
Assignment is based on four aspects:
Distance and model connections. Connection is usually indicated by the amount of data transmitted between two models.
Compute complexity.
Model size.
Entropy.
The four aspects are controlled by four weights:
connection_weight: the assigner will try to reduce the total distance * connection if this weight is larger.
size_match_weight: this weight controls the total memory space used on a single device. It only takes effect if the total assigned memory of models exceeds the allowed device memory size (internally it uses a ReLU activation); the larger it is, the tighter and more restricted the fit.
complexity_match_weight: this weight balances the model computation cost across devices; the assigner will try to even out the computation cost / compute power ratio of each device if this weight is larger.
entropy_weight: this weight minimizes the uncertainty of the model placement probability, so model i will have a probability close to 1 of being located on some device j if this weight is larger.
Assignment uses gradient descent to compute the probability matrix of each model i being located on each available device j.
Note
When the sum of your model sizes is very close to the capacity of your device memory, ModelAssigner does not respond very well to size_match_weight; therefore, please consider increasing model_size_multiplier or decreasing max_mem_ratio.
- Parameters
models (List[torch.nn.modules.module.Module]) – Models to assign.
model_connection (Dict[Tuple[int, int], int]) – Connection weight between modules. Must be positive.
devices (List[Union[torch.device, str]]) – Available devices.
model_size_multiplier – Size multiplier of models, used to reserve enough space for models.
max_mem_ratio – Maximum ratio of device memory allowed to be used.
cpu_weight – Weight of the CPU, relative to the computing power of one GPU. By default it is 0, so no computation will be performed on the CPU. Must be non-negative.
connection_weight – Weight of connection between models.
size_match_weight – Weight of size match.
complexity_match_weight – Weight of complexity match.
entropy_weight – Weight of entropy.
iterations – Number of optimization iterations.
update_rate – Learning rate of the Adam optimizer.
gpu_gpu_distance – Estimated distance cost between GPU and GPU. Must be positive.
cpu_gpu_distance – Estimated distance cost between CPU and GPU. Must be positive.
move_models – Whether to automatically move the models after assignment.
-
static
optimize_placement(optimizer, placement, model_size, size_capacity, model_complexity, complexity_capacity, model_connection, device_distance, connection_weight, size_match_weight, complexity_match_weight, entropy_weight)
Suppose there are n models to place and m devices available.
- Parameters
optimizer – Optimizer of placement.
placement (torch.Tensor) – shape [n, m]
model_size (torch.Tensor) – shape [1, n]
size_capacity (torch.Tensor) – shape [1, m]
model_complexity (torch.Tensor) – shape [1, n]
complexity_capacity (torch.Tensor) – shape [1, m]
model_connection (torch.Tensor) – shape [n, n]
device_distance (torch.Tensor) – shape [m, m]
connection_weight (float) – Weight of connection between models.
size_match_weight (float) – Weight of size match.
complexity_match_weight (float) – Weight of complexity match.
entropy_weight (float) – Weight of entropy.
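To make the weighted terms above more concrete, here is a hypothetical, framework-free sketch of two of them over an n × m placement probability matrix. It uses plain Python lists instead of torch tensors, and it assumes the connection term is the expected sum of connection × distance and the entropy term is the row-wise Shannon entropy; machin's actual loss may be formulated differently.

```python
# Hypothetical sketch: these helpers are illustrative, not machin's API.
import math
from typing import List

Matrix = List[List[float]]


def connection_cost(placement: Matrix, connection: Matrix, distance: Matrix) -> float:
    """Expected sum of connection[i][j] * distance[k][l], where model i sits on
    device k with probability placement[i][k] (models placed independently)."""
    n, m = len(placement), len(placement[0])
    total = 0.0
    for i in range(n):
        for j in range(n):
            for k in range(m):
                for l in range(m):
                    total += (placement[i][k] * placement[j][l]
                              * connection[i][j] * distance[k][l])
    return total


def placement_entropy(placement: Matrix) -> float:
    """Sum of per-model (row) entropies; 0 when every row is one-hot."""
    return -sum(p * math.log(p) for row in placement for p in row if p > 0)
```

With two connected models, placing both on the same device gives zero connection cost, while splitting them costs connection[0][1] * distance[0][1]; a larger entropy weight pushes each row toward a one-hot placement.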
-
property
assignment
List[t.device]: Assigned devices for each model in your model list.
-
class
machin.parallel.assigner.ModelSizeEstimator(model, size_multiplier=2)
Bases: object
Size estimator for PyTorch modules.
Estimates the size of PyTorch models in memory.
Note
This estimator can only estimate the total size of parameters and buffers. Therefore, the raw estimated size needs to be multiplied by a correction coefficient to reserve enough space for models.
- Parameters
model (torch.nn.modules.module.Module) – Model to be estimated.
size_multiplier – The estimated model size will be multiplied by this value, to ensure that enough space is reserved to contain your model and its inputs.
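As a rough, hypothetical sketch of the estimate described above (not machin's implementation), the raw size is the sum of element count × element size over all parameters and buffers, multiplied by size_multiplier. With a real module the pairs could come from tensor.numel() and tensor.element_size(); plain tuples are used here so the sketch runs without PyTorch.

```python
# Hypothetical sketch; estimate_size_bytes is an illustrative name only.
from math import prod
from typing import Iterable, Tuple


def estimate_size_bytes(
    tensors: Iterable[Tuple[Tuple[int, ...], int]], size_multiplier: float = 2
) -> float:
    """Estimate memory for (shape, bytes_per_element) pairs of params/buffers."""
    raw = sum(prod(shape) * element_size for shape, element_size in tensors)
    # Reserve extra room for inputs, activations, etc. that are not counted above.
    return raw * size_multiplier


# A float32 Linear(10, 5) layer: weight of shape (5, 10) and bias of shape (5,).
linear_params = [((5, 10), 4), ((5,), 4)]
estimated = estimate_size_bytes(linear_params)
```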
pickle
-
class
machin.parallel.pickle.Pickler(file, recurse=False, copy_tensor=False)
Bases: dill._dill.Pickler
Note
Picklers share “.dispatch” among instances, and each instance owns its own “dispatch_table”.
The base Pickler (from the builtin pickle library, not dill) first looks up the default dump method in “.dispatch”; if no valid method is found, it tries to find a custom dump method in “.dispatch_table”.
This takes a binary file for writing a pickle data stream.
The optional protocol argument tells the pickler to use the given protocol; supported protocols are 0, 1, 2, 3 and 4. The default protocol is 3; a backward-incompatible protocol designed for Python 3.
Specifying a negative protocol version selects the highest protocol version supported. The higher the protocol used, the more recent the version of Python needed to read the pickle produced.
The file argument must have a write() method that accepts a single bytes argument. It can thus be a file object opened for binary writing, an io.BytesIO instance, or any other custom object that meets this interface.
If fix_imports is True and protocol is less than 3, pickle will try to map the new Python 3 names to the old module names used in Python 2, so that the pickle data stream is readable with Python 2.
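Because this Pickler ultimately derives from Python's standard pickle.Pickler, the file and protocol contract above can be illustrated with the standard library alone. This sketch uses pickle.Pickler directly rather than machin's subclass.

```python
import io
import pickle

data = {"step": 3, "values": [1.0, 2.0]}

buf = io.BytesIO()                             # any object with write(bytes) works
pickle.Pickler(buf, protocol=-1).dump(data)    # negative protocol: highest supported
stream = buf.getvalue()

restored = pickle.loads(stream)
```

For protocol 2 and above, the stream starts with the PROTO opcode (0x80) followed by the protocol number, which is how a reader can tell which protocol was used.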
-
machin.parallel.pickle.dumps(obj, recurse=False, copy_tensor=True)
Convert objects to bytes. Works for cpu and gpu tensors.
Warning
Up to PyTorch 1.5.0, there is a bug affecting referenced GPU tensors: users must keep shared GPU tensors alive for the whole life of the process and must not reassign or delete them. However, you may refill them with different values.
See here
- Parameters
obj – Object to dump.
recurse – Enable recursive dumping; enable this to dump local functions and lambdas.
copy_tensor – Whether to dump tensors and storages as a full copy. If it is set to “False”, then dumped tensors must be located either on GPUs or in shared memory.
- Returns
Bytes.
-
machin.parallel.pickle.mark_static_module(module)
Some modules are static, which means they are stateless and remain the same whether you import them in process A or process B.
If your module contains references to functions, objects or anything inside a CDLL (usually the reference is a pointer), it is not picklable by dill and will cause nasty errors. However, if you mark this module as “static”, dill will recognize it as a builtin module and will not save its state; it will only save a reference to it.
- Parameters
module (Any) – Some module which imports CDLLs by hand and not using pybind11.
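The idea of saving only a reference to a module can be sketched with the standard library, which also shows the per-instance dispatch_table mentioned in the Pickler note above. This is an analogy, not machin's mechanism: mark_static_module works through dill, whereas this sketch registers a reducer for types.ModuleType on a plain pickle.Pickler so that modules are pickled by name and re-imported on load. dumps_with_module_refs is a hypothetical helper.

```python
import copyreg
import importlib
import io
import math
import pickle
import types


def _reduce_module(module: types.ModuleType):
    # Save only the module's name; unpickling re-imports it in the target process.
    return importlib.import_module, (module.__name__,)


def dumps_with_module_refs(obj) -> bytes:
    buf = io.BytesIO()
    pickler = pickle.Pickler(buf)
    # Per-instance dispatch_table, so the global copyreg table stays untouched.
    pickler.dispatch_table = copyreg.dispatch_table.copy()
    pickler.dispatch_table[types.ModuleType] = _reduce_module
    pickler.dump(obj)
    return buf.getvalue()


restored = pickle.loads(dumps_with_module_refs({"lib": math, "n": 1}))
```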
pool
-
class
machin.parallel.pool.AsyncResult(job, cache, callback, error_callback)
Bases: object
Class whose instances are returned by Pool.apply_async()
-
get(timeout=None)
Return the result when it arrives.
If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised.
If the remote call raised an exception then that exception will be reraised by get().
- Parameters
timeout – Timeout in seconds.
- Returns
The result.
- Return type
Any
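BasePool is adapted from multiprocessing.pool, so the timeout behaviour of get() can be illustrated with the standard library's ThreadPool. This is a stdlib sketch, not machin's pool; the Event only makes the timeout deterministic.

```python
import multiprocessing
import threading
from multiprocessing.pool import ThreadPool

release = threading.Event()


def wait_for_release(x):
    release.wait()  # block until the main thread lets the task finish
    return x * 2


with ThreadPool(1) as pool:
    result = pool.apply_async(wait_for_release, (21,))
    try:
        result.get(timeout=0.1)  # the task is still blocked, so this times out
        timed_out = False
    except multiprocessing.TimeoutError:
        timed_out = True
    release.set()
    value = result.get(timeout=5)  # now the result arrives
```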
-
-
class
machin.parallel.pool.BasePool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
Bases: object
The basic pool class, adapted from Python 3.7.3 multiprocessing.pool.
Note
An exception thrown while iterating over the iterable will not be deferred and re-raised later; it will be thrown here directly. This is different from the original implementation.
-
apply(func, args=(), kwds=None)
Equivalent of func(*args, **kwds).
- Parameters
func (Callable) – Function to call.
args (Tuple) – Arguments provided to the function call.
kwds (Dict) – Keyword arguments provided to the function call.
- Returns
Function call result.
- Return type
Any
-
apply_async(func, args=(), kwds=None, callback=None, error_callback=None)
Asynchronous version of apply() method.
- Parameters
func (Callable) – Function to call.
args (Tuple) – Arguments provided to the function call.
kwds (Dict) – Keyword arguments provided to the function call.
callback (Callable[[Any], None]) – Callback function to apply on the result.
error_callback (Callable[[Exception], None]) – Error callback function to apply on the exception instance.
- Returns
An instance of AsyncResult.
- Return type
-
close()
Softly close the pool and its handler threads, then shut down the workers by sending signals. The pool will be closed after all jobs are finished and all results are returned.
Remember to call join() to wait for full shutdown.
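The following standard-library sketch (multiprocessing.pool.ThreadPool, used here as a stand-in for machin's pools) shows apply_async() with callback and error_callback, followed by the close() then join() shutdown sequence described above.

```python
from multiprocessing.pool import ThreadPool

successes, failures = [], []


def divide(a, b):
    return a / b


pool = ThreadPool(2)
ok = pool.apply_async(divide, (6, 3), callback=successes.append,
                      error_callback=failures.append)
bad = pool.apply_async(divide, (1, 0), callback=successes.append,
                       error_callback=failures.append)
pool.close()  # stop accepting new tasks; workers exit once pending tasks finish
pool.join()   # wait until workers and handler threads have fully shut down
```

Callbacks run in the pool's result-handling thread, so they should be quick and must not block.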
-
imap(func, iterable, chunksize=1)
Equivalent of map(), but results are not stored all at once; instead, they are retrieved one at a time, in sequential order.
- Parameters
func (Callable[[Any], Any]) – Function to call.
iterable (Collection[Any]) – A collection of single argument provided to the function call.
chunksize (int) – Size of iterable chunk assigned to each worker.
- Returns
IMapIterator when chunksize is set to 1, else a list of results.
- Return type
Union[machin.parallel.pool.IMapIterator, List[Any]]
-
imap_unordered(func, iterable, chunksize=1)
Like the imap() method, but the ordering of results is arbitrary.
- Parameters
func (Callable[[Any], Any]) – Function to call.
iterable (Collection[Any]) – A collection of single argument provided to the function call.
chunksize (int) – Size of iterable chunk assigned to each worker.
- Returns
IMapUnorderedIterator when chunksize is set to 1, else a list of results.
- Return type
Union[machin.parallel.pool.IMapUnorderedIterator, List[Any]]
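The ordering difference between imap() and imap_unordered() can be seen with the standard library's ThreadPool, used here only as an analogue. Note that the stdlib versions always return an iterator, whereas the machin methods above are documented to return a list when chunksize is not 1.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


with ThreadPool(4) as pool:
    # Results come back in input order, one at a time.
    ordered = list(pool.imap(square, range(8), chunksize=2))
    # Results come back in completion order, which may differ between runs.
    unordered = list(pool.imap_unordered(square, range(8)))
```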
-
join_workers()
Wait until all workers have terminated.
Override this method to implement your own pool.
-
maintain_pool()
Watch workers for exceptions; if one is found, raise it and then terminate the pool. Also clean up any retired workers that have reached the maximum task number, and start replacements for them.
Override this method to implement your own pool.
-
map(func, iterable, chunksize=None)
Apply func to each element in iterable, collecting the results in a list that is returned.
- Parameters
func (Callable[[Any], Any]) – Function to call.
iterable (Collection[Any]) – A collection of single argument provided to the function call.
chunksize (int) – Size of iterable chunk assigned to each worker.
- Returns
A list of results from applying the function to each item in the iterable.
- Return type
List[Any]
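When chunksize is None, CPython's multiprocessing.pool picks roughly four chunks per worker. Assuming machin keeps this heuristic from the Python 3.7.3 code it was adapted from, the chunking can be sketched as follows (default_chunksize and chunks are illustrative helpers, not machin functions).

```python
from itertools import islice
from typing import Iterable, Iterator, List


def default_chunksize(n_items: int, n_workers: int) -> int:
    # Mirrors CPython's heuristic: about four chunks per worker, rounded up.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def chunks(iterable: Iterable, chunksize: int) -> Iterator[List]:
    # Split the iterable into consecutive lists of at most chunksize items.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, chunksize))
        if not chunk:
            return
        yield chunk
```

Larger chunks reduce per-task communication overhead, at the cost of coarser load balancing between workers.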
-
map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
Asynchronous version of map() method.
- Parameters
func (Callable[[Any], Any]) – Function to call.
iterable (Collection[Any]) – A collection of single argument provided to the function call.
chunksize (int) – Size of iterable chunk assigned to each worker.
callback (Callable[[Any], None]) – Callback function to apply on the result.
error_callback (Callable[[Exception], None]) – Error callback function to apply on the exception instance.
- Returns
An instance of AsyncResult.
- Return type
-
pool_inqueue_put(obj)
Put a task item into the input queue on the pool side. Note all
Override this method to implement your own pool.
- Parameters
obj (Any) –
-
pool_outqueue_get(timeout)
Read a result item from the output queue on the pool side.
The method should block for timeout seconds, and then throw a TimeoutError if no result is available. It should also throw OSError or EOFError to indicate that it is improperly closed and cannot be used.
Override this method to implement your own pool.
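A minimal sketch of an override that honours this contract, assuming an in-process queue.Queue as the transport (QueueBackedPoolSide and its attributes are hypothetical; a real pool would use inter-process queues):

```python
import queue


class QueueBackedPoolSide:
    """Hypothetical pool side illustrating the pool_outqueue_get contract."""

    def __init__(self):
        self.outqueue = queue.Queue()
        self.closed = False

    def pool_outqueue_get(self, timeout: float):
        if self.closed:
            # Signal that the queue is improperly closed and cannot be used.
            raise OSError("output queue is closed")
        try:
            return self.outqueue.get(timeout=timeout)
        except queue.Empty:
            # Block for at most `timeout` seconds, then report a timeout.
            raise TimeoutError(f"no result within {timeout} s") from None
```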
- Parameters
timeout (float) –
-
repopulate_pool()
Bring the number of pool workers up to the specified number. It also creates new workers to replace old workers which have exited after executing maxtasksperchild tasks.
Override this method to implement your own pool.
-
setup_queues()
Create an input queue and an output queue which will be used to communicate with workers.
Override this method to implement your own pool.
-
starmap(func, iterable, chunksize=None)
Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments. Hence func and (a, b) becomes func(a, b).
- Parameters
func (Callable[[Any], Any]) – Function to call.
iterable (Collection[Tuple]) – A collection of tuples of arguments provided to the function call.
chunksize (int) – Size of iterable chunk assigned to each worker.
- Returns
A list of results from applying the function to each tuple in the iterable.
- Return type
List[Any]
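The unpacking behaviour is the same as in the standard library; for example, with multiprocessing.pool.ThreadPool as a stand-in:

```python
from multiprocessing.pool import ThreadPool

pairs = [(2, 3), (3, 2), (10, 0)]
with ThreadPool(2) as pool:
    # Each tuple is unpacked, so the calls are pow(2, 3), pow(3, 2), pow(10, 0).
    powers = pool.starmap(pow, pairs)
```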
-
starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)
Asynchronous version of starmap() method.
- Parameters
func (Callable[[Any], Any]) – Function to call.
iterable (Collection[Tuple]) – A collection of tuples of arguments provided to the function call.
chunksize (int) – Size of iterable chunk assigned to each worker.
callback (Callable[[Any], None]) – Callback function to apply on the result.
error_callback (Callable[[Exception], None]) – Error callback function to apply on the exception instance.
- Returns
An instance of AsyncResult.
- Return type
-
terminate_workers()
Force terminate all workers.
Override this method to implement your own pool.
-
static
worker(get, put, initializer=None, initargs=(), maxtasks=None)
The default worker function executed by worker processes.
Override this method to implement your own pool.
- Parameters
get – A function of the form get() -> Any, used to get tasks.
put – A function of the form put(obj: Any), used to put results.
initializer (Callable) – An initializer function to initialize the global environment in worker processes.
initargs (Tuple) – Initializer arguments.
maxtasks (int) – Maximum number of tasks a worker needs to run before it exits.
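For anyone overriding worker(), the following is a simplified, hypothetical sketch of what such a loop does with get, put, initializer, initargs and maxtasks. The (job_id, func, args, kwds) task format, the (success, value) result format and the None stop sentinel are assumptions for illustration, not machin's wire format.

```python
from typing import Any, Callable, Optional, Tuple


def worker(get: Callable[[], Any], put: Callable[[Any], None],
           initializer: Optional[Callable] = None, initargs: Tuple = (),
           maxtasks: Optional[int] = None) -> None:
    if initializer is not None:
        initializer(*initargs)  # set up the worker's global environment once
    completed = 0
    while maxtasks is None or completed < maxtasks:
        task = get()
        if task is None:  # assumed sentinel: the pool asks this worker to exit
            break
        job_id, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as exc:  # report failures instead of crashing the worker
            result = (False, exc)
        put((job_id, result))
        completed += 1
    # Returning lets the pool replace this worker once maxtasks is reached.
```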
-
-
class
machin.parallel.pool.CtxPool(processes, initializer=None, initargs=(), maxtasksperchild=None, worker_contexts=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)
Bases: machin.parallel.pool.Pool
Pool with a context for each worker. Your function must accept a ctx object as its first non-keyword argument.
If worker_contexts is not specified, then ctx will be None.
The length of worker_contexts must be the same as processes.
Note
To share “cpu” tensors in shared memory, you must set:
is_copy_tensor=False, share_method="cpu"
To share “cuda” tensors, you must set:
is_copy_tensor=False, share_method="cuda"
Note
The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.
- Parameters
processes (int) – Number of processes in the pool.
initializer – Initializer function executed by the pool.
initargs – Args passed to the init function.
maxtasksperchild – Maximum number of tasks per worker process.
is_recursive – Set to True to support local functions and lambdas.
is_daemon – Whether worker processes in the pool are started as daemon processes.
is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.
share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in shared memory, and “cuda” means cuda tensors; you can only specify one share method.
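The per-worker context idea can be emulated with the standard library, which may help clarify the ctx-as-first-argument convention. In this hypothetical sketch each ThreadPool worker thread receives one entry of worker_contexts through the initializer and stores it in a threading.local; machin's CtxPool handles this for you, so make_initializer and tag are illustrative names only.

```python
import itertools
import threading
from multiprocessing.pool import ThreadPool

_local = threading.local()  # one slot per worker thread


def make_initializer(worker_contexts):
    """Hypothetical helper: hand each worker thread its own context object."""
    counter = itertools.count()
    lock = threading.Lock()

    def init():
        with lock:
            index = next(counter)
        _local.ctx = worker_contexts[index]

    return init


def tag(ctx, x):
    # The user function takes the context as its first positional argument.
    return ctx, x * 10


contexts = ["ctx-0", "ctx-1"]
with ThreadPool(len(contexts), initializer=make_initializer(contexts)) as pool:
    results = pool.map(lambda x: tag(_local.ctx, x), range(6))
```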
-
class
machin.parallel.pool.CtxPoolStorage
Bases: object
This storage class is used by all CtxPool instances. However, since each worker process has its own memory space, each worker has its own separate storage. storage is accessed on the client process side.
-
storage = None
-
-
class
machin.parallel.pool.CtxThreadPool(processes, initializer=None, initargs=(), worker_contexts=None)
Bases: machin.parallel.pool.ThreadPool
Note
To share “cpu” tensors in shared memory, you must set:
is_copy_tensor=False, share_method="cpu"
To share “cuda” tensors, you must set:
is_copy_tensor=False, share_method="cuda"
Note
The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.
- Parameters
processes (int) – Number of processes in the pool.
initializer – Initializer function executed by the pool.
initargs – Args passed to the init function.
maxtasksperchild – Maximum number of tasks per worker process.
is_recursive – Set to True to support local functions and lambdas.
is_daemon – Whether worker processes in the pool are started as daemon processes.
is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.
share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in shared memory, and “cuda” means cuda tensors; you can only specify one share method.
-
apply(func, args=(), kwds=None)
Equivalent of func(*args, **kwds).
- Parameters
func – Function to call.
args – Arguments provided to the function call.
kwds – Keyword arguments provided to the function call.
- Returns
Function call result.
-
apply_async(func, args=(), kwds=None, callback=None, error_callback=None)
Asynchronous version of apply() method.
- Parameters
func – Function to call.
args – Arguments provided to the function call.
kwds – Keyword arguments provided to the function call.
callback – Callback function to apply on the result.
error_callback – Error callback function to apply on the exception instance.
- Returns
An instance of AsyncResult.
-
imap(func, iterable, chunksize=1)
Equivalent of map(), but results are not stored all at once; instead, they are retrieved one at a time, in sequential order.
- Parameters
func – Function to call.
iterable – A collection of single argument provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
- Returns
IMapIterator when chunksize is set to 1, else a list of results.
-
imap_unordered(func, iterable, chunksize=1)
Like the imap() method, but the ordering of results is arbitrary.
- Parameters
func – Function to call.
iterable – A collection of single argument provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
- Returns
IMapUnorderedIterator when chunksize is set to 1, else a list of results.
-
map(func, iterable, chunksize=None)
Apply func to each element in iterable, collecting the results in a list that is returned.
- Parameters
func – Function to call.
iterable – A collection of single argument provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
- Returns
A list of results from applying the function to each item in the iterable.
-
map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
Asynchronous version of map() method.
- Parameters
func – Function to call.
iterable – A collection of single argument provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
callback – Callback function to apply on the result.
error_callback – Error callback function to apply on the exception instance.
- Returns
An instance of AsyncResult.
-
repopulate_pool()
Bring the number of pool processes up to the specified number, for use after reaping workers which have exited.
-
starmap(func, iterable, chunksize=None)
Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments. Hence func and (a, b) becomes func(a, b).
- Parameters
func – Function to call.
iterable – A collection of tuples of arguments provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
- Returns
A list of results from applying the function to each tuple in the iterable.
-
starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)
Asynchronous version of starmap() method.
- Parameters
func – Function to call.
iterable – A collection of tuples of arguments provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
callback – Callback function to apply on the result.
error_callback – Error callback function to apply on the exception instance.
- Returns
An instance of AsyncResult.
-
class
machin.parallel.pool.IMapIterator(job, cache, length)
Bases: object
Class whose instances are returned by Pool.imap()
-
next(timeout=None)
Return the next result within timeout.
If the timeout is reached, no new item has been returned by the worker, and the total number of returned items is smaller than the job size, then a TimeoutError is raised.
If the total number of returned items equals the job size (all jobs finished and returned), then StopIteration is raised.
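The timeout and exhaustion rules above match those of the standard library's IMapIterator.next(), so they can be sketched with multiprocessing.pool.ThreadPool (an analogue, not machin's class). Note that the standard library raises multiprocessing.TimeoutError.

```python
import multiprocessing
import threading
from multiprocessing.pool import ThreadPool

release = threading.Event()


def gated(x):
    release.wait()  # hold the worker until the main thread releases it
    return x + 1


with ThreadPool(1) as pool:
    it = pool.imap(gated, [1])
    try:
        it.next(timeout=0.1)  # nothing returned yet and job not done: times out
        timed_out = False
    except multiprocessing.TimeoutError:
        timed_out = True
    release.set()
    first = it.next(timeout=5)  # the single result
    try:
        it.next(timeout=5)  # all items already returned
        exhausted = False
    except StopIteration:
        exhausted = True
```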
- Parameters
timeout – Timeout in seconds.
- Return type
Any
-
-
class
machin.parallel.pool.IMapUnorderedIterator(job, cache, length)
Bases: machin.parallel.pool.IMapIterator
Class whose instances are returned by Pool.imap_unordered()
-
class
machin.parallel.pool.MapResult(job, cache, chunksize, length, callback, error_callback)
Bases: machin.parallel.pool.AsyncResult
Class whose instances are returned by Pool.map_async()
-
class
machin.parallel.pool.P2PPool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)
Bases: machin.parallel.pool.Pool
Note
To share “cpu” tensors in shared memory, you must set:
is_copy_tensor=False, share_method="cpu"
To share “cuda” tensors, you must set:
is_copy_tensor=False, share_method="cuda"
Note
The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.
- Parameters
processes – Number of processes in the pool.
initializer – Initializer function executed by the pool.
initargs – Args passed to the init function.
maxtasksperchild – Maximum number of tasks per worker process.
is_recursive – Set to True to support local functions and lambdas.
is_daemon – Whether worker processes in the pool are started as daemon processes.
is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.
share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in shared memory, and “cuda” means cuda tensors; you can only specify one share method.
-
close()
Softly close the pool and its handler threads, then shut down the workers by sending signals. The pool will be closed after all jobs are finished and all results are returned.
Remember to call join() to wait for full shutdown.
-
class
machin.parallel.pool.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)
Bases: machin.parallel.pool.BasePool
Enhanced multiprocessing pool for PyTorch, which provides:
Support for lambdas and local functions.
Ability to select the tensor serialization scheme.
Note
To share “cpu” tensors in shared memory, you must set:
is_copy_tensor=False, share_method="cpu"
To share “cuda” tensors, you must set:
is_copy_tensor=False, share_method="cuda"
Note
The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.
- Parameters
processes – Number of processes in the pool.
initializer – Initializer function executed by the pool.
initargs – Args passed to the init function.
maxtasksperchild – Maximum number of tasks per worker process.
is_recursive – Set to True to support local functions and lambdas.
is_daemon – Whether worker processes in the pool are started as daemon processes.
is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.
share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in shared memory, and “cuda” means cuda tensors; you can only specify one share method.
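Why is_recursive matters: the standard pickle module serializes functions by reference to an importable name, so lambdas and local functions cannot be sent to worker processes with it. The quick check below uses only the stdlib (can_pickle and make_adder are illustrative names); per the Pickler entry above, machin builds on dill for these cases.

```python
import pickle


def can_pickle(obj) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_adder(n):
    def add(x):  # a local (nested) function: not reachable by an import path
        return x + n
    return add
```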
-
apply(func, args=(), kwds=None)
Equivalent of func(*args, **kwds).
- Parameters
func – Function to call.
args – Arguments provided to the function call.
kwds – Keyword arguments provided to the function call.
- Returns
Function call result.
-
apply_async(func, args=(), kwds=None, callback=None, error_callback=None)
Asynchronous version of apply() method.
- Parameters
func – Function to call.
args – Arguments provided to the function call.
kwds – Keyword arguments provided to the function call.
callback – Callback function to apply on the result.
error_callback – Error callback function to apply on the exception instance.
- Returns
An instance of AsyncResult.
-
imap(func, iterable, chunksize=1)
Equivalent of map(), but results are not stored all at once; instead, they are retrieved one at a time, in sequential order.
- Parameters
func – Function to call.
iterable – A collection of single argument provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
- Returns
IMapIterator when chunksize is set to 1, else a list of results.
-
imap_unordered(func, iterable, chunksize=1)
Like the imap() method, but the ordering of results is arbitrary.
- Parameters
func – Function to call.
iterable – A collection of single argument provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
- Returns
IMapUnorderedIterator when chunksize is set to 1, else a list of results.
-
map(func, iterable, chunksize=None)
Apply func to each element in iterable, collecting the results in a list that is returned.
- Parameters
func – Function to call.
iterable – A collection of single argument provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
- Returns
A list of results from applying the function to each item in the iterable.
-
map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
Asynchronous version of map() method.
- Parameters
func – Function to call.
iterable – A collection of single argument provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
callback – Callback function to apply on the result.
error_callback – Error callback function to apply on the exception instance.
- Returns
An instance of AsyncResult.
-
starmap(func, iterable, chunksize=None)
Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments. Hence func and (a, b) becomes func(a, b).
- Parameters
func – Function to call.
iterable – A collection of tuples of arguments provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
- Returns
A list of results from applying the function to each tuple in the iterable.
-
starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)
Asynchronous version of starmap() method.
- Parameters
func – Function to call.
iterable – A collection of tuples of arguments provided to the function call.
chunksize – Size of iterable chunk assigned to each worker.
callback – Callback function to apply on the result.
error_callback – Error callback function to apply on the exception instance.
- Returns
An instance of AsyncResult.
-
static
worker(*args, **kwargs)
The default worker function executed by worker processes.
Override this method to implement your own pool.
- Parameters
get – A function of the form get() -> Any, used to get tasks.
put – A function of the form put(obj: Any), used to put results.
initializer – An initializer function to initialize the global environment in worker processes.
initargs – Initializer arguments.
maxtasks – Maximum number of tasks a worker needs to run before it exits.
-
class
machin.parallel.pool.PoolStates
Bases: enum.Enum
An enumeration.
-
CLOSE = 1
-
RUN = 0
-
TERMINATE = 2
-
-
class
machin.parallel.pool.ThreadPool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)
Bases: machin.parallel.pool.Pool
A typical thread pool.
Note
To share “cpu” tensors in shared memory, you must set:
is_copy_tensor=False, share_method="cpu"
To share “cuda” tensors, you must set:
is_copy_tensor=False, share_method="cuda"
Note
The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.
- Parameters
processes – Number of processes in the pool.
initializer – Initializer function executed by the pool.
initargs – Args passed to the init function.
maxtasksperchild – Maximum number of tasks per worker process.
is_recursive – Set to True to support local functions and lambdas.
is_daemon – Whether worker processes in the pool are started as daemon processes.
is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.
share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in shared memory, and “cuda” means cuda tensors; you can only specify one share method.
-
join_workers()
Wait until all workers have terminated.
Override this method to implement your own pool.
-
maintain_pool()
Watch workers for exceptions; if one is found, raise it and then terminate the pool. Also clean up any retired workers that have reached the maximum task number, and start replacements for them.
Override this method to implement your own pool.
-
pool_inqueue_put(obj)
Put a task item into the input queue on the pool side. Note all
Override this method to implement your own pool.
- Parameters
obj (Any) –
-
pool_outqueue_get(timeout)
Read a result item from the output queue on the pool side.
The method should block for timeout seconds, and then throw a TimeoutError if no result is available. It should also throw OSError or EOFError to indicate that it is improperly closed and cannot be used.
Override this method to implement your own pool.
- Parameters
timeout (float) –
-
repopulate_pool()
Bring the number of pool workers up to the specified number. It also creates new workers to replace old workers which have exited after executing maxtasksperchild tasks.
Override this method to implement your own pool.
-
machin.parallel.pool.proxy_ctx_caller(*input_)
Call a serialized function with worker context and return results.
queue
-
class
machin.parallel.queue.SimpleQueue(*, ctx=None, copy_tensor=False)
Bases: object
A simple single-direction queue for inter-process communication. There can be multiple receivers and multiple senders on each side.
- Parameters
ctx – Multiprocessing context; you can get one using get_context.
copy_tensor – Set the queue to send a fully serialized tensor if True, and only a reference stub if False.
See also
dump_tensor()
-
get(timeout=None)
Get an object from the queue. This API is required by multiprocessing.pool to perform inter-process communication.
Note
This API is used by sub-processes in the pool to get tasks to work on.
- Returns
Any object.
-
put(obj)
Put an object into the queue. This API is required by multiprocessing.pool to perform inter-process communication.
Note
This API is used by sub-processes in the pool to put results and respond.
- Parameters
obj (Any) – Any object.
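The ctx argument expects a multiprocessing context object, such as the one returned by multiprocessing.get_context. The sketch below uses the standard library's own SimpleQueue as an analogue of the put()/get() pair; unlike machin's version it has no copy_tensor option and its get() takes no timeout.

```python
import multiprocessing

ctx = multiprocessing.get_context("spawn")  # a context object, as passed via ctx=...
q = ctx.SimpleQueue()
q.put({"task": 1})  # the object is pickled on put()
item = q.get()      # and unpickled on get()
```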