machin.parallel¶
distributed¶
-
class
machin.parallel.distributed.
CollectiveGroup
(group, current_relative_rank)[source]¶ Bases:
object
A thin wrapper of collective communication primitives of
torch.distributed
, the only difference is that irecv now supports receiving from any source.
Do not construct this group yourself; use create_collective_group() instead.
-
all_gather
(tensor_list, tensor, async_op=False)[source]¶ Gathers tensors from the whole group in a list.
Complex tensors are supported.
- Parameters
tensor_list (list[Tensor]) – Output list. It should contain correctly-sized tensors to be used for output of the collective.
tensor (Tensor) – Tensor to be broadcast from current process.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
Examples
>>> # All tensors below are of torch.int64 dtype.
>>> tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)]
>>> tensor_list
[tensor([0, 0]), tensor([0, 0])] # Rank 0 and 1
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1, 2]), tensor([3, 4])] # Rank 0
[tensor([1, 2]), tensor([3, 4])] # Rank 1
>>> # All tensors below are of torch.cfloat dtype.
>>> tensor_list = [torch.zeros(2, dtype=torch.cfloat) for _ in range(2)]
>>> tensor_list
[tensor([0.+0.j, 0.+0.j]), tensor([0.+0.j, 0.+0.j])] # Rank 0 and 1
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 0
[tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 1
-
all_gather_multigpu
(output_tensor_lists, input_tensor_list, async_op=False)[source]¶ Gathers tensors from the whole group in a list. Each tensor in tensor_list should reside on a separate GPU.
Only the nccl backend is currently supported; tensors must be GPU tensors.
Complex tensors are supported.
- Parameters
output_tensor_lists (List[List[Tensor]]) – Output lists. It should contain correctly-sized tensors on each GPU to be used for output of the collective, e.g. output_tensor_lists[i] contains the all_gather result that resides on the GPU of input_tensor_list[i].
Note that each element of output_tensor_lists has the size of world_size * len(input_tensor_list), since the function all-gathers the result from every single GPU in the group. To interpret each element of output_tensor_lists[i], note that input_tensor_list[j] of rank k will appear in output_tensor_lists[i][k * world_size + j].
Also note that len(output_tensor_lists), and the size of each element in output_tensor_lists (each element is a list, therefore len(output_tensor_lists[i])) need to be the same for all the distributed processes calling this function.
input_tensor_list (List[Tensor]) – List of tensors (on different GPUs) to be broadcast from the current process. Note that len(input_tensor_list) needs to be the same for all the distributed processes calling this function.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
-
all_reduce
(tensor, op=<ReduceOp.SUM: 0>, async_op=False)[source]¶ Reduces the tensor data across all machines in such a way that all get the final result.
After the call, tensor is going to be bitwise identical in all processes.
Complex tensors are supported.
- Parameters
tensor (Tensor) – Input and output of the collective. The function operates in-place.
op (optional) – One of the values from
torch.distributed.ReduceOp
enum. Specifies an operation used for element-wise reductions.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
Examples
>>> # All tensors below are of torch.int64 type.
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
-
all_reduce_multigpu
(tensor_list, op=<ReduceOp.SUM: 0>, async_op=False)[source]¶ Reduces the tensor data across all machines in such a way that all get the final result. This function reduces a number of tensors on every node, while each tensor resides on different GPUs. Therefore, the input tensor in the tensor list needs to be GPU tensors. Also, each tensor in the tensor list needs to reside on a different GPU.
After the call, each tensor in tensor_list is going to be bitwise identical in all processes.
Complex tensors are supported.
Only the nccl and gloo backends are currently supported; tensors must be GPU tensors.
- Parameters
tensor_list (List[Tensor]) – List of input and output tensors of the collective. The function operates in-place and requires each tensor to be a GPU tensor on different GPUs. You also need to make sure that len(tensor_list) is the same for all the distributed processes calling this function.
op (optional) – One of the values from torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
-
barrier
(async_op=False)[source]¶ Synchronizes all processes.
This collective blocks processes until the whole group enters this function, if async_op is False, or if the async work handle is called on wait().
- Parameters
async_op (bool, optional) – Whether this op should be an async op
device_ids ([int], optional) – List of device/GPU ids. Valid only for NCCL backend.
- Returns
Async work handle, if async_op is set to True.
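For illustration, a minimal sketch (assuming group is a CollectiveGroup obtained via create_collective_group()):
>>> work = group.barrier(async_op=True)
>>> work.wait()   # returns once every process in the group has entered the barrier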
-
broadcast
(tensor, src, async_op=False)[source]¶ Broadcasts the tensor to the whole group.
tensor must have the same number of elements in all processes participating in the collective.
- Parameters
tensor (Tensor) – Data to be sent if src is the rank of the current process, and tensor to be used to save received data otherwise.
src (int) – Source rank.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
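A minimal sketch for a two-process group, assuming group is a CollectiveGroup and rank is this process's relative rank in it:
>>> # Rank 0 holds the data; other ranks provide a buffer of the same size.
>>> t = torch.arange(4) if rank == 0 else torch.zeros(4, dtype=torch.int64)
>>> group.broadcast(t, src=0)
>>> t   # identical on every rank afterwards
tensor([0, 1, 2, 3])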
-
broadcast_multigpu
(tensor_list, src, async_op=False, src_tensor=0)[source]¶ Broadcasts the tensor to the whole group with multiple GPU tensors per node.
tensor must have the same number of elements in all the GPUs from all processes participating in the collective. Each tensor in the list must be on a different GPU.
Only the nccl and gloo backends are currently supported; tensors must be GPU tensors.
- Parameters
tensor_list (List[Tensor]) – Tensors that participate in the collective operation. If src is the rank, then the specified src_tensor element of tensor_list (tensor_list[src_tensor]) will be broadcast to all other tensors (on different GPUs) in the src process and all tensors in tensor_list of other non-src processes. You also need to make sure that len(tensor_list) is the same for all the distributed processes calling this function.
src (int) – Source rank.
async_op (bool, optional) – Whether this op should be an async op
src_tensor (int, optional) – Source tensor rank within tensor_list
- Returns
Async work handle, if async_op is set to True.
-
gather
(tensor, gather_list, dst=0, async_op=False)[source]¶ Gathers a list of tensors in a single process.
- Parameters
tensor (Tensor) – Input tensor.
gather_list (list[Tensor], optional) – List of appropriately-sized tensors to use for gathered data (default is None, must be specified on the destination rank)
dst (int, optional) – Destination rank (default is 0)
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
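A minimal sketch for a two-process group (same group/rank assumptions as above); only the destination rank allocates gather_list:
>>> t = torch.tensor([rank])
>>> out = [torch.zeros(1, dtype=torch.int64) for _ in range(2)] if rank == 0 else None
>>> group.gather(t, gather_list=out, dst=0)
>>> out   # Rank 0
[tensor([0]), tensor([1])]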
-
irecv
(tensor, src=None, tag=0)[source]¶ Receives a tensor asynchronously.
- Parameters
tensor (Tensor) – Tensor to fill with received data.
src (int, optional) – Source rank. Will receive from any process if unspecified.
tag (int, optional) – Tag to match recv with remote send
- Returns
An object you can call .wait() on; .wait() will return the source rank.
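A sketch of the extended behavior this class adds (same group assumption as above): with src left unset, the handle's wait() reports which rank sent the tensor:
>>> buf = torch.zeros(2)
>>> work = group.irecv(buf)   # src=None: accept a tensor from any rank
>>> sender = work.wait()      # wait() returns the source rank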
-
isend
(tensor, dst, tag=0)[source]¶ Sends a tensor asynchronously.
- Parameters
tensor (Tensor) – Tensor to send.
dst (int) – Destination rank.
tag (int, optional) – Tag to match send with remote recv
- Returns
A distributed request object.
-
recv
(tensor, src=None, tag=0)[source]¶ Receives a tensor synchronously.
- Parameters
tensor (Tensor) – Tensor to fill with received data.
src (int, optional) – Source rank. Will receive from any process if unspecified.
tag (int, optional) – Tag to match recv with remote send
- Returns
Sender rank
-
reduce
(tensor, dst, op=<ReduceOp.SUM: 0>, async_op=False)[source]¶ Reduces the tensor data across all machines.
Only the process with rank dst is going to receive the final result.
- Parameters
tensor (Tensor) – Input and output of the collective. The function operates in-place.
dst (int) – Destination rank
op (optional) – One of the values from
torch.distributed.ReduceOp
enum. Specifies an operation used for element-wise reductions.
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
-
reduce_multigpu
(tensor_list, dst, op=<ReduceOp.SUM: 0>, async_op=False, dst_tensor=0)[source]¶ Reduces the tensor data on multiple GPUs across all machines. Each tensor in tensor_list should reside on a separate GPU.
Only the GPU of tensor_list[dst_tensor] on the process with rank dst is going to receive the final result.
Only the nccl backend is currently supported; tensors must be GPU tensors.
- Parameters
tensor_list (List[Tensor]) – Input and output GPU tensors of the collective. The function operates in-place. You also need to make sure that
len(tensor_list)
is the same for all the distributed processes calling this function.
dst (int) – Destination rank
op (optional) – One of the values from
torch.distributed.ReduceOp
enum. Specifies an operation used for element-wise reductions.
async_op (bool, optional) – Whether this op should be an async op
dst_tensor (int, optional) – Destination tensor rank within
tensor_list
- Returns
Async work handle, if async_op is set to True. None, otherwise
-
scatter
(tensor, scatter_list=None, src=0, async_op=False)[source]¶ Scatters a list of tensors to all processes in a group.
Each process will receive exactly one tensor and store its data in the tensor argument.
- Parameters
tensor (Tensor) – Output tensor.
scatter_list (list[Tensor]) – List of tensors to scatter (default is None, must be specified on the source rank)
src (int) – Source rank (default is 0)
async_op (bool, optional) – Whether this op should be an async op
- Returns
Async work handle, if async_op is set to True.
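A minimal sketch for a two-process group (same assumptions as above); only the source rank provides scatter_list:
>>> t = torch.zeros(1, dtype=torch.int64)
>>> chunks = [torch.tensor([i]) for i in range(2)] if rank == 0 else None
>>> group.scatter(t, scatter_list=chunks, src=0)
>>> t
tensor([0]) # Rank 0
tensor([1]) # Rank 1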
-
-
class
machin.parallel.distributed.
RpcGroup
(group_name, group_members, first_create=True)[source]¶ Bases:
object
-
get_paired
(key)[source]¶ - Parameters
key (Any) – Key of the paired value, in this group.
- Returns
A RRef to the paired value.
- Raises
KeyError – if not found.
-
is_member
(target=None)[source]¶ Check whether target name is a group member.
- Parameters
target (str) –
- Return type
bool
-
is_paired
(key)[source]¶ Check whether a key has been paired to the current group.
- Parameters
key (Any) – A key which uniquely identifies this value in this group. The name only needs to be unique for this value in this group.
-
is_registered
(key)[source]¶ Check whether a service has been registered in the current group.
- Parameters
key (Any) – A key which uniquely identifies this service in this group. The name only needs to be unique for this service in this group.
-
registered_async
(key, args=(), kwargs=None)[source]¶ - Parameters
key (Any) – Key of the registered service, in this group.
args – Service arguments.
kwargs – Service keyword arguments.
- Returns
A future object you can call wait() on. wait() will block the thread until execution is completed, and will return the result returned by the service.
- Raises
KeyError – if service is not found.
-
registered_remote
(key, args=(), kwargs=None)[source]¶ - Parameters
key (Any) – Key of the registered service, in this group.
args – Service arguments.
kwargs – Service keyword arguments.
- Returns
A RRef object pointing to the result returned by the service.
- Raises
KeyError – if service is not found.
-
registered_sync
(key, args=(), kwargs=None)[source]¶ - Parameters
key (Any) – Key of the registered service, in this group.
args – Service arguments.
kwargs – Service keyword arguments.
- Returns
Result returned by the service.
- Raises
KeyError – if service is not found.
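A hedged sketch of the three calling styles, assuming a service has already been registered in this group under the key "echo" (the registration API is not shown in this excerpt):
>>> group.registered_sync("echo", args=("hello",))      # blocks, returns the result
'hello'
>>> fut = group.registered_async("echo", args=("hello",))
>>> fut.wait()                                          # blocks until completed
'hello'
>>> rref = group.registered_remote("echo", args=("hello",))
>>> rref.to_here()                                      # fetch the RRef's value
'hello'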
-
remote
(to, func, timeout=-1, args=(), kwargs=None)[source]¶ Make a remote call to run func on worker to and return an RRef to the result value immediately. Worker to will be the owner of the returned RRef, and the worker calling remote is a user. The owner manages the global reference count of its RRef, and the owner RRef is only destructed when globally there are no living references to it.
- Parameters
to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.
func (callable) – a callable function, such as Python callables, builtin operators (e.g. add()) and annotated TorchScript functions.
args (tuple) – the argument tuple for the func invocation.
kwargs (dict) – a dictionary of keyword arguments for the func invocation.
timeout (float, optional) – timeout in seconds for this remote call. If the creation of this RRef on worker to is not successfully processed on this worker within this timeout, then the next time there is an attempt to use the RRef (such as to_here()), a timeout will be raised indicating this failure. A value of 0 indicates an infinite timeout, i.e. a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.
- Returns
A user RRef instance to the result value. Use the blocking API torch.distributed.rpc.RRef.to_here() to retrieve the result value locally.
Warning
Using GPU tensors as arguments or return values of func is not supported since we don't support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func.
Warning
The remote API does not copy storages of argument tensors until sending them over the wire, which could be done by a different thread depending on the RPC backend type. The caller should make sure that the contents of those tensors stay intact until the returned RRef is confirmed by the owner, which can be checked using the torch.distributed.rpc.RRef.confirmed_by_owner() API.
Warning
Errors such as timeouts for the remote API are handled on a best-effort basis. This means that when remote calls initiated by remote fail, such as with a timeout error, errors are handled and set on the resulting RRef on an asynchronous basis. If the RRef has not been used by the application before this handling (such as to_here or a fork call), then future uses of the RRef will appropriately raise errors. However, it is possible that the user application will use the RRef before the errors are handled, in which case errors may not be raised as they have not yet been handled.
- Example::
Make sure that MASTER_ADDR and MASTER_PORT are set properly on both workers. Refer to the init_process_group() API for more details. For example,
>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
Below is an example of running a TorchScript function using RPC.
>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(t1, t2):
>>>     return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
-
rpc_async
(to, func, timeout=-1, args=(), kwargs=None)[source]¶ Make a non-blocking RPC call to run function func on worker to. RPC messages are sent and received in parallel to execution of Python code. This method is thread-safe. This method will immediately return a Future that can be awaited on.
- Parameters
to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.
func (callable) – a callable function, such as Python callables, builtin operators (e.g. add()) and annotated TorchScript functions.
args (tuple) – the argument tuple for the func invocation.
kwargs (dict) – a dictionary of keyword arguments for the func invocation.
timeout (float, optional) – timeout in seconds to use for this RPC. If the RPC does not complete in this amount of time, an exception indicating it has timed out will be raised. A value of 0 indicates an infinite timeout, i.e. a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.
- Returns
Returns a Future object that can be waited on. When completed, the return value of func on args and kwargs can be retrieved from the Future object.
Warning
Using GPU tensors as arguments or return values of func is not supported since we don't support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func.
Warning
The rpc_async API does not copy storages of argument tensors until sending them over the wire, which could be done by a different thread depending on the RPC backend type. The caller should make sure that the contents of those tensors stay intact until the returned Future completes.
- Example::
Make sure that MASTER_ADDR and MASTER_PORT are set properly on both workers. Refer to the init_process_group() API for more details. For example,
>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
Below is an example of running a TorchScript function using RPC.
>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(t1, t2):
>>>     return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
-
rpc_sync
(to, func, timeout=-1, args=(), kwargs=None)[source]¶ Make a blocking RPC call to run function func on worker to. RPC messages are sent and received in parallel to execution of Python code. This method is thread-safe.
- Parameters
to (str or WorkerInfo or int) – name/rank/WorkerInfo of the destination worker.
func (callable) – a callable function, such as Python callables, builtin operators (e.g. add()) and annotated TorchScript functions.
args (tuple) – the argument tuple for the func invocation.
kwargs (dict) – a dictionary of keyword arguments for the func invocation.
timeout (float, optional) – timeout in seconds to use for this RPC. If the RPC does not complete in this amount of time, an exception indicating it has timed out will be raised. A value of 0 indicates an infinite timeout, i.e. a timeout error will never be raised. If not provided, the default value set during initialization or with _set_rpc_timeout is used.
- Returns
Returns the result of running func with args and kwargs.
Warning
Using GPU tensors as arguments or return values of func is not supported since we don't support sending GPU tensors over the wire. You need to explicitly copy GPU tensors to CPU before using them as arguments or return values of func.
- Example::
Make sure that MASTER_ADDR and MASTER_PORT are set properly on both workers. Refer to the init_process_group() API for more details. For example,
>>> export MASTER_ADDR=localhost
>>> export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
Below is an example of running a TorchScript function using RPC.
>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(t1, t2):
>>>     return torch.add(t1, t2)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
-
server¶
-
class
machin.parallel.server.
OrderedServerBase
[source]¶ Bases:
abc.ABC
Descendant classes of OrderedServerBase do not have to guarantee strong consistency; that is, even if OrderedServerBase.push_service() has returned True, it is possible that these acknowledged pushes are discarded.
-
abstract
pull
(key, version=None)[source]¶ Pull a value with the specified version in key.
- Parameters
key – Key.
version – Target version; if None, then the newest version of the value of key will be pulled.
- Returns
None if the version is not found, auto-deleted, or the key is not found; otherwise a tuple of the value with the specified version in key, and that version.
-
abstract
push
(key, value, version, prev_version)[source]¶ Push a new version of value in key to the ordered server.
Note
If
version = prev_version
then there is no order guarantee. But you may exploit this feature.- Parameters
key – Key.
value – value.
version – New version.
prev_version – Previous version.
- Returns
True if successful, and False if not.
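To make the ordering contract concrete, here is a hedged sketch with a hypothetical server object implementing this interface; a push is accepted only when prev_version matches the server's current version (the very first push's prev_version is an assumption here):
>>> server.push("weights", w0, version=1, prev_version=0)
True
>>> server.push("weights", w1, version=2, prev_version=1)
True
>>> server.push("weights", w2, version=3, prev_version=1)  # stale chain, rejected
False
>>> server.pull("weights")             # newest version, i.e. (w1, 2)
>>> server.pull("weights", version=1)  # a specific older version, i.e. (w0, 1)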
-
class
machin.parallel.server.
OrderedServerSimple
(server_name, group)[source]¶ Bases:
machin.parallel.server.ordered_server.OrderedServerBase
- Parameters
server_name (str) –
group (machin.parallel.distributed.world.RpcGroup) –
-
pull
(key, version=None)[source]¶ Pull a value with the specified version in key.
- Parameters
key – Key.
version – Target version; if None, then the newest version of the value of key will be pulled.
- Returns
None if the version is not found, auto-deleted, or the key is not found; otherwise a tuple of the value with the specified version in key, and that version.
-
push
(key, value, version, prev_version)[source]¶ Push a new version of value in key to the ordered server.
Note
If
version = prev_version
then there is no order guarantee. But you may exploit this feature.- Parameters
key – Key.
value – value.
version – New version.
prev_version – Previous version.
- Returns
True if successful, and False if not.
-
class
machin.parallel.server.
OrderedServerSimpleImpl
(server_name, group, version_depth=1, **__)[source]¶ Bases:
object
A simple key-value server with strictly ordered updates.
This init function must only be invoked on the runner process, and the runner process must be a member process of group.
- Parameters
server_name (str) – Name of this server, used to register the server as a paired class of group.
group (machin.parallel.distributed.world.RpcGroup) – Rpc group where the server locates.
server_runner – Name of the process serving the ordered server. By default it is the first member of the group.
version_depth (int) – Storage depth of old versions of the same key. If depth = 1, then only the newest version of the key will be saved.
-
class
machin.parallel.server.
PushPullGradServer
(server_name, group, model_name, secondary_reducers, o_server)[source]¶ Bases:
object
- Parameters
server_name (str) –
group (machin.parallel.distributed.world.RpcGroup) –
model_name (str) –
secondary_reducers (List[str]) –
o_server (machin.parallel.server.ordered_server.OrderedServerBase) –
-
class
machin.parallel.server.
PushPullGradServerImpl
(server_name, group, model_name='model', primary_reducer=None, secondary_reducers=None, o_server=None, reduce_method='sum', reduce_device='cpu', reduce_batch_size=4, max_queue_size=64)[source]¶ Bases:
object
A simple parameter server, which synchronizes model parameters by pushing gradients and pulling back new parameters; no strict order is guaranteed.
Warning
DistributedDataParallel
is not supported, since we cannot load a state dictionary after creation.
Note
You should initialize PushPullGradServer on all members of secondary_reducers, and primary_reducer. Both of them should be members of the group.
Note
Internally the primary reducer will push updated versions to the ordered server.
Hint
Reduction is performed in a tree fashion:
In the first step, clients will push new gradients to a random secondary reducer, and the secondary reducer will perform the first reduction pass, then secondary reducers will push their results to the primary reducer.
In the second step, the primary reducer will reduce results from the secondary reducers to get the final reduced gradient dictionary (which has the same structure as a state_dict), assign gradients to its managed model, and perform the optimization.
In the final step, the primary reducer will push the final model to the model server group, then clients can pull the newest model.
- Parameters
server_name (str) – Name of this server, used to register the server as a paired class of group.
group (machin.parallel.distributed.world.RpcGroup) – Server group.
model_name (str) – Name of the managed model in the ordered server, only needed if the server needs such an identifier. The default ordered server does not require this.
primary_reducer (str) – Name of the process serving as the primary reducer, which collects reduced gradients from secondary reducers and performs the final reduction.
secondary_reducers (List[str]) – Names of the processes serving as secondary reducers.
o_server (machin.parallel.server.ordered_server.OrderedServerBase) – Custom ordered server accessor. By default, the ordered server is an OrderedServerSimple hosted on the primary reducer.
reduce_method (str) – "mean" or "sum"
reduce_device (Union[torch.device, str]) – Device to perform reduction, by default it is “cpu”.
reduce_batch_size (int) – Size of a single reduction batch; the server will wait until the number of requests in the reduction queue has reached this size.
max_queue_size (int) – Maximum reduction request queue size.
-
manage_model
(model, optimizer)[source]¶ Let the main reducer manage your model. Must be called before start.
Warning
Make sure that the managed model is different from the model you use in your algorithms such as A3C!
- Parameters
model (torch.nn.modules.module.Module) – Model to manage.
optimizer – Optimizer of your model; you should initialize it first, e.g. optimizer = Adam(model.parameters(), lr=1e-3).
- Raises
RuntimeError – if the current rpc role is not the main reducer.
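A hedged sketch of managing a model on the primary reducer (Net is a placeholder module class; server construction is assumed to have happened as described above):
>>> from torch.optim import Adam
>>> model = Net()                            # Net: placeholder module class
>>> optimizer = Adam(model.parameters(), lr=1e-3)
>>> server.manage_model(model, optimizer)    # valid only on the primary reducer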
-
REDUCE_MASTER
= 0¶
-
REDUCE_SLAVE
= 1¶
-
class
machin.parallel.server.
PushPullModelServer
(model_name, o_server=None)[source]¶ Bases:
object
Create an accessor to the services provided by
PushPullModelServerImpl
- Parameters
model_name (str) – Name of the managed model in the ordered server, only needed if the server needs such an identifier. The default ordered server does not require this.
o_server (machin.parallel.server.ordered_server.OrderedServerBase) – Ordered server accessor.
-
pull
(model)[source]¶ Pull the newest state dict of your model and update its parameters and pp_version. Gradients will not be cleared.
- Parameters
model (torch.nn.modules.module.Module) – Model to pull.
-
push
(model, pull_on_fail=True)[source]¶ Try to push a model to the ordered server; if the push fails, the newest model will be automatically pulled and its parameters will be assigned to model. Gradients will not be cleared.
- Parameters
model (torch.nn.modules.module.Module) – Model to push.
pull_on_fail – Pull the newest parameters if push failed.
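A hedged client-side sketch (o_server is an ordered server accessor as described above; model is any torch.nn.Module):
>>> accessor = PushPullModelServer("model", o_server=o_server)
>>> accessor.pull(model)   # refresh parameters (and pp_version) before acting
>>> accessor.push(model)   # if the push loses the version race, newest parameters are pulled back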
-
class
machin.parallel.server.
PushPullModelServerImpl
(server_name, group, model_name='model', o_server=None)[source]¶ Bases:
object
A simple parameter server, which synchronizes model parameters by pushing and pulling all parameters and maintaining a strictly ordered version chain.
Warning
Only one model is supported.
This init function must only be invoked on the runner process, and the runner process must be a member process of group.
- Parameters
server_name (str) – Name of this server, used to register the server as a paired class of group.
group (machin.parallel.distributed.world.RpcGroup) – RpcGroup hosting the default OrderedServerSimple server; mutually exclusive with o_server.
model_name (str) – Name of the managed model in the ordered server, only needed if the server needs such an identifier. The default ordered server does not require this.
o_server (machin.parallel.server.ordered_server.OrderedServerBase) – Custom ordered server accessor.
assigner¶
-
class
machin.parallel.assigner.
ModelAssigner
(models, model_connection, devices=None, model_size_multiplier=2, max_mem_ratio=0.5, cpu_weight=0, connection_weight=2, size_match_weight=0.01, complexity_match_weight=1, entropy_weight=1, iterations=500, update_rate=0.01, gpu_gpu_distance=1, cpu_gpu_distance=10, move_models=True)[source]¶ Bases:
object
Assigner for pytorch modules.
Assigns models to different devices, within the scope of a single process. The assigner assumes all GPUs have the same processing power.
Assignment is based on four aspects:
Distance and model connections. Connection is usually indicated by the amount of data transmitted between two models.
Compute complexity.
Model size.
Entropy.
Four aspects are controlled by four weights:
connection_weight – the assigner will try to reduce the total distance * connection if this weight is larger.
size_match_weight – this weight controls the total memory space used on a single device; it only works if the total assigned memory of models exceeds the allowed device memory size (internally it uses a relu activation). The larger the weight, the tighter and more restricted the fit.
complexity_match_weight – this weight balances the model computation cost across devices; the assigner will try to even the computation cost / compute power ratio for each device if this weight is larger.
entropy_weight – this weight minimizes the uncertainty of the model placement probability, so model i will have a probability close to 1 of locating on some device j if this weight is larger.
Assignment uses gradient descent to compute the probability matrix of each model i locating on each available device j.
Note
When the sum of your model sizes is very close to the capacity of your device memory, ModelAssigner does not respond very well to the size_match_weight; therefore, please consider increasing model_size_multiplier or decreasing max_mem_ratio.
models (List[torch.nn.modules.module.Module]) – Models to assign.
model_connection (Dict[Tuple[int, int], int]) – Connection weight between modules. Must be positive.
devices (List[Union[torch.device, str]]) – Available devices.
model_size_multiplier – Size multiplier of models, used to reserve enough space for models.
max_mem_ratio – Maximum percent of memory allowed.
cpu_weight – Weight of cpu, relative to the computing power of one GPU. By default it is 0, so no computation will be performed on CPU. Must be positive.
connection_weight – Weight of connection between models.
size_match_weight – Weight of size match.
complexity_match_weight – Weight of complexity match.
entropy_weight – Weight of entropy.
iterations – Number of optimization iterations.
update_rate – Learning rate of the adam optimizer.
gpu_gpu_distance – Estimated distance cost between gpu-gpu. Must be positive.
cpu_gpu_distance – Estimated distance cost between cpu-gpu. Must be positive.
move_models – Whether to automatically move the models after assignment.
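A minimal sketch (module classes, connection figure and devices are illustrative):
>>> m0, m1 = Net0(), Net1()                  # hypothetical modules
>>> assigner = ModelAssigner(
...     models=[m0, m1],
...     model_connection={(0, 1): 100},      # m0 sends ~100 units of data to m1
...     devices=["cuda:0", "cuda:1"],
... )
>>> assigner.assignment                      # e.g. [device("cuda:0"), device("cuda:1")]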
-
static
optimize_placement
(optimizer, placement, model_size, size_capacity, model_complexity, complexity_capacity, model_connection, device_distance, connection_weight, size_match_weight, complexity_match_weight, entropy_weight)[source]¶ Suppose there are n models to place and m devices available.
- Parameters
optimizer – optimizer of placement
placement (torch.Tensor) – shape
[n, m]
model_size (torch.Tensor) – shape
[1, n]
size_capacity (torch.Tensor) – shape
[1, m]
model_complexity (torch.Tensor) – shape
[1, n]
complexity_capacity (torch.Tensor) – shape
[1, m]
model_connection (torch.Tensor) – shape
[n, n]
device_distance (torch.Tensor) – shape
[m, m]
connection_weight (float) – Weight of connection between models.
size_match_weight (float) – Weight of size match.
complexity_match_weight (float) – Weight of complexity match.
entropy_weight (float) – weight of entropy.
-
property
assignment
¶ List[t.device]: Assigned devices for each model in your model list.
-
class
machin.parallel.assigner.
ModelSizeEstimator
(model, size_multiplier=2)[source]¶ Bases:
object
Size estimator for pytorch modules.
Estimates the size of PyTorch models in memory.
Note
This estimator can only estimate the total size of parameters and buffers. Therefore we need to multiply the raw estimated size with a correction coefficient to reserve enough space for models.
- Parameters
model (torch.nn.modules.module.Module) – Model to be estimated.
size_multiplier – Model estimated size will be multiplied with this value, to ensure enough space will be reserved to contain your model and inputs.
pickle¶
-
class
machin.parallel.pickle.
Pickler
(file, recurse=False, copy_tensor=False)[source]¶ Bases:
dill._dill.Pickler
Note
Picklers share “.dispatch” among instances, and each owns a “dispatch_table” per instance.
The base Pickler (from the builtin pickle library, not dill) will first look up the default dump method in “.dispatch”; if no valid method is found, it will try to find a custom dump method in “.dispatch_table”.
This takes a binary file for writing a pickle data stream.
The optional protocol argument tells the pickler to use the given protocol; supported protocols are 0, 1, 2, 3 and 4. The default protocol is 3; a backward-incompatible protocol designed for Python 3.
Specifying a negative protocol version selects the highest protocol version supported. The higher the protocol used, the more recent the version of Python needed to read the pickle produced.
The file argument must have a write() method that accepts a single bytes argument. It can thus be a file object opened for binary writing, an io.BytesIO instance, or any other custom object that meets this interface.
If fix_imports is True and protocol is less than 3, pickle will try to map the new Python 3 names to the old module names used in Python 2, so that the pickle data stream is readable with Python 2.
-
machin.parallel.pickle.
dumps
(obj, recurse=False, copy_tensor=True)[source]¶ Convert objects to bytes. Works for cpu and gpu tensors.
Warning
Up to PyTorch 1.5.0, there is a bug with referenced gpu tensors that requires users to keep shared gpu tensors alive for the whole process lifetime and not reassign or delete them; however, you may refill them with different values.
- Parameters
obj – Object to dump.
recurse – Enable recursive dumping, enable this to dump local functions and lambdas.
copy_tensor – Whether to dump tensors and storage as a full copy. If it is set to False, then dumped tensors must be located either on GPUs or in shared memory.
- Returns
Bytes.
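A minimal sketch; loading with dill.loads is an assumption that holds when tensors are dumped as full copies:
>>> import dill
>>> payload = dumps({"w": torch.ones(2)}, copy_tensor=True)
>>> dill.loads(payload)
{'w': tensor([1., 1.])}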
-
machin.parallel.pickle.
mark_static_module
(module)[source]¶ Some modules are static: they are stateless and will remain the same whether you import them in process A or process B.
If your module contains references to functions, objects or anything inside a CDLL (usually the reference is a pointer), it is not picklable by dill and will cause nasty errors. However, by marking such a module as "static", dill will recognize it as a builtin module, save only a reference to it, and not save its state.
- Parameters
module (Any) – Some module which imports CDLLs by hand and not using pybind11.
pool¶
-
class
machin.parallel.pool.
CtxPool
(processes, initializer=None, initargs=(), maxtasksperchild=None, worker_contexts=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)[source]¶ Bases:
machin.parallel.pool.Pool
Pool with a context for each worker. Your function must accept a ctx object as its first non-keyword argument.
If worker_contexts is not specified, then ctx will be None.
The length of worker_contexts must be the same as processes.
Note
To share “cpu” tensors in shared memory, you must set:
is_copy_tensor=False, share_method="cpu"
To share “cuda” tensors, you must set:
is_copy_tensor=False, share_method="cuda"
Note
The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.
- Parameters
processes (int) – Number of processes in the pool.
initializer – Initializer function executed by the pool.
initargs – Args passed to the init function.
maxtasksperchild – Maximum number of tasks per worker process.
is_recursive – Set to
True
to support local functions and lambdas.is_daemon – Whether worker processes in the pool are started as daemon processes.
is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.
share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in the shared memory, “cuda” means cuda tensors; you can only specify one share method.
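A minimal sketch: each worker receives one element of worker_contexts as its ctx (values illustrative):
>>> def work(ctx, x):
...     return ctx + x                # ctx is this worker's context value
>>> pool = CtxPool(processes=2, worker_contexts=[10, 20])
>>> pool.map(work, [1, 2])            # e.g. [11, 22]; which worker runs which task varies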
-
class
machin.parallel.pool.
CtxPoolStorage
[source]¶ Bases:
object
This storage class is used by all CtxPool instances. However, since each worker process has its own memory space, storage is unique for every worker. storage is accessed on the client process side.
-
storage
= None¶
-
-
class
machin.parallel.pool.
CtxThreadPool
(processes, initializer=None, initargs=(), worker_contexts=None)[source]¶ Bases:
machin.parallel.pool.ThreadPool
- Parameters
processes (int) –
-
apply_async
(func, args=(), kwds=None, callback=None, error_callback=None)[source]¶ Asynchronous version of apply() method.
-
imap
(func, iterable, chunksize=1)[source]¶ Equivalent of map() – can be MUCH slower than Pool.map().
-
imap_unordered
(func, iterable, chunksize=1)[source]¶ Like imap() method but ordering of results is arbitrary.
-
map
(func, iterable, chunksize=None)[source]¶ Apply func to each element in iterable, collecting the results in a list that is returned.
-
map_async
(func, iterable, chunksize=None, callback=None, error_callback=None)[source]¶ Asynchronous version of map() method.
-
class
machin.parallel.pool.
P2PPool
(processes=None, initializer=None, initargs=(), maxtasksperchild=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)[source]¶ Bases:
machin.parallel.pool.Pool
Note
To share “cpu” tensors in shared memory, you must set:
is_copy_tensor=False, share_method="cpu"
To share “cuda” tensors, you must set:
is_copy_tensor=False, share_method="cuda"
Note
The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.
- Parameters
processes – Number of processes in the pool.
initializer – Initializer function executed by the pool.
initargs – Args passed to the init function.
maxtasksperchild – Maximum number of tasks per worker process.
is_recursive – Set to
True
to support local functions and lambdas.is_daemon – Whether worker processes in the pool are started as daemon processes.
is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.
share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in the shared memory, “cuda” means cuda tensors; you can only specify one share method.
-
class
machin.parallel.pool.
Pool
(processes=None, initializer=None, initargs=(), maxtasksperchild=None, is_recursive=False, is_daemon=True, is_copy_tensor=True, share_method=None)[source]¶ Bases:
multiprocessing.pool.Pool
- Enhanced multiprocessing pool for pytorch, which provides:
Support for lambdas and local functions.
Ability to select the tensor serialize scheme.
Note
To share “cpu” tensors in shared memory, you must set:
is_copy_tensor=False, share_method="cpu"
To share “cuda” tensors, you must set:
is_copy_tensor=False, share_method="cuda"
Note
The default context used in pool is “spawn”, to avoid any issues brought by “fork”. “fork” will only be used if you want to pass cpu tensors in shared memory.
- Parameters
processes – Number of processes in the pool.
initializer – Initializer function executed by the pool.
initargs – Args passed to the init function.
maxtasksperchild – Maximum number of tasks per worker process.
is_recursive – Set to
True
to support local functions and lambdas.is_daemon – Whether worker processes in the pool are started as daemon processes.
is_copy_tensor – Whether to copy tensors or pass tensors by reference to worker processes.
share_method – If is_copy_tensor is False, you must specify this argument. “cpu” means you may use cpu tensors in the shared memory, “cuda” means cuda tensors; you can only specify one share method.
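A minimal sketch of the lambda support enabled by is_recursive:
>>> pool = Pool(processes=2, is_recursive=True)
>>> pool.map(lambda x: x * 2, [1, 2, 3])
[2, 4, 6]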
-
apply_async
(func, args=(), kwds=None, callback=None, error_callback=None)[source]¶ Asynchronous version of apply() method.
-
imap
(func, iterable, chunksize=1)[source]¶ Equivalent of map() – can be MUCH slower than Pool.map().
-
imap_unordered
(func, iterable, chunksize=1)[source]¶ Like imap() method but ordering of results is arbitrary.
-
map
(func, iterable, chunksize=None)[source]¶ Apply func to each element in iterable, collecting the results in a list that is returned.
-
map_async
(func, iterable, chunksize=None, callback=None, error_callback=None)[source]¶ Asynchronous version of map() method.
-
class
machin.parallel.pool.
ThreadPool
(processes=None, initializer=None, initargs=())[source]¶ Bases:
multiprocessing.pool.ThreadPool
A typical thread pool.
queue¶
-
class
machin.parallel.queue.
SimpleQueue
(*, ctx=None, copy_tensor=False)[source]¶ Bases:
object
A simple single direction queue for inter-process communications. There could be multiple receivers and multiple senders on each side.
- Parameters
ctx – Multiprocessing context; you can get this using get_context.
copy_tensor – Set the queue to send a fully serialized tensor if True, and only a stub of reference if False.
See also
dump_tensor()
-
get
(timeout=None)[source]¶ Get an object from the queue. This API is required by multiprocessing.pool to perform inter-process communication.
Note
This API is used by sub-processes in the pool to get tasks and work.
- Returns
Any object.
-
put
(obj)[source]¶ Put an object into the queue. This API is required by multiprocessing.pool to perform inter-process communication.
Note
This API is used by sub-processes in the pool to put results and respond.
- Parameters
obj (Any) – Any object.
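A single-process sketch; normally put and get happen in different processes:
>>> from multiprocessing import get_context
>>> q = SimpleQueue(ctx=get_context("spawn"), copy_tensor=True)
>>> q.put(torch.ones(2))
>>> q.get()
tensor([1., 1.])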