bagua.torch_api

The Bagua communication library PyTorch interface.

Package Contents

bagua.torch_api.init_process_group()

Initializes the PyTorch built-in distributed process group, which also initializes the distributed package. Call this function before using any other Bagua API.

Raises

RepeatedInitializationError – If you run this function repeatedly

Examples

>>> import torch
>>> import bagua.torch_api as bagua
>>> bagua.init_process_group()
>>> D_in, H, D_out = 1000, 100, 10  # placeholder layer sizes
>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model, optimizer = bagua.bagua_init(model, optimizer)

bagua.torch_api.allreduce(tensor, average=True, comm=None)

Reduces the tensor data across all processes so that every process receives the final result. After the call, tensor is bitwise identical in all processes.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective. The function operates in-place.

  • average (bool, optional) – Whether to average the reduced tensor instead of leaving it as a sum. Defaults to True.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The bagua communicator to work on. If None, the global bagua communicator will be used. Defaults to None.

Examples

>>> import torch
>>> from bagua.torch_api import allreduce, get_rank
>>> rank = get_rank()
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 processes (ranks 0 and 1).
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> # Pass average=False to get the element-wise sum shown below.
>>> allreduce(tensor, average=False)
>>> tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 processes (ranks 0 and 1).
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(tensor, average=False)
>>> tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
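
The effect of the average flag can be illustrated without a cluster. The following is a minimal pure-Python sketch, not Bagua code: simulate_allreduce is a hypothetical helper that models each rank's tensor as a list, and it assumes that averaging means dividing the element-wise sum by the number of ranks.

```python
from typing import List


def simulate_allreduce(per_rank: List[List[float]], average: bool = True) -> List[List[float]]:
    """Return the values each simulated rank would hold after an allreduce.

    Hypothetical helper for illustration only; it does not call Bagua.
    """
    world_size = len(per_rank)
    # Element-wise sum across all simulated ranks.
    reduced = [sum(values) for values in zip(*per_rank)]
    if average:
        # Assumption: averaging divides the sum by the number of ranks.
        reduced = [value / world_size for value in reduced]
    # Every rank ends up with its own copy of the same result.
    return [list(reduced) for _ in range(world_size)]


summed = simulate_allreduce([[1, 2], [3, 4]], average=False)
averaged = simulate_allreduce([[1, 2], [3, 4]], average=True)
print(summed)
print(averaged)
```

With two ranks holding [1, 2] and [3, 4], the sum is [4, 6] on every rank, while the average is [2.0, 3.0].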
bagua.torch_api.broadcast(tensor, root=0, comm=None)

Broadcasts the tensor to the whole communicator.

tensor must have the same number of elements in all processes participating in the collective.

Parameters
  • tensor (torch.Tensor) – Data to be sent if root is the rank of current process, and tensor to be used to save received data otherwise.

  • root (int, optional) – Source rank. Defaults to 0.

  • comm (B.BaguaSingleCommunicatorPy, optional) – The bagua communicator to work on. If None, the global bagua communicator will be used. Defaults to None.
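
The broadcast semantics described above can be sketched in plain Python. This is an illustration only, not Bagua code: simulate_broadcast is a hypothetical helper that models each rank's tensor as a list, copies the root's data to every rank, and rejects inputs whose element counts differ.

```python
from typing import List


def simulate_broadcast(per_rank: List[List[int]], root: int = 0) -> List[List[int]]:
    """Return the values each simulated rank would hold after a broadcast.

    Hypothetical helper for illustration only; it does not call Bagua.
    """
    # The tensor must have the same number of elements on every rank.
    sizes = {len(values) for values in per_rank}
    if len(sizes) != 1:
        raise ValueError("tensor must have the same number of elements on every rank")
    source = per_rank[root]
    # The root keeps its data; every other rank's buffer is overwritten.
    return [list(source) for _ in per_rank]


from_rank0 = simulate_broadcast([[1, 2], [0, 0], [0, 0]], root=0)
from_rank1 = simulate_broadcast([[0, 0], [7, 8]], root=1)
print(from_rank0)
print(from_rank1)
```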

bagua.torch_api.bagua_init(module, optimizer, distributed_algorithm=DistributedAlgorithm.GradientAllReduce, broadcast_buffers=True, delay_reduce=False, hierarchical_reduce=None, message_size=10000000, **kwargs)

bagua_init is a module wrapper that enables easy multiprocess distributed data parallel training using different distributed algorithms.

Parameters
  • module (torch.nn.Module) – Network definition to be run in multi-gpu/distributed mode.

  • optimizer (torch.optim.Optimizer or list of torch.optim.Optimizer) – Optimizer(s) for the module. It can contain one or more PyTorch optimizers.

  • distributed_algorithm (DistributedAlgorithm) – Distributed algorithm used to average gradients or weights across all workers. Default: DistributedAlgorithm.GradientAllReduce.

  • broadcast_buffers (bool) – Flag that enables syncing (broadcasting) buffers of the module at the first iteration of the forward function. Default: True.

  • delay_reduce (bool) – Delay all communication to the end of the backward pass. This disables overlapping communication with computation. Default: False.

  • hierarchical_reduce (bool) – Enable hierarchical reduce. If left as None, it defaults to False for the GradientAllReduce algorithm and to True for all other algorithms.

  • message_size (int) – Minimum bytes in a communication bucket. Default: 10_000_000.

Returns

Distributed module.

Examples:

>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model, optimizer = bagua_init(
...    model,
...    optimizer,
...    broadcast_buffers=True
...    )
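
To give a feel for the message_size parameter, here is a small pure-Python sketch of one way tensors could be grouped into buckets that each hold at least message_size bytes. This reference does not describe Bagua's actual bucketing logic, so group_into_buckets and its greedy strategy are assumptions made purely for illustration.

```python
from typing import List


def group_into_buckets(tensor_bytes: List[int], message_size: int) -> List[List[int]]:
    """Greedily group tensor indices into buckets of at least message_size bytes.

    Hypothetical helper; Bagua's real bucketing strategy may differ.
    """
    buckets: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for index, nbytes in enumerate(tensor_bytes):
        current.append(index)
        current_bytes += nbytes
        # Close the bucket once it reaches the minimum size.
        if current_bytes >= message_size:
            buckets.append(current)
            current, current_bytes = [], 0
    if current:
        # The trailing bucket may be smaller than message_size.
        buckets.append(current)
    return buckets


buckets = group_into_buckets(
    [4_000_000, 4_000_000, 4_000_000, 1_000_000],
    message_size=10_000_000,
)
print(buckets)
```

In this sketch the first three tensors together reach the 10,000,000-byte minimum and form one bucket, and the last tensor is left in a smaller trailing bucket.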
bagua.torch_api.get_rank()

Get the rank of the current process in the process group.

Rank is a unique identifier assigned to each process within a distributed process group. Ranks are always consecutive integers ranging from 0 to world_size - 1.

Returns

The rank of the current process.

bagua.torch_api.get_world_size()

Get the number of processes in the current process group.

Returns

The world size of the process group.

bagua.torch_api.get_local_rank()

Get the local rank of the current process within its node.

Local rank is a unique identifier assigned to each process within a node. Local ranks are always consecutive integers ranging from 0 to local_size - 1.

Returns

The local rank of the current process.

bagua.torch_api.get_local_size()

Get the number of processes in the node.

Returns

The local size of the node.
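
The relationship between the four values returned by get_rank, get_world_size, get_local_rank and get_local_size can be sketched as follows. This is a pure-Python illustration, not Bagua code. It assumes that every node runs the same number of processes and that ranks are assigned contiguously node by node; neither assumption is stated in this reference, and rank_layout is a hypothetical helper.

```python
from typing import Dict, List


def rank_layout(num_nodes: int, local_size: int) -> List[Dict[str, int]]:
    """List the (node, local_rank, rank) triple of every simulated process.

    Assumes equal-sized nodes and contiguous rank assignment per node.
    """
    layout = []
    for node in range(num_nodes):
        for local_rank in range(local_size):
            layout.append(
                {
                    "node": node,
                    "local_rank": local_rank,  # ranges over 0 .. local_size - 1
                    "rank": node * local_size + local_rank,  # 0 .. world_size - 1
                }
            )
    return layout


layout = rank_layout(num_nodes=2, local_size=4)
world_size = len(layout)
for process in layout:
    print(process)
```

Under these assumptions, two nodes with four processes each give a world size of 8, and the process with local rank 1 on the second node has global rank 5.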

class bagua.torch_api.DistributedAlgorithm

Bases: enum.Enum

An enum-like class of available distributed algorithms: allreduce, sg-allreduce, quantize and decentralize.

The values of this class are lowercase strings, e.g., "allreduce". They can be accessed as attributes, e.g., DistributedAlgorithm.GradientAllReduce.

This class can be directly called to parse the string, e.g., DistributedAlgorithm(algor_str).

GradientAllReduce = "allreduce"
ScatterGatherAllReduce = "sg-allreduce"
Decentralize = "decentralize"
QuantizeAllReduce = "quantize"
static from_str(val)
Parameters

val (str) –
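
Parsing a string by calling the class is standard enum.Enum behavior. The sketch below uses a stand-in class, DistributedAlgorithmSketch, that mirrors the member names and string values listed above; it is not the Bagua class itself and it does not model from_str.

```python
import enum


class DistributedAlgorithmSketch(enum.Enum):
    """Stand-in mirroring the documented members; not the Bagua class."""

    GradientAllReduce = "allreduce"
    ScatterGatherAllReduce = "sg-allreduce"
    Decentralize = "decentralize"
    QuantizeAllReduce = "quantize"


# Calling the enum class with a value looks up the matching member.
parsed = DistributedAlgorithmSketch("sg-allreduce")
print(parsed.name, parsed.value)

# An unknown string raises ValueError, as with any enum.Enum.
try:
    DistributedAlgorithmSketch("unknown-algorithm")
    unknown_rejected = False
except ValueError:
    unknown_rejected = True
print(unknown_rejected)
```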