bagua.torch_api.communication

Module Contents

class bagua.torch_api.communication.ReduceOp

Bases: enum.IntEnum

An enum-like class for available reduction operations: SUM, PRODUCT, MIN, MAX, BAND, BOR, BXOR and AVG.

AVG = 10
BAND = 8
BOR = 7
BXOR = 9
MAX = 3
MIN = 2
PRODUCT = 1
SUM = 0
bagua.torch_api.communication.allgather(send_tensor, recv_tensor, comm=None)

Gathers send_tensor from all processes associated with the communicator into recv_tensor.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.
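
An illustrative sketch in the style of the allreduce example below, assuming a 2-rank job where init_process_group() has already been called; get_rank() and get_world_size() from bagua.torch_api are used here to size the buffers:

>>> import torch
>>> from bagua.torch_api import get_rank, get_world_size
>>> from bagua.torch_api.communication import allgather
>>>
>>> rank, nranks = get_rank(), get_world_size()
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2 * nranks, dtype=torch.int64)
>>> allgather(send_tensor, recv_tensor)
>>> recv_tensor
tensor([1, 2, 3, 4]) # Rank 0
tensor([1, 2, 3, 4]) # Rank 1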

bagua.torch_api.communication.allgather_inplace(tensor, comm=None)

The in-place version of allgather.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.

bagua.torch_api.communication.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes associated with the communicator in such a way that all get the final result. After the call, recv_tensor will be bitwise identical on all processes.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.

  • op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.

Examples:

>>> import torch
>>> from bagua.torch_api import allreduce, get_rank
>>>
>>> rank = get_rank()
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 ranks.
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1

>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat)
>>> send_tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
bagua.torch_api.communication.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of allreduce.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective.

  • op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.
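
An illustrative sketch of passing a non-default ReduceOp, assuming the same 2-rank setup as the allreduce example above:

>>> import torch
>>> from bagua.torch_api import get_rank
>>> from bagua.torch_api.communication import ReduceOp, allreduce_inplace
>>>
>>> rank = get_rank()
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce_inplace(tensor, op=ReduceOp.MAX)
>>> tensor
tensor([3, 4]) # Rank 0
tensor([3, 4]) # Rank 1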

bagua.torch_api.communication.alltoall(send_tensor, recv_tensor, comm=None)

Each process scatters send_tensor to all processes associated with the communicator and returns the gathered data in recv_tensor.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by comm.nranks.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.
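
An illustrative 2-rank sketch, assuming init_process_group() has been called and get_rank() from bagua.torch_api gives the current rank; each rank sends one chunk per peer and receives one chunk from each peer:

>>> import torch
>>> from bagua.torch_api import get_rank
>>> from bagua.torch_api.communication import alltoall
>>>
>>> rank = get_rank()
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> alltoall(send_tensor, recv_tensor)
>>> recv_tensor
tensor([1, 3]) # Rank 0
tensor([2, 4]) # Rank 1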

bagua.torch_api.communication.alltoall_inplace(tensor, comm=None)

The in-place version of alltoall.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective, the size must be divisible by comm.nranks.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.

bagua.torch_api.communication.broadcast(tensor, src=0, comm=None)

Broadcasts the tensor to all processes associated with the communicator.

tensor must have the same number of elements in all processes participating in the collective.

Parameters
  • tensor (torch.Tensor) – Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise.

  • src (int, optional) – Source rank. Default: 0.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
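
An illustrative 2-rank sketch, under the same assumptions as the examples above; after the call every rank holds rank 0's data:

>>> import torch
>>> from bagua.torch_api import get_rank
>>> from bagua.torch_api.communication import broadcast
>>>
>>> rank = get_rank()
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> broadcast(tensor, src=0)
>>> tensor
tensor([1, 2]) # Rank 0
tensor([1, 2]) # Rank 1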

bagua.torch_api.communication.gather(send_tensor, recv_tensor, dst, comm=None)

Gathers send_tensor from all processes associated with the communicator into recv_tensor on a single process.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • dst (int) – Destination rank.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.
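
An illustrative 2-rank sketch, under the same assumptions as the examples above; recv_tensor is only filled on the dst rank (shown unchanged on the other rank):

>>> import torch
>>> from bagua.torch_api import get_rank, get_world_size
>>> from bagua.torch_api.communication import gather
>>>
>>> rank, nranks = get_rank(), get_world_size()
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2 * nranks, dtype=torch.int64)
>>> gather(send_tensor, recv_tensor, dst=0)
>>> recv_tensor
tensor([1, 2, 3, 4]) # Rank 0
tensor([0, 0, 0, 0]) # Rank 1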

bagua.torch_api.communication.gather_inplace(tensor, count, dst, comm=None)

The in-place version of gather.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective. On the dst rank, it must have a size of comm.nranks * count elements. On non-dst ranks, its size must be equal to count.

  • count (int) – The per-rank data count to gather.

  • dst (int) – Destination rank.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.

bagua.torch_api.communication.init_process_group()

Initializes the PyTorch builtin distributed process group, which also initializes the distributed package. It should be executed before calling any other Bagua API.

Examples:
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())
>>> bagua.init_process_group()
>>>
>>> D_in, H, D_out = 64, 128, 10  # example layer sizes (placeholder values)
>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model = model.with_bagua([optimizer], ...)
bagua.torch_api.communication.recv(tensor, src, comm=None)

Receives a tensor synchronously.

Parameters
  • tensor (torch.Tensor) – Tensor to fill with received data.

  • src (int) – Source rank.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used.

bagua.torch_api.communication.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes.

Only the process with rank dst is going to receive the final result.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.

  • dst (int) – Destination rank.

  • op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.
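
An illustrative 2-rank sketch, under the same assumptions as the allreduce example above; only the dst rank receives the reduced result (recv_tensor is shown unchanged on the other rank):

>>> import torch
>>> from bagua.torch_api import get_rank
>>> from bagua.torch_api.communication import reduce
>>>
>>> rank = get_rank()
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> reduce(send_tensor, recv_tensor, dst=0)
>>> recv_tensor
tensor([4, 6]) # Rank 0
tensor([0, 0]) # Rank 1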

bagua.torch_api.communication.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)

The in-place version of reduce.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective.

  • dst (int) – Destination rank.

  • op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.

bagua.torch_api.communication.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces, then scatters send_tensor to all processes associated with the communicator.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.
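
An illustrative 2-rank sketch, under the same assumptions as the examples above; the element-wise sum is computed first, then each rank keeps its own chunk of the result:

>>> import torch
>>> from bagua.torch_api import get_rank
>>> from bagua.torch_api.communication import reduce_scatter
>>>
>>> rank = get_rank()
>>> send_tensor = torch.arange(4, dtype=torch.int64) + 1 + 4 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2, 3, 4]) # Rank 0
tensor([5, 6, 7, 8]) # Rank 1
>>> reduce_scatter(send_tensor, recv_tensor)
>>> recv_tensor
tensor([6, 8])   # Rank 0
tensor([10, 12]) # Rank 1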

bagua.torch_api.communication.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of reduce_scatter.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective, the size must be divisible by comm.nranks.

  • op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.

bagua.torch_api.communication.scatter(send_tensor, recv_tensor, src, comm=None)

Scatters send_tensor to all processes associated with the communicator.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • src (int) – Source rank.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.
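
An illustrative 2-rank sketch, under the same assumptions as the examples above; every rank builds the same send_tensor here for simplicity, and each rank receives its own chunk:

>>> import torch
>>> from bagua.torch_api import get_rank
>>> from bagua.torch_api.communication import scatter
>>>
>>> rank = get_rank()
>>> send_tensor = torch.arange(4, dtype=torch.int64) + 1  # same on every rank for simplicity
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> scatter(send_tensor, recv_tensor, src=0)
>>> recv_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1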

bagua.torch_api.communication.scatter_inplace(tensor, count, src, comm=None)

The in-place version of scatter.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective. On the src rank, it must have a size of comm.nranks * count elements. On non-src ranks, its size must be equal to count.

  • count (int) – The per-rank data count to scatter.

  • src (int) – Source rank.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None the global Bagua communicator will be used. Default: None.

bagua.torch_api.communication.send(tensor, dst, comm=None)

Sends a tensor to dst synchronously.

Parameters
  • tensor (torch.Tensor) – Tensor to send.

  • dst (int) – Destination rank.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used.
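
To round out the point-to-point API, an illustrative 2-rank sketch pairing send with recv, under the same assumptions as the examples above:

>>> import torch
>>> from bagua.torch_api import get_rank
>>> from bagua.torch_api.communication import recv, send
>>>
>>> rank = get_rank()
>>> tensor = torch.zeros(2, dtype=torch.int64)
>>> if rank == 0:
...     tensor += 42
...     send(tensor, dst=1)
... else:
...     recv(tensor, src=0)
>>> tensor
tensor([42, 42]) # Rank 0
tensor([42, 42]) # Rank 1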