bagua.torch_api.communication
Module Contents
- bagua.torch_api.communication.allreduce(tensor, op=dist.ReduceOp.SUM, comm=None)
Reduces the tensor data across all machines in such a way that all of them get the final result. After the call, the tensor will be bitwise identical in all processes.
- Parameters
tensor (torch.Tensor) – Input and output of the collective. The function operates in-place.
op (optional) – One of the values from the torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The bagua communicator to work on. If None, the global bagua communicator will be used. Defaults to None.
Examples
>>> from bagua.torch_api import allreduce
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 ranks.
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(tensor)
>>> tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 ranks.
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(tensor)
>>> tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
- bagua.torch_api.communication.broadcast(tensor, root=0, comm=None)
Broadcasts the tensor to the whole communicator.
tensor must have the same number of elements in all processes participating in the collective.
- Parameters
tensor (torch.Tensor) – Data to be sent if root is the rank of current process, and tensor to be used to save received data otherwise.
root (int, optional) – Source rank. Defaults to 0.
comm (B.BaguaSingleCommunicatorPy, optional) – The bagua communicator to work on. If None, the global bagua communicator will be used. Defaults to None.
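Examples

A minimal usage sketch, assuming two ranks and that rank holds the current process rank, as in the allreduce examples above:

>>> from bagua.torch_api import broadcast
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 ranks.
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> broadcast(tensor, root=0)
>>> tensor
tensor([1, 2]) # Rank 0
tensor([1, 2]) # Rank 1

After the call, every rank holds the root's data; rank 1's original values are overwritten in place.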
- bagua.torch_api.communication.init_process_group()
Initializes the PyTorch builtin distributed process group, and this will also initialize the distributed package. This function should be executed before any other bagua API.
- Raises
RepeatedInitializationError – If this function is called more than once.
Examples
>>> import bagua.torch_api as bagua
>>> bagua.init_process_group()
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model, optimizer = bagua.bagua_init(model, optimizer)
- bagua.torch_api.communication.reduce(tensor, dst, op=dist.ReduceOp.SUM, comm=None)
Reduces the tensor across all processes.
Only the process with rank dst is going to receive the final result.
- Parameters
tensor (torch.Tensor) – Input and output of the collective. The function operates in-place.
dst (int) – Destination rank.
op (optional) – One of the values from the torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The bagua communicator to work on. If None, the global bagua communicator will be used. Defaults to None.
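Examples

A minimal usage sketch, assuming two ranks and that rank holds the current process rank, mirroring the allreduce examples above:

>>> from bagua.torch_api import reduce
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 ranks.
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> reduce(tensor, dst=0)
>>> tensor
tensor([4, 6]) # Rank 0
tensor([3, 4]) # Rank 1, unchanged: only dst is guaranteed to hold the final result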