bagua.torch_api.communication

Module Contents

bagua.torch_api.communication.allreduce(tensor, op=dist.ReduceOp.SUM, comm=None)

Reduces the tensor data across all machines in such a way that all get the final result. After the call, the tensor will be bitwise identical in all processes.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective. The function operates in-place.

  • op (optional) – One of the values from the torch.distributed.ReduceOp enum, specifying the operation used for element-wise reductions. Defaults to dist.ReduceOp.SUM.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The bagua communicator to work on. If None, the global bagua communicator will be used. Defaults to None.

Examples

>>> import torch
>>> from bagua.torch_api import allreduce
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 ranks; rank is the rank of the current process.
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(tensor)
>>> tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 ranks.
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(tensor)
>>> tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
bagua.torch_api.communication.broadcast(tensor, root=0, comm=None)

Broadcasts the tensor to the whole communicator.

tensor must have the same number of elements in all processes participating in the collective.

Parameters
  • tensor (torch.Tensor) – Data to be sent if root is the rank of current process, and tensor to be used to save received data otherwise.

  • root (int, optional) – Source rank. Defaults to 0.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The bagua communicator to work on. If None, the global bagua communicator will be used. Defaults to None.
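
Examples

A minimal sketch, assuming two ranks with rank holding the rank of the current process:

>>> import torch
>>> from bagua.torch_api.communication import broadcast
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> broadcast(tensor, root=0)
>>> tensor
tensor([1, 2]) # Rank 0
tensor([1, 2]) # Rank 1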

bagua.torch_api.communication.init_process_group()

Initializes the PyTorch builtin distributed process group, and this will also initialize the distributed package. This should be executed before any other bagua API.

Raises

RepeatedInitializationError – If this function is called more than once.

Examples
>>> import torch
>>> import bagua.torch_api as bagua
>>> bagua.init_process_group()
>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model, optimizer = bagua.bagua_init(model, optimizer)
bagua.torch_api.communication.reduce(tensor, dst, op=dist.ReduceOp.SUM, comm=None)

Reduces the tensor across all processes.

Only the process with rank dst is going to receive the final result.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective. The function operates in-place.

  • dst (int) – Destination rank.

  • op (optional) – One of the values from the torch.distributed.ReduceOp enum, specifying the operation used for element-wise reductions. Defaults to dist.ReduceOp.SUM.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The bagua communicator to work on. If None, the global bagua communicator will be used. Defaults to None.
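
Examples

A minimal sketch, assuming two ranks with rank holding the rank of the current process; only the destination rank's tensor is shown, since only it is guaranteed to hold the final result:

>>> import torch
>>> from bagua.torch_api.communication import reduce
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> reduce(tensor, dst=0)
>>> tensor
tensor([4, 6]) # Rank 0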