machin.parallel

distributed

class machin.parallel.distributed.CollectiveGroup(group, current_relative_rank)[source]

Bases: object

A thin wrapper around the collective communication primitives of torch.distributed; the only difference is that irecv now supports receiving from any src.

Do not construct this class yourself; use create_collective_group() instead.
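
Below is a minimal sketch of how such a group is typically obtained and used in each process. The World(...) arguments and the create_collective_group(ranks=...) signature shown here are assumptions based on this documentation, not verified signatures.

import torch
from machin.parallel.distributed import World

def run(rank):
    # One World per process; the names and ranks here are illustrative.
    world = World(name=str(rank), rank=rank, world_size=3)
    group = world.create_collective_group(ranks=[0, 1, 2])

    # Every process contributes one tensor; afterwards every process
    # holds the full gathered list.
    tensor = torch.full((2,), float(rank))
    output = [torch.zeros(2) for _ in range(3)]
    group.all_gather(output, tensor)

    group.destroy()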

all_gather(tensor_list, tensor, async_op=False)[source]

Gathers tensors from the whole group in a list.

Complex tensors are supported.

Parameters
  • tensor_list (list[Tensor]) – Output list. It should contain correctly-sized tensors to be used for output of the collective.

  • tensor (Tensor) – Tensor to be broadcast from current process.

  • async_op (bool, optional) – Whether this op should be an async op

Returns

Async work handle, if async_op is set to True.

Examples

>>> # All tensors below are of torch.int64 dtype.
>>> tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)]
>>> tensor_list
[tensor([0, 0]), tensor([0, 0])] # Rank 0 and 1
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1, 2]), tensor([3, 4])] # Rank 0
[tensor([1, 2]), tensor([3, 4])] # Rank 1
>>> # All tensors below are of torch.cfloat dtype.
>>> tensor_list = [torch.zeros(2, dtype=torch.cfloat) for _ in range(2)]
>>> tensor_list
[tensor([0.+0.j, 0.+0.j]), tensor([0.+0.j, 0.+0.j])] # Rank 0 and 1
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 0
[tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 1
all_gather_multigpu(output_tensor_lists, input_tensor_list, async_op=False)[source]

Gathers tensors from the whole group in a list. Each tensor in input_tensor_list should reside on a separate GPU.

Only the nccl backend is currently supported; tensors should only be GPU tensors.

Complex tensors are supported.

Parameters
  • output_tensor_lists (List[List[Tensor]]) –

    Output lists. It should contain correctly-sized tensors on each GPU to be used for output of the collective, e.g. output_tensor_lists[i] contains the all_gather result that resides on the GPU of input_tensor_list[i].

    Note that each element of output_tensor_lists has the size of world_size * len(input_tensor_list), since the function all-gathers the result from every single GPU in the group. To interpret each element of output_tensor_lists[i], note that input_tensor_list[j] of rank k will appear in output_tensor_lists[i][k * world_size + j].

    Also note that len(output_tensor_lists), and the size of each element in output_tensor_lists (each element is a list, therefore len(output_tensor_lists[i])) need to be the same for all the distributed processes calling this function.

  • input_tensor_list (List[Tensor]) – List of tensors(on different GPUs) to be broadcast from current process. Note that len(input_tensor_list) needs to be the same for all the distributed processes calling this function.

  • async_op (bool, optional) – Whether this op should be an async op

Returns

Async work handle, if async_op is set to True.

all_reduce(tensor, op=<ReduceOp.SUM: 0>, async_op=False)[source]

Reduces the tensor data across all machines in such a way that all get the final result.

After the call tensor is going to be bitwise identical in all processes.

Complex tensors are supported.

Parameters
  • tensor (Tensor) – Input and output of the collective. The function operates in-place.

  • op (optional) – One of the values from torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.

  • async_op (bool, optional) – Whether this op should be an async op

Returns

Async work handle, if async_op is set to True.

Examples

>>> # All tensors below are of torch.int64 type.
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
all_reduce_multigpu(tensor_list, op=<ReduceOp.SUM: 0>, async_op=False)[source]

Reduces the tensor data across all machines in such a way that all get the final result. This function reduces a number of tensors on every node, while each tensor resides on a different GPU. Therefore, the input tensors in the tensor list need to be GPU tensors. Also, each tensor in the tensor list needs to reside on a different GPU.

After the call, every tensor in tensor_list is going to be bitwise identical in all processes.

Complex tensors are supported.

Only the nccl and gloo backends are currently supported; tensors should only be GPU tensors.

Parameters
  • tensor_list (List[Tensor]) – List of input and output tensors of the collective. The function operates in-place and requires each tensor to be a GPU tensor residing on a different GPU. You also need to make sure that len(tensor_list) is the same for all the distributed processes calling this function.

  • op (optional) – One of the values from torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.

  • async_op (bool, optional) – Whether this op should be an async op

Returns

Async work handle, if async_op is set to True.

barrier(async_op=False)[source]

Synchronizes all processes.

This collective blocks processes until the whole group enters this function, if async_op is False, or if the async work handle is called on wait().

Parameters
  • async_op (bool, optional) – Whether this op should be an async op

  • device_ids ([int], optional) – List of device/GPU ids. Valid only for NCCL backend.

Returns

Async work handle, if async_op is set to True.

broadcast(tensor, src, async_op=False)[source]

Broadcasts the tensor to the whole group. tensor must have the same number of elements in all processes participating in the collective.

Parameters
  • tensor (Tensor) – Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise.

  • src (int) – Source rank.

  • async_op (bool, optional) – Whether this op should be an async op

Returns

Async work handle, if async_op is set to True.

broadcast_multigpu(tensor_list, src, async_op=False, src_tensor=0)[source]

Broadcasts the tensor to the whole group with multiple GPU tensors per node.

tensor must have the same number of elements in all the GPUs from all processes participating in the collective. Each tensor in the list must be on a different GPU.

Only the nccl and gloo backends are currently supported; tensors should only be GPU tensors.

Parameters
  • tensor_list (List[Tensor]) – Tensors that participate in the collective operation. If src is the rank, then the specified src_tensor element of tensor_list (tensor_list[src_tensor]) will be broadcast to all other tensors (on different GPUs) in the src process and all tensors in tensor_list of other non-src processes. You also need to make sure that len(tensor_list) is the same for all the distributed processes calling this function.

  • src (int) – Source rank.

  • async_op (bool, optional) – Whether this op should be an async op

  • src_tensor (int, optional) – Source tensor rank within tensor_list

Returns

Async work handle, if async_op is set to True.

destroy()[source]

Destroy this collective communication group.

gather(tensor, gather_list, dst=0, async_op=False)[source]

Gathers a list of tensors in a single process.

Parameters
  • tensor (Tensor) – Input tensor.

  • gather_list (list[Tensor], optional) – List of appropriately-sized tensors to use for gathered data (default is None, must be specified on the destination rank)

  • dst (int, optional) – Destination rank (default is 0)

  • async_op (bool, optional) – Whether this op should be an async op

Returns

Async work handle, if async_op is set to True.

irecv(tensor, src=None, tag=0)[source]

Receives a tensor asynchronously. If src is None, data from any process is accepted.

Returns

An object you can call .wait() on; .wait() will return the source rank.

isend(tensor, dst, tag=0)[source]

Sends a tensor asynchronously.

Parameters
  • tensor (Tensor) – Tensor to send.

  • dst (int) – Destination rank.

  • tag (int, optional) – Tag to match send with remote recv

Returns

A distributed request object.
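
A small sketch of pairing isend() with the any-source irecv() described above, assuming group is an already created CollectiveGroup:

import torch

# On the receiving process: post a receive without specifying src.
buf = torch.zeros(4)
work = group.irecv(buf)      # src=None, so data from any rank is accepted
sender = work.wait()         # blocks until the tensor arrives, returns the source rank
print("received", buf, "from rank", sender)

# On a sending process:
# group.isend(torch.ones(4), dst=receiver_rank).wait()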

recv(tensor, src=None, tag=0)[source]

Receives a tensor synchronously.

Parameters
  • tensor (Tensor) – Tensor to fill with received data.

  • src (int, optional) – Source rank. Will receive from any process if unspecified.

  • tag (int, optional) – Tag to match recv with remote send

Returns

Sender rank

reduce(tensor, dst, op=<ReduceOp.SUM: 0>, async_op=False)[source]

Reduces the tensor data across all machines.

Only the process with rank dst is going to receive the final result.

Parameters
  • tensor (Tensor) – Input and output of the collective. The function operates in-place.

  • dst (int) – Destination rank

  • op (optional) – One of the values from torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.

  • async_op (bool, optional) – Whether this op should be an async op

Returns

Async work handle, if async_op is set to True.

reduce_multigpu(tensor_list, dst, op=<ReduceOp.SUM: 0>, async_op=False, dst_tensor=0)[source]

Reduces the tensor data on multiple GPUs across all machines. Each tensor in tensor_list should reside on a separate GPU.

Only the GPU of tensor_list[dst_tensor] on the process with rank dst is going to receive the final result.

Only the nccl backend is currently supported; tensors should only be GPU tensors.

Parameters
  • tensor_list (List[Tensor]) – Input and output GPU tensors of the collective. The function operates in-place. You also need to make sure that len(tensor_list) is the same for all the distributed processes calling this function.

  • dst (int) – Destination rank

  • op (optional) – One of the values from torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.

  • async_op (bool, optional) – Whether this op should be an async op

  • dst_tensor (int, optional) – Destination tensor rank within tensor_list

Returns

Async work handle, if async_op is set to True. None, otherwise

scatter(tensor, scatter_list=None, src=0, async_op=False)[source]

Scatters a list of tensors to all processes in the group. Each process will receive exactly one tensor and store its data in the tensor argument.

Parameters
  • tensor (Tensor) – Output tensor.

  • scatter_list (list[Tensor]) – List of tensors to scatter (default is None, must be specified on the source rank)

  • src (int) – Source rank (default is 0)

  • async_op (bool, optional) – Whether this op should be an async op

Returns

Async work handle, if async_op is set to True.

send(tensor, dst, tag=0)[source]

Sends a tensor synchronously.

Parameters
  • tensor (Tensor) – Tensor to send.

  • dst (int) – Destination rank.

  • tag (int, optional) – Tag to match send with remote recv

size()[source]
Returns

collective group size.

class machin.parallel.distributed.RpcGroup(group_name, group_members, first_create=True)[source]

Bases: object

barrier(*args, **kwargs)[source]
deregister(*args, **kwargs)[source]
destroy(*args, **kwargs)[source]
static get_cur_name()[source]
Return type

str

get_group_members()[source]
Returns

A list of group members.

Return type

List[str]

get_group_name()[source]
Returns

Name of this group.

Return type

str

get_paired(key)[source]
Parameters

key (Any) – Key of the paired value, in this group.

Returns

A RRef to the paired value.

Raises

KeyError if not found.
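
A hedged sketch of the pairing workflow; pair()'s (key, value) argument order is an assumption, while get_paired() and RRef.to_here() follow the documentation above:

# On the process that owns the value:
group.pair("shared_config", {"lr": 1e-3, "batch_size": 32})

# On any member of the group:
config_rref = group.get_paired("shared_config")
config = config_rref.to_here()    # fetch the paired value locally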

is_member(target=None)[source]

Check whether target name is a group member.

Parameters

target (str) –

Return type

bool

is_paired(key)[source]

Check whether a key has been paired to the current group.

Parameters

key (Any) – A key which uniquely identifies this value in this group. The name only needs to be unique for this value in this group.

is_registered(key)[source]

Check whether a service has been registered in the current group.

Parameters

key (Any) – A key which uniquely identifies this service in this group. The name only needs to be unique for this service in this group.

pair(*args, **kwargs)[source]
register(*args, **kwargs)[source]
registered_async(key, args=(), kwargs=None)[source]
Parameters
  • key (Any) – Key of the registered service, in this group.

  • args – Service arguments.

  • kwargs – Service keyword arguments.

Returns

A future object you can call wait() on. wait() will block the thread until execution is completed, and will return the result returned by the service.

Raises

KeyError if service is not found.

registered_remote(key, args=(), kwargs=None)[source]
Parameters
  • key (Any) – Key of the registered service, in this group.

  • args – Service arguments.

  • kwargs – Service keyword arguments.

Returns

A RRef object pointing to the result returned by the service.

Raises

KeyError if service is not found.

registered_sync(key, args=(), kwargs=None)[source]
Parameters
  • key (Any) – Key of the registered service, in this group.

  • args – Service arguments.

  • kwargs – Service keyword arguments.

Returns

Result returned by the service.

Raises

KeyError if service is not found.
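
A hedged sketch of the service workflow; register()'s (key, callable) argument order is an assumption, while the registered_sync/async/remote calls follow the documentation above:

def add(a, b):
    return a + b

# On the process providing the service:
group.register("math.add", add)

# On any group member:
result = group.registered_sync("math.add", args=(1, 2))     # 3
future = group.registered_async("math.add", args=(3, 4))
result = future.wait()                                      # 7
rref = group.registered_remote("math.add", args=(5, 6))
result = rref.to_here()                                     # 11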

remote(to, func, timeout=- 1, args=(), kwargs=None)[source]

Make a remote call to run func on worker to and return an RRef to the result value immediately. Worker to will be the owner of the returned RRef, and the worker calling remote is a user. The owner manages the global reference count of its RRef, and the owner RRef is only destructed when globally there are no living references to it.

Parameters
  • to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.

  • func (callable) – a callable function, such as Python callables, builtin operators (e.g. add()) and annotated TorchScript functions.

  • args (tuple) – the argument tuple for the func invocation.

  • kwargs (dict) – is a dictionary of keyword arguments for the func invocation.

  • timeout (float, optional) – timeout in seconds for this remote call. If the creation of this RRef on worker to is not successfully processed on this worker within this timeout, then the next time there is an attempt to use the RRef (such as to_here()), a timeout will be raised indicating this failure. A value of 0 indicates an infinite timeout, i.e. a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.

Returns

A user RRef instance to the result value. Use the blocking API torch.distributed.rpc.RRef.to_here() to retrieve the result value locally.

Warning

Using GPU tensors as arguments or return values of func is not supported since we don’t support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func.

Warning

The remote API does not copy storages of argument tensors until sending them over the wire, which could be done by a different thread depending on the RPC backend type. The caller should make sure that the contents of those tensors stay intact until the returned RRef is confirmed by the owner, which can be checked using the torch.distributed.rpc.RRef.confirmed_by_owner() API.

Warning

Errors such as timeouts for the remote API are handled on a best-effort basis. This means that when remote calls initiated by remote fail, such as with a timeout error, we take a best-effort approach to error handling. This means that errors are handled and set on the resulting RRef on an asynchronous basis. If the RRef has not been used by the application before this handling (such as to_here or fork call), then future uses of the RRef will appropriately raise errors. However, it is possible that the user application will use the RRef before the errors are handled. In this case, errors may not be raised as they have not yet been handled.

Example::

Make sure that MASTER_ADDR and MASTER_PORT are set properly on both workers. Refer to the init_process_group() API for more details. For example,

>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678

Then run the following code in two different processes:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

Below is an example of running a TorchScript function using RPC.

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(t1, t2):
>>>    return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
rpc_async(to, func, timeout=- 1, args=(), kwargs=None)[source]

Make a non-blocking RPC call to run function func on worker to. RPC messages are sent and received in parallel to execution of Python code. This method is thread-safe. This method will immediately return a Future that can be awaited on.

Parameters
  • to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.

  • func (callable) – a callable function, such as Python callables, builtin operators (e.g. add()) and annotated TorchScript functions.

  • args (tuple) – the argument tuple for the func invocation.

  • kwargs (dict) – is a dictionary of keyword arguments for the func invocation.

  • timeout (float, optional) – timeout in seconds to use for this RPC. If the RPC does not complete in this amount of time, an exception indicating it has timed out will be raised. A value of 0 indicates an infinite timeout, i.e. a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.

Returns

Returns a Future object that can be waited on. When completed, the return value of func on args and kwargs can be retrieved from the Future object.

Warning

Using GPU tensors as arguments or return values of func is not supported since we don’t support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func.

Warning

The rpc_async API does not copy storages of argument tensors until sending them over the wire, which could be done by a different thread depending on the RPC backend type. The caller should make sure that the contents of those tensors stay intact until the returned Future completes.

Example::

Make sure that MASTER_ADDR and MASTER_PORT are set properly on both workers. Refer to the init_process_group() API for more details. For example,

>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678

Then run the following code in two different processes:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

Below is an example of running a TorchScript function using RPC.

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(t1, t2):
>>>    return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
rpc_sync(to, func, timeout=- 1, args=(), kwargs=None)[source]

Make a blocking RPC call to run function func on worker to. RPC messages are sent and received in parallel to execution of Python code. This method is thread-safe.

Parameters
  • to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.

  • func (callable) – a callable function, such as Python callables, builtin operators (e.g. add()) and annotated TorchScript functions.

  • args (tuple) – the argument tuple for the func invocation.

  • kwargs (dict) – is a dictionary of keyword arguments for the func invocation.

  • timeout (float, optional) – timeout in seconds to use for this RPC. If the RPC does not complete in this amount of time, an exception indicating it has timed out will be raised. A value of 0 indicates an infinite timeout, i.e. a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.

Returns

Returns the result of running func with args and kwargs.

Warning

Using GPU tensors as arguments or return values of func is not supported since we don’t support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func.

Example::

Make sure that MASTER_ADDR and MASTER_PORT are set properly on both workers. Refer to the init_process_group() API for more details. For example,

>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678

Then run the following code in two different processes:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

Below is an example of running a TorchScript function using RPC.

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(t1, t2):
>>>    return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
size()[source]

Get the number of members in group.

unpair(*args, **kwargs)[source]
machin.parallel.distributed.World(*args, **kwargs)[source]
machin.parallel.distributed.get_world()[source]
machin.parallel.distributed.get_cur_rank()[source]
Returns

Current real process rank.

machin.parallel.distributed.get_cur_name()[source]
Returns

Current real process name.

machin.parallel.distributed.is_world_initialized()[source]
machin.parallel.distributed.debug_with_process(message)[source]

server

class machin.parallel.server.OrderedServerBase[source]

Bases: abc.ABC

Descendant classes of OrderedServerBase do not have to guarantee strong consistency; that is, even if OrderedServerBase.push() has returned True, it is possible that an acknowledged push is later discarded.

abstract pull(key, version=None)[source]

Pull a value with the specified version in key.

Parameters
  • key – Key.

  • version – Target version, if None, then the newest version of value of key will be pulled.

Returns

None if the version is not found, has been auto-deleted, or the key is not found; otherwise the value stored under the specified version in key, followed by the version.

abstract push(key, value, version, prev_version)[source]

Push a new version of value in key to the ordered server.

Note

If version = prev_version then there is no order guarantee. But you may exploit this feature.

Parameters
  • key – Key.

  • value – value.

  • version – New version.

  • prev_version – Previous version.

Returns

True if success, and False if not.
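
A sketch of the version chain this interface implies; server stands for any OrderedServerBase accessor, and the exact failure condition of push() shown below is an assumption about how implementations enforce ordering:

# First write: version 1, claiming the previous version was 0.
server.push("weights", value={"w": 1.0}, version=1, prev_version=0)

# A writer whose prev_version no longer matches the stored newest version
# is expected to get False back instead of silently overwriting:
ok = server.push("weights", value={"w": 2.0}, version=2, prev_version=5)

# Pull the newest version (version=None), or ask for a specific one.
result = server.pull("weights")
if result is not None:
    value, version = result      # the value plus the version it was stored under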

class machin.parallel.server.OrderedServerSimple(server_name, group)[source]

Bases: machin.parallel.server.ordered_server.OrderedServerBase

Parameters
  • server_name (str) –

  • group (machin.parallel.distributed.world.RpcGroup) –

pull(key, version=None)[source]

Pull a value with the specified version in key.

Parameters
  • key – Key.

  • version – Target version, if None, then the newest version of value of key will be pulled.

Returns

None if the version is not found, has been auto-deleted, or the key is not found; otherwise the value stored under the specified version in key, followed by the version.

push(key, value, version, prev_version)[source]

Push a new version of value in key to the ordered server.

Note

If version = prev_version then there is no order guarantee. But you may exploit this feature.

Parameters
  • key – Key.

  • value – value.

  • version – New version.

  • prev_version – Previous version.

Returns

True if success, and False if not.

class machin.parallel.server.OrderedServerSimpleImpl(server_name, group, version_depth=1, **__)[source]

Bases: object

A simple key-value server, with strict ordered update

This init function must only be invoked on the runner process, and the runner process must be a member process of group.

Parameters
  • server_name (str) – Name of this server, used to register the server as a paired class of group.

  • group (machin.parallel.distributed.world.RpcGroup) – Rpc group where server locates.

  • server_runner – Name of the process serving the ordered server. By default it is the first member of the group.

  • version_depth (int) – Storage depth of old versions of the same key. If depth = 1, then only the newest version of the key will be saved.

class machin.parallel.server.PushPullGradServer(server_name, group, model_name, secondary_reducers, o_server)[source]

Bases: object

Parameters
  • server_name (str) –

  • group (machin.parallel.distributed.world.RpcGroup) –

  • model_name (str) –

  • secondary_reducers (List[str]) –

  • o_server (machin.parallel.server.ordered_server.OrderedServerBase) –

pull(model)[source]

Pull the newest model. Its gradients will be cleared.

Parameters

model (torch.nn.modules.module.Module) – Model to pull.

Returns

True if pull succeeded, else False.

push(model)[source]

Push the gradients of your model, then pull the newest parameters.

Its gradients will be cleared.

Parameters

model (torch.nn.modules.module.Module) – Model to push.

Returns

True if push succeeded, else False.

class machin.parallel.server.PushPullGradServerImpl(server_name, group, model_name='model', primary_reducer=None, secondary_reducers=None, o_server=None, reduce_method='sum', reduce_device='cpu', reduce_batch_size=4, max_queue_size=64)[source]

Bases: object

A simple parameter server, which synchronizes model parameters by pushing gradients and pulling back new parameters; no strict order is guaranteed.

Warning

DistributedDataParallel is not supported, since we cannot load the state dictionary after creation.

Note

You should initialize PushPullGradServer on all members of secondary_reducers, and primary_reducer. Both of them should be members of the group.

Note

Internally the primary reducer will push updated versions to the ordered server.

Hint

Reduction is performed in a tree fashion:

  1. In the first step, clients will push new gradients to a random secondary reducer, and the secondary reducer will perform the first reduction pass, then secondary reducers will push their results to the primary reducer.

  2. In the second step, the primary reducer will reduce the results from the secondary reducers to get the final reduced gradient dictionary (which has the same structure as state_dict), assign the gradients to its managed model, and perform the optimization step.

  3. In the final step, the primary reducer will push the final model to the model server group, then clients can pull the newest model.

Parameters
  • server_name (str) – Name of this server, used to register the server as a paired class of group.

  • group (machin.parallel.distributed.world.RpcGroup) – Server group.

  • model_name (str) – Name of the managed model in the ordered server, only needed if the server needs such an identifier. The default ordered server does not require this.

  • primary_reducer (str) – Name of the process serving as the primary reducer, which collects reduced gradients from secondary reducers and performs the final reduction.

  • secondary_reducers (List[str]) – Names of the processes serving as secondary reducers.

  • o_server (machin.parallel.server.ordered_server.OrderedServerBase) – Custom ordered server accessor. By default, the ordered server is a OrderedServerSimple hosted on the primary reducer.

  • reduce_method (str) – “mean” or “sum”

  • reduce_device (Union[torch.device, str]) – Device to perform reduction, by default it is “cpu”.

  • reduce_batch_size (int) – Size of a single reduction batch; the server will wait until the number of requests in the reduction queue has reached this size.

  • max_queue_size (int) – Maximum reduction request queue size.
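
A heavily hedged setup sketch, assuming group is an RpcGroup containing the reducer processes "reducer0" (primary) and "reducer1" (secondary); how client processes obtain their PushPullGradServer accessor is not shown here.

import torch as t
from machin.parallel.server import PushPullGradServerImpl

# Run this on every reducer process.
server = PushPullGradServerImpl(
    "grad_server", group,
    primary_reducer="reducer0",
    secondary_reducers=["reducer1"],
    reduce_method="sum",
    reduce_batch_size=4,
)

if group.get_cur_name() == "reducer0":
    # Only the primary reducer manages the model and runs the optimizer.
    model = t.nn.Linear(4, 4)
    optimizer = t.optim.Adam(model.parameters(), lr=1e-3)
    server.manage_model(model, optimizer)

# start() is shown on every reducer here; whether secondaries need it is an assumption.
server.start()
# ... later: server.stop()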

manage_model(model, optimizer, lr_scheduler=None)[source]

Let the main reducer manage your model. Must be called before start.

Warning

Make sure that the managed model is different from the model you use in your algorithms such as A3C!

Parameters
  • model (torch.nn.modules.module.Module) – Model to manage.

  • optimizer (Any) – Optimizer of your model. You should initialize it first, e.g. optimizer = Adam(...).

  • lr_scheduler (Any) – Learning rate scheduler. You should initialize it first, e.g. scheduler = LambdaLR(...).

Raises

RuntimeError if current rpc role is not the main reducer.

start()[source]
stop()[source]
watch()[source]
REDUCE_MASTER = 0
REDUCE_SLAVE = 1
class machin.parallel.server.PushPullModelServer(model_name, o_server=None)[source]

Bases: object

Create an accessor to the services provided by PushPullModelServerImpl

Parameters
  • model_name (str) – Name of the managed model in the ordered server, only needed if the server needs such an identifier. The default ordered server does not require this.

  • o_server (machin.parallel.server.ordered_server.OrderedServerBase) – Ordered server accessor.

pull(model)[source]

Pull the newest state dict of your model and update its parameters and pp_version. Gradients will not be cleared.

Parameters

model (torch.nn.modules.module.Module) – Model to pull.

Returns

True if pull succeeded, else False.

push(model, pull_on_fail=True)[source]

Try to push a model to the ordered server; if the push fails, the newest model will be pulled automatically and its parameters will be assigned to model. Gradients will not be cleared.

Parameters
  • model (torch.nn.modules.module.Module) – Model to push.

  • pull_on_fail – Pull the newest parameters if push failed.

Returns

True if push succeeded, else False.
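
A sketch of the typical client-side loop, assuming server is a PushPullModelServer accessor that has already been constructed (for example with PushPullModelServer("model", o_server=...)):

import torch as t

model = t.nn.Linear(4, 4)
optimizer = t.optim.Adam(model.parameters(), lr=1e-3)

for step in range(100):
    server.pull(model)                    # load the newest parameters into model
    loss = model(t.randn(8, 4)).sum()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # If another worker pushed a newer version first, this push fails and the
    # newest parameters are pulled back into model (pull_on_fail=True).
    server.push(model)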

class machin.parallel.server.PushPullModelServerImpl(server_name, group, model_name='model', o_server=None)[source]

Bases: object

A simple parameter server, which synchronizes model parameters by pushing and pulling all parameters and maintaining a strictly ordered version chain.

Warning

Only one model is supported.

This init function must only be invoked on the runner process, and the runner process must be a member process of group.

Parameters
  • server_name (str) – Name of this server, used to register the server as a paired class of group.

  • group (machin.parallel.distributed.world.RpcGroup) – RpcGroup of the default OrderedServerSimple server; mutually exclusive with o_server.

  • model_name (str) – Name of the managed model in the ordered server, only needed if the server needs such an identifier. The default ordered server does not require this.

  • o_server (machin.parallel.server.ordered_server.OrderedServerBase) – Custom ordered server accessor.

assigner

class machin.parallel.assigner.ModelAssigner(models, model_connection, devices=None, model_size_multiplier=2, max_mem_ratio=0.5, cpu_weight=0, connection_weight=2, size_match_weight=0.01, complexity_match_weight=1, entropy_weight=1, iterations=500, update_rate=0.01, gpu_gpu_distance=1, cpu_gpu_distance=10, move_models=True)[source]

Bases: object

Assigner for pytorch modules.

Assigns models to different devices, within the scope of a single process. The assigner assumes all GPUs have the same processing power.

Assignment is based on four aspects:

  1. Distance and model connections. Connection is usually indicated by the amount of data transmitted between two models.

  2. Compute complexity.

  3. Model size.

  4. Entropy.

Four aspects are controlled by four weights:

  1. connection_weight, the assigner will try to reduce the total distance * connection if this weight is larger.

  2. size_match_weight, this weight controls the total memory space used on a single device; it only works if the total assigned memory of models exceeds the allowed device memory size (internally it uses a relu activation). The larger this weight, the tighter and more restricted the fit.

  3. complexity_match_weight, this weight balances the model computation cost across devices; the assigner will try to even out the computation cost / compute power ratio for each device if this weight is larger.

  4. entropy_weight, this weight minimizes the uncertainty of the model placement probability, so model i will have a probability close to 1 of being located on some device j if this weight is larger.

Assignment uses gradient descent to compute the probability matrix of each model i being located on each available device j.

Note

When the sum of your model sizes is very close to the capacity of your device memory, ModelAssigner does not respond very well to the size_match_weight; therefore, please consider increasing model_size_multiplier or decreasing max_mem_ratio.

Parameters
  • models (List[torch.nn.modules.module.Module]) – Models to assign.

  • model_connection (Dict[Tuple[int, int], int]) – Connection weight between modules. Must be positive

  • devices (List[Union[torch.device, str]]) – Available devices.

  • model_size_multiplier – Size multiplier of models, used to reserve enough space for models.

  • max_mem_ratio – Maximum percent of memory allowed.

  • cpu_weight – Weight of the CPU, relative to the computing power of one GPU. By default it is 0, so no computation will be performed on the CPU. Must be positive.

  • connection_weight – Weight of connection between models.

  • size_match_weight – Weight of size match.

  • complexity_match_weight – Weight of complexity match.

  • entropy_weight – Weight of entropy.

  • iterations – Number of optimization iterations.

  • update_rate – Learning rate of the adam optimizer.

  • gpu_gpu_distance – Estimated distance cost between gpu-gpu. Must be positive

  • cpu_gpu_distance – Estimated distance cost between cpu-gpu. Must be positive

  • move_models – Whether to automatically move the models after assignment.
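
A minimal sketch of assigning two connected sub-models inside one process; the device list is an assumption about the machine this runs on:

import torch as t
from machin.parallel.assigner import ModelAssigner

encoder = t.nn.Linear(128, 64)
decoder = t.nn.Linear(64, 128)

assigner = ModelAssigner(
    models=[encoder, decoder],
    # Heavy data flow from model 0 (encoder) to model 1 (decoder).
    model_connection={(0, 1): 10},
    devices=["cpu", "cuda:0"],
)
# Devices chosen for each model, in the same order as the models list.
print(assigner.assignment)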

static optimize_placement(optimizer, placement, model_size, size_capacity, model_complexity, complexity_capacity, model_connection, device_distance, connection_weight, size_match_weight, complexity_match_weight, entropy_weight)[source]

Suppose there are n models to place and m devices available.

Parameters
  • optimizer – optimizer of placement

  • placement (torch.Tensor) – shape [n, m]

  • model_size (torch.Tensor) – shape [1, n]

  • size_capacity (torch.Tensor) – shape [1, m]

  • model_complexity (torch.Tensor) – shape [1, n]

  • complexity_capacity (torch.Tensor) – shape [1, m]

  • model_connection (torch.Tensor) – shape [n, n]

  • device_distance (torch.Tensor) – shape [m, m]

  • connection_weight (float) – Weight of connection between models.

  • size_match_weight (float) – Weight of size match.

  • complexity_match_weight (float) – Weight of complexity match.

  • entropy_weight (float) – weight of entropy.

property assignment

List[t.device]: Assigned devices for each model in your model list.

class machin.parallel.assigner.ModelSizeEstimator(model, size_multiplier=2)[source]

Bases: object

Size estimator for pytorch modules.

Estimates the size of PyTorch models in memory.

Note

This estimator can only estimate the total size of parameters and buffers. Therefore we need to multiply the raw estimated size with a correction coefficient to reserve enough space for models.

Parameters
  • model (torch.nn.modules.module.Module) – Model to be estimated.

  • size_multiplier – Model estimated size will be multiplied with this value, to ensure enough space will be reserved to contain your model and inputs.

estimate_size()[source]

Estimate model size in memory in megabytes.

get_buffer_sizes()[source]

Get the size of all buffers in the model, in megabytes.

Return type

float

get_parameter_sizes()[source]

Get the size of all parameters in the model, in megabytes.

Return type

float
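
A minimal usage sketch:

import torch as t
from machin.parallel.assigner import ModelSizeEstimator

model = t.nn.Linear(1000, 1000)              # roughly 4 MB of float32 parameters
estimator = ModelSizeEstimator(model, size_multiplier=2)
print(estimator.get_parameter_sizes())       # parameter size in megabytes
print(estimator.get_buffer_sizes())          # buffer size in megabytes
print(estimator.estimate_size())             # total estimate, scaled by size_multiplier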

pickle

class machin.parallel.pickle.Pickler(file, recurse=False, copy_tensor=False)[source]

Bases: dill._dill.Pickler

Note

Picklers share “.dispatch” among instances, and each owns its own “dispatch_table”.

The base Pickler (not dill’s, but the one from the builtin pickle library) will first look up the default dump method in “.dispatch”; if no valid method is found, it will try to find a custom dump method in “dispatch_table”.

This takes a binary file for writing a pickle data stream.

The optional protocol argument tells the pickler to use the given protocol; supported protocols are 0, 1, 2, 3 and 4. The default protocol is 3; a backward-incompatible protocol designed for Python 3.

Specifying a negative protocol version selects the highest protocol version supported. The higher the protocol used, the more recent the version of Python needed to read the pickle produced.

The file argument must have a write() method that accepts a single bytes argument. It can thus be a file object opened for binary writing, an io.BytesIO instance, or any other custom object that meets this interface.

If fix_imports is True and protocol is less than 3, pickle will try to map the new Python 3 names to the old module names used in Python 2, so that the pickle data stream is readable with Python 2.

machin.parallel.pickle.dumps(obj, recurse=False, copy_tensor=True)[source]

Convert objects to bytes. Works for cpu and gpu tensors.

Warning

Up to PyTorch 1.5.0, there is a bug with referenced GPU tensors, which requires users to keep shared GPU tensors alive for the whole process lifetime and not reassign or delete them; however, you may refill them with different values.

Parameters
  • obj – Object to dump.

  • recurse – Enable recursive dumping, enable this to dump local functions and lambdas.

  • copy_tensor – Whether to dump tensors and their storage as a full copy. If set to False, dumped tensors must reside either on GPUs or in shared memory.

Returns

Bytes.
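
A hedged sketch of dumping a local lambda; since copy_tensor defaults to True and the Pickler is dill-based, the resulting bytes are assumed to be loadable with dill.loads:

import dill
from machin.parallel.pickle import dumps

scale = 3
func = lambda x: x * scale          # closes over a local variable

data = dumps(func, recurse=True)    # recurse=True is needed for local functions/lambdas
restored = dill.loads(data)
assert restored(2) == 6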

machin.parallel.pickle.mark_static_module(module)[source]

Some modules are static, which means they are stateless and will remain the same whether you import them in process A or process B.

If your module contains references to functions, objects, or anything inside a CDLL (usually the reference is a pointer), it is not picklable by dill and will cause nasty errors. However, by marking this module as “static”, dill will recognize it as a builtin module and will not save its state; dill will only save a reference to it.

Parameters

module (Any) – Some module which imports CDLLs by hand and not using pybind11.

pool

class machin.parallel.pool.CtxPool(processes, initializer=None, initargs=(), maxtasksperchild=None, worker_contexts=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)[source]

Bases: machin.parallel.pool.Pool

Pool with a context object for each worker. Your function must accept a ctx object as its first positional argument.

If worker_contexts is not specified, then ctx will be None.

The length of worker_contexts must be the same as processes.

Note

To share “cpu” tensors in shared memory, you must set:

is_copy_tensor=False,
share_method="cpu"

To share “cuda” tensors, you must set:

is_copy_tensor=False,
share_method="cuda"

Note

The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.

Parameters
  • processes (int) – Number of processes in the pool.

  • initializer – Initializer function executed by the pool.

  • initargs – Args passed to the init function.

  • maxtasksperchild – Maximum number of tasks per worker process.

  • is_recursive – Set to True to support local functions and lambdas.

  • is_daemon – Whether worker processes in the pool are started as daemon processes.

  • is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.

  • share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in the shared memory, “cuda” means cuda tensors, you can only specify one share method.
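
A minimal sketch: each worker receives one element of worker_contexts as ctx, so which offset a given result uses depends on which worker picked up that task:

from machin.parallel.pool import CtxPool

def work(ctx, x):
    # ctx is this worker's context object (here, just an integer offset).
    return ctx + x

if __name__ == "__main__":
    pool = CtxPool(processes=2, worker_contexts=[100, 200])
    print(pool.map(work, [1, 2, 3, 4]))
    pool.close()
    pool.join()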

class machin.parallel.pool.CtxPoolStorage[source]

Bases: object

This storage class is used by all CtxPool instances. However, since each worker process has its own memory space, storage is unique to each worker.

storage is accessed on the client process side.

storage = None
class machin.parallel.pool.CtxThreadPool(processes, initializer=None, initargs=(), worker_contexts=None)[source]

Bases: machin.parallel.pool.ThreadPool

Parameters

processes (int) –

apply(func, args=(), kwds=None)[source]

Equivalent of func(*args, **kwds). Pool must be running.

apply_async(func, args=(), kwds=None, callback=None, error_callback=None)[source]

Asynchronous version of apply() method.

imap(func, iterable, chunksize=1)[source]

Equivalent of map() – can be MUCH slower than Pool.map().

imap_unordered(func, iterable, chunksize=1)[source]

Like imap() method but ordering of results is arbitrary.

map(func, iterable, chunksize=None)[source]

Apply func to each element in iterable, collecting the results in a list that is returned.

map_async(func, iterable, chunksize=None, callback=None, error_callback=None)[source]

Asynchronous version of map() method.

starmap(func, iterable, chunksize=None)[source]

Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments. Hence func and (a, b) becomes func(a, b).

starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)[source]

Asynchronous version of starmap() method.

class machin.parallel.pool.P2PPool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)[source]

Bases: machin.parallel.pool.Pool

Note

To share “cpu” tensors in shared memory, you must set:

is_copy_tensor=False,
share_method="cpu"

To share “cuda” tensors, you must set:

is_copy_tensor=False,
share_method="cuda"

Note

The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.

Parameters
  • processes – Number of processes in the pool.

  • initializer – Initializer function executed by the pool.

  • initargs – Args passed to the init function.

  • maxtasksperchild – Maximum number of tasks per worker process.

  • is_recursive – Set to True to support local functions and lambdas.

  • is_daemon – Whether worker processes in the pool are started as daemon processes.

  • is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.

  • share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in the shared memory, “cuda” means cuda tensors, you can only specify one share method.

close()[source]
class machin.parallel.pool.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)[source]

Bases: multiprocessing.pool.Pool

Enhanced multiprocessing pool for pytorch, provides:
  1. Support for lambdas and local functions.

  2. Ability to select the tensor serialize scheme.

Note

To share “cpu” tensors in shared memory, you must set:

is_copy_tensor=False,
share_method="cpu"

To share “cuda” tensors, you must set:

is_copy_tensor=False,
share_method="cuda"

Note

The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.

Parameters
  • processes – Number of processes in the pool.

  • initializer – Initializer function executed by the pool.

  • initargs – Args passed to the init function.

  • maxtasksperchild – Maximum number of tasks per worker process.

  • is_recursive – Set to True to support local functions and lambdas.

  • is_daemon – Whether worker processes in the pool are started as daemon processes.

  • is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.

  • share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in the shared memory, “cuda” means cuda tensors, you can only specify one share method.
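
A minimal sketch of feature 1 above (lambda support); is_recursive is enabled as described in the parameter list:

from machin.parallel.pool import Pool

if __name__ == "__main__":
    pool = Pool(processes=2, is_recursive=True)
    # The lambda is serialized with dill, so it survives the "spawn" context.
    print(pool.map(lambda x: x * x, range(5)))   # [0, 1, 4, 9, 16]
    pool.close()
    pool.join()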

apply(func, args=(), kwds=None)[source]

Equivalent of func(*args, **kwds). Pool must be running.

apply_async(func, args=(), kwds=None, callback=None, error_callback=None)[source]

Asynchronous version of apply() method.

imap(func, iterable, chunksize=1)[source]

Equivalent of map() – can be MUCH slower than Pool.map().

imap_unordered(func, iterable, chunksize=1)[source]

Like imap() method but ordering of results is arbitrary.

map(func, iterable, chunksize=None)[source]

Apply func to each element in iterable, collecting the results in a list that is returned.

map_async(func, iterable, chunksize=None, callback=None, error_callback=None)[source]

Asynchronous version of map() method.

size()[source]
Returns

The number of workers in pool.

starmap(func, iterable, chunksize=None)[source]

Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments. Hence func and (a, b) becomes func(a, b).

starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)[source]

Asynchronous version of starmap() method.

class machin.parallel.pool.ThreadPool(processes=None, initializer=None, initargs=())[source]

Bases: multiprocessing.pool.ThreadPool

A typical thread pool.

size()[source]
Returns

The number of workers in pool.

machin.parallel.pool.proxy_caller(*input_)[source]

Call a serialized function and return results.

machin.parallel.pool.proxy_ctx_caller(*input_)[source]

Call a serialized function with worker context and return results.

machin.parallel.pool.proxy_dumper(recurse, copy_tensor, func, args_list)[source]

Serialize a function so it can be called.

Returns

List[function string, arguments…]

queue

class machin.parallel.queue.SimpleQueue(*, ctx=None, copy_tensor=False)[source]

Bases: object

A simple single direction queue for inter-process communications. There could be multiple receivers and multiple senders on each side.

Parameters
  • ctx – Multiprocessing context; you can get this using multiprocessing.get_context().

  • copy_tensor – Set the queue to send a fully serialized tensor if True, and only a stub of reference if False.

See also

dump_tensor()
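
A hedged sketch with one sender process and one receiver; passing the queue to a child process as an argument is assumed to work as it does for standard multiprocessing queues:

import torch as t
import multiprocessing as mp
from machin.parallel.queue import SimpleQueue

def sender(queue):
    queue.put(t.ones(3))             # a full copy, since copy_tensor=True below

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    queue = SimpleQueue(ctx=ctx, copy_tensor=True)
    proc = ctx.Process(target=sender, args=(queue,))
    proc.start()
    print(queue.get(timeout=10))     # tensor([1., 1., 1.])
    proc.join()
    queue.close()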

close()[source]
empty()[source]
Returns

Whether the queue is empty or not.

get(timeout=None)[source]

Get an object from the queue. This api is required by multiprocessing.pool to perform inter-process communication.

Note

This api is used by sub-processes in pool to get tasks and work.

Returns

Any object.

put(obj)[source]

Put an object into the queue. This api is required by multiprocessing.pool to perform inter-process communication.

Note

This api is used by sub-processes in pool to put results and respond.

Parameters

obj (Any) – Any object.

quick_get(timeout=None)[source]

Get an object from the queue.

Note

This api is used by the result manager (Pool._result_handler) thread to get results from the queue. Since it is single threaded, there is no need to use locks, and it is therefore quicker.

Returns

Any object.

quick_put(obj)[source]

Put an object into the queue.

Note

This api is used by the pool manager (Pool._task_handler) thread to put tasks into the queue. Since it is single threaded, there is no need to use locks, and it is therefore quicker.

Parameters

obj (Any) – Any object.