bagua.torch_api.communication¶
Module Contents¶
- class bagua.torch_api.communication.ReduceOp¶
Bases: enum.IntEnum
An enum-like class for available reduction operations: SUM, PRODUCT, MIN, MAX, BAND, BOR, BXOR and AVG.
- AVG = 10¶
- BAND = 8¶
- BOR = 7¶
- BXOR = 9¶
- MAX = 3¶
- MIN = 2¶
- PRODUCT = 1¶
- SUM = 0¶
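For illustration (not part of the original reference), a minimal sketch of passing a ReduceOp value to a reduction collective. It assumes two ranks, an already-initialized Bagua process group, CUDA tensors on the current device, and that the rank can be queried via bagua.get_rank() (an assumed helper; any equivalent rank query works):
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import ReduceOp, allreduce_inplace
>>> rank = bagua.get_rank()                     # assumed helper
>>> t = (torch.ones(3) * (rank + 1)).cuda()     # rank 0 holds 1s, rank 1 holds 2s
>>> allreduce_inplace(t, op=ReduceOp.MAX)       # element-wise maximum across ranks
>>> t
tensor([2., 2., 2.])   # identical on both ranks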
- bagua.torch_api.communication.allgather(send_tensor, recv_tensor, comm=None)¶
Gathers send tensors from all processes associated with the communicator into recv_tensor.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
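A hedged sketch (not from the original docs), assuming two ranks, an initialized Bagua process group, CUDA tensors, and bagua.get_rank() being available:
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import allgather
>>> rank = bagua.get_rank()                             # assumed helper
>>> send_tensor = (torch.ones(2) * (rank + 1)).cuda()   # rank 0: [1, 1], rank 1: [2, 2]
>>> recv_tensor = torch.zeros(4).cuda()                 # comm.nranks * send_tensor.size() elements
>>> allgather(send_tensor, recv_tensor)
>>> recv_tensor
tensor([1., 1., 2., 2.])   # identical on every rank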
- bagua.torch_api.communication.allgather_inplace(tensor, comm=None)¶
The in-place version of allgather.
- Parameters
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.communication.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes associated with the communicator in such a way that all get the final result. After the call, recv_tensor is going to be bitwise identical in all processes.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
Examples:
>>> from bagua.torch_api import allreduce
>>>
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>>
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat)
>>> send_tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
- bagua.torch_api.communication.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of allreduce.
- Parameters
op (ReduceOp) –
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.communication.alltoall(send_tensor, recv_tensor, comm=None)¶
Each process scatters send_tensor to all processes associated with the communicator and returns the gathered data in recv_tensor.
- Parameters
send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by comm.nranks.
recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
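A hedged sketch (not from the original docs), assuming two ranks, an initialized Bagua process group, CUDA tensors, bagua.get_rank() being available, and the usual rank-ordered chunking of all-to-all:
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import alltoall
>>> rank = bagua.get_rank()                                                 # assumed helper
>>> send_tensor = (torch.arange(4, dtype=torch.float32) + 10 * rank).cuda()
>>> recv_tensor = torch.zeros(4).cuda()
>>> send_tensor
tensor([0., 1., 2., 3.])       # Rank 0
tensor([10., 11., 12., 13.])   # Rank 1
>>> alltoall(send_tensor, recv_tensor)
>>> recv_tensor
tensor([0., 1., 10., 11.])     # Rank 0: chunk 0 from every rank
tensor([2., 3., 12., 13.])     # Rank 1: chunk 1 from every rank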
- bagua.torch_api.communication.alltoall_inplace(tensor, comm=None)¶
The in-place version of alltoall.
- Parameters
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.communication.broadcast(tensor, src=0, comm=None)¶
Broadcasts the tensor to all processes associated with the communicator. tensor must have the same number of elements in all processes participating in the collective.
- Parameters
tensor (torch.Tensor) – Data to be sent if src is the rank of the current process, and tensor to be used to save received data otherwise.
src (int, optional) – Source rank. Default: 0.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
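A hedged sketch (not from the original docs), assuming two ranks, an initialized Bagua process group, CUDA tensors, and bagua.get_rank() being available:
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import broadcast
>>> rank = bagua.get_rank()                                                        # assumed helper
>>> tensor = (torch.arange(4, dtype=torch.float32) if rank == 0 else torch.zeros(4)).cuda()
>>> broadcast(tensor, src=0)
>>> tensor
tensor([0., 1., 2., 3.])   # now identical on every rank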
- bagua.torch_api.communication.gather(send_tensor, recv_tensor, dst, comm=None)¶
Gathers send tensors from all processes associated with the communicator to recv_tensor in a single process.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.
dst (int) – Destination rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
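A hedged sketch (not from the original docs), assuming two ranks, an initialized Bagua process group, CUDA tensors, and bagua.get_rank() being available:
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import gather
>>> rank = bagua.get_rank()                             # assumed helper
>>> send_tensor = (torch.ones(2) * (rank + 1)).cuda()   # rank 0: [1, 1], rank 1: [2, 2]
>>> recv_tensor = torch.zeros(4).cuda()                 # comm.nranks * send_tensor.size() elements
>>> gather(send_tensor, recv_tensor, dst=0)
>>> recv_tensor
tensor([1., 1., 2., 2.])   # Rank 0 (dst)
tensor([0., 0., 0., 0.])   # Rank 1, left unchanged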
- bagua.torch_api.communication.gather_inplace(tensor, count, dst, comm=None)¶
The in-place version of gather.
- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the dst rank, it must have a size of comm.nranks * count elements; on non-dst ranks, its size must be equal to count.
count (int) – The per-rank data count to gather.
dst (int) – Destination rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.init_process_group()¶
Initializes the PyTorch builtin distributed process group. This also initializes the distributed package and should be executed before any other Bagua API is called.
Examples:
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())
>>> bagua.init_process_group()
>>>
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = model.with_bagua([optimizer], ...)
- bagua.torch_api.communication.recv(tensor, src, comm=None)¶
Receives a tensor synchronously.
- Parameters
tensor (torch.Tensor) – Tensor to fill with received data.
src (int) – Source rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used.
- bagua.torch_api.communication.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes. Only the process with rank dst is going to receive the final result.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.
dst (int) – Destination rank.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
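A hedged sketch (not from the original docs), assuming two ranks, an initialized Bagua process group, CUDA tensors, and bagua.get_rank() being available:
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import reduce, ReduceOp
>>> rank = bagua.get_rank()                                               # assumed helper
>>> send_tensor = (torch.arange(2, dtype=torch.float32) + 1 + 2 * rank).cuda()
>>> recv_tensor = torch.zeros(2).cuda()
>>> reduce(send_tensor, recv_tensor, dst=0, op=ReduceOp.SUM)
>>> recv_tensor
tensor([4., 6.])   # Rank 0 (dst) holds the element-wise sum
tensor([0., 0.])   # Rank 1, left unchanged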
- bagua.torch_api.communication.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)¶
The in-place version of reduce.
- Parameters
op (ReduceOp) –
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.communication.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces, then scatters send_tensor to all processes associated with the communicator.
- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.
recv_tensor (torch.Tensor) – Output of the collective.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
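A hedged sketch (not from the original docs), assuming two ranks, an initialized Bagua process group, CUDA tensors, bagua.get_rank() being available, and the usual rank-ordered chunking of reduce-scatter:
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import reduce_scatter, ReduceOp
>>> rank = bagua.get_rank()                                               # assumed helper
>>> send_tensor = (torch.tensor([1., 2., 3., 4.]) * (10 ** rank)).cuda()  # rank 0: x1, rank 1: x10
>>> recv_tensor = torch.zeros(2).cuda()
>>> reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM)
>>> recv_tensor
tensor([11., 22.])   # Rank 0: reduced first half
tensor([33., 44.])   # Rank 1: reduced second half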
- bagua.torch_api.communication.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of reduce_scatter.
- Parameters
tensor (torch.Tensor) – Input and output of the collective, the size must be divisible by comm.nranks.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.scatter(send_tensor, recv_tensor, src, comm=None)¶
Scatters send tensor to all processes associated with the communicator.
- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.
recv_tensor (torch.Tensor) – Output of the collective.
src (int) – Source rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
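A hedged sketch (not from the original docs), assuming two ranks, an initialized Bagua process group, CUDA tensors, and bagua.get_rank() being available:
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import scatter
>>> rank = bagua.get_rank()                                    # assumed helper
>>> send_tensor = torch.arange(4, dtype=torch.float32).cuda()  # only meaningful on src
>>> recv_tensor = torch.zeros(2).cuda()
>>> scatter(send_tensor, recv_tensor, src=0)
>>> recv_tensor
tensor([0., 1.])   # Rank 0
tensor([2., 3.])   # Rank 1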
- bagua.torch_api.communication.scatter_inplace(tensor, count, src, comm=None)¶
The in-place version of scatter.
- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the src rank, it must have a size of comm.nranks * count elements; on non-src ranks, its size must be equal to count.
count (int) – The per-rank data count to scatter.
src (int) – Source rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.send(tensor, dst, comm=None)¶
Sends a tensor to dst synchronously.
- Parameters
tensor (torch.Tensor) – Tensor to send.
dst (int) – Destination rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used.
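A hedged point-to-point sketch (not from the original docs) pairing send with recv above, assuming two ranks, an initialized Bagua process group, CUDA tensors, and bagua.get_rank() being available:
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import send, recv
>>> rank = bagua.get_rank()                            # assumed helper
>>> if rank == 0:
...     payload = torch.arange(4, dtype=torch.float32).cuda()
...     send(payload, dst=1)                           # blocks until the send completes
... else:
...     buf = torch.zeros(4).cuda()
...     recv(buf, src=0)                               # blocks until the tensor arrives
...     print(buf)
tensor([0., 1., 2., 3.])   # printed on rank 1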