Bagua

This website contains Bagua API documentation. See the tutorials if you need step-by-step instructions on how to use Bagua.

bagua

Bagua is a communication library developed by Kuaishou Technology and DS3 Lab for deep learning.

See tutorials for Bagua’s rationale and benchmark.

Subpackages

bagua.torch_api

The Bagua communication library PyTorch interface.

Subpackages
bagua.torch_api.algorithms
Submodules
bagua.torch_api.algorithms.async_model_average
Module Contents
class bagua.torch_api.algorithms.async_model_average.AsyncModelAverageAlgorithm(peer_selection_mode='all', sync_interval_ms=500, warmup_steps=0)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the AsyncModelAverage algorithm.

The asynchronous implementation is experimental and imposes some restrictions. With such an asynchronous algorithm, the number of iterations on each worker can differ. Therefore, the current implementation assumes that the dataset is an endless stream, and all workers continuously synchronize with each other.

Users should call abort to manually stop the algorithm’s continuous synchronization process.

Parameters
  • peer_selection_mode (str) – The way workers communicate with each other. Currently "all" is supported. "all" means all workers’ weights are synchronized during each communication.

  • sync_interval_ms (int) – Number of milliseconds between model synchronizations.

  • warmup_steps (int) – Number of steps to warm up by doing gradient allreduce before doing asynchronous model averaging. Use 0 to disable.

abort(self, bagua_module)

Stop background asynchronous communications. Should be called after training.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

resume(self, bagua_module)

Resume aborted background asynchronous communications (see abort). Should be called before training.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.
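
Example::

A minimal usage sketch; model, optimizer and train_loader are assumed to be defined elsewhere, and the loss computation is only a placeholder.

>>> from bagua.torch_api.algorithms.async_model_average import AsyncModelAverageAlgorithm
>>>
>>> algorithm = AsyncModelAverageAlgorithm(sync_interval_ms=500)
>>> model = model.with_bagua([optimizer], algorithm)
>>> for batch in train_loader:
...     loss = model(batch).sum()
...     loss.backward()
...     optimizer.step()
...     optimizer.zero_grad()
>>> algorithm.abort(model)  # stop background synchronization after training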

bagua.torch_api.algorithms.base
Module Contents
class bagua.torch_api.algorithms.base.Algorithm

This is the base class that all Bagua algorithms inherit.

It provides methods that can be overridden to implement different kinds of distributed algorithms.

init_backward_hook(self, bagua_module)

Given a BaguaModule, return a hook function that will be executed on every parameter’s gradient computation completion.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A function that takes the name of a parameter (as in torch.nn.Module.named_parameters) and the parameter itself.

init_forward_pre_hook(self, bagua_module)

Given a BaguaModule, return a hook function that will be executed before the forward process.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A function that takes the model’s input.

init_operations(self, bagua_module, bucket)

Given a BaguaModule, and a BaguaBucket, register operations to be executed on the bucket.

Parameters
  • bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

  • bucket (bagua.torch_api.bucket.BaguaBucket) – The bucket to register operations on.

init_post_backward_hook(self, bagua_module)

Given a BaguaModule, return a hook function that will be executed when the backward pass is done.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A function that takes no argument.

init_post_optimizer_step_hook(self, bagua_module)

Given a BaguaModule, return a hook function that will be executed when the optimizer.step() is done.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A function that gets called after an optimizer’s step() method is called. The function takes the optimizer as its argument.

init_tensors(self, bagua_module)

Given a BaguaModule, return Bagua tensors to be used in Bagua for later operations.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A list of Bagua tensors for communication.

Return type

List[bagua.torch_api.tensor.BaguaTensor]

need_reset(self)
Returns

True if all initialization methods of the current algorithm should be called again. This is useful for algorithms that have multiple stages, where each stage needs different initializations.

Return type

bool

tensors_to_buckets(self, tensors)

Given the bucketing suggestion from Bagua, return the actual Bagua buckets. The default implementation follows the suggestion to do the bucketing.

Parameters

tensors (List[List[bagua.torch_api.tensor.BaguaTensor]]) – Bagua tensors grouped in different lists, representing Bagua’s suggestion on how to bucket the tensors.

Returns

A list of Bagua buckets.

Return type

List[bagua.torch_api.bucket.BaguaBucket]
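
To illustrate how the hooks fit together, below is a hypothetical subclass (not part of Bagua) that only overrides init_post_backward_hook to log the end of every backward pass; all other behavior is inherited from Algorithm.

>>> from bagua.torch_api.algorithms import Algorithm
>>>
>>> class LoggingAlgorithm(Algorithm):  # hypothetical example subclass
...     def init_post_backward_hook(self, bagua_module):
...         def hook():  # the returned hook takes no argument
...             print("backward pass finished")
...         return hook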

bagua.torch_api.algorithms.bytegrad
Module Contents
class bagua.torch_api.algorithms.bytegrad.ByteGradAlgorithm(average=True)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the ByteGrad algorithm.

Parameters

average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
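
A usage sketch, following the with_bagua pattern used elsewhere in these docs (model and optimizer are assumed to exist):

>>> from bagua.torch_api.algorithms.bytegrad import ByteGradAlgorithm
>>> model = model.with_bagua([optimizer], ByteGradAlgorithm(average=True))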

bagua.torch_api.algorithms.decentralized
Module Contents
class bagua.torch_api.algorithms.decentralized.DecentralizedAlgorithm(hierarchical=True, peer_selection_mode='all', communication_interval=1)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the Decentralized SGD algorithm.

Parameters
  • hierarchical (bool) – Enable hierarchical communication.

  • peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers’ weights are averaged in each communication step. "shift_one" means each worker selects a different peer to do weights average in each communication step.

  • communication_interval (int) – Number of iterations between two communication steps.

tensors_to_buckets(self, tensors)
Parameters

tensors (List[List[bagua.torch_api.tensor.BaguaTensor]]) –

Return type

List[bagua.torch_api.bucket.BaguaBucket]
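
A usage sketch (model and optimizer are assumed to exist):

>>> from bagua.torch_api.algorithms.decentralized import DecentralizedAlgorithm
>>> algorithm = DecentralizedAlgorithm(hierarchical=True, peer_selection_mode="shift_one", communication_interval=1)
>>> model = model.with_bagua([optimizer], algorithm)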

class bagua.torch_api.algorithms.decentralized.LowPrecisionDecentralizedAlgorithm(hierarchical=True, communication_interval=1)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the Low Precision Decentralized SGD algorithm.

Parameters
  • hierarchical (bool) – Enable hierarchical communication.

  • communication_interval (int) – Number of iterations between two communication steps.

bagua.torch_api.algorithms.gradient_allreduce
Module Contents
class bagua.torch_api.algorithms.gradient_allreduce.GradientAllReduceAlgorithm(hierarchical=False, average=True)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the GradientAllReduce algorithm.

Parameters
  • hierarchical (bool) – Enable hierarchical communication.

  • average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
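
A usage sketch (model and optimizer are assumed to exist):

>>> from bagua.torch_api.algorithms.gradient_allreduce import GradientAllReduceAlgorithm
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm(average=True))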

bagua.torch_api.algorithms.q_adam
Module Contents
class bagua.torch_api.algorithms.q_adam.QAdamAlgorithm(q_adam_optimizer, hierarchical=True)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the QAdam algorithm.

Parameters
  • q_adam_optimizer (QAdamOptimizer) – A QAdamOptimizer initialized with model parameters.

  • hierarchical (bool) – Enable hierarchical communication.

class bagua.torch_api.algorithms.q_adam.QAdamOptimizer(params, lr=0.001, warmup_steps=100, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.0)

Bases: torch.optim.optimizer.Optimizer

Create a dedicated optimizer used for QAdam algorithm.

Parameters
  • params (iterable) – Iterable of parameters to optimize or dicts defining parameter groups.

  • lr (float) – Learning rate.

  • warmup_steps (int) – Number of steps to warm up by doing gradient allreduce before switching to the compressed communication used by QAdam. Use 0 to disable.

  • betas (Tuple[float, float]) – Coefficients used for computing running averages of gradient and its square.

  • eps (float) – Term added to the denominator to improve numerical stability.

  • weight_decay (float) – Weight decay (L2 penalty).

step(self, closure=None)

Performs a single optimization step (parameter update).
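
A usage sketch combining QAdamOptimizer and QAdamAlgorithm (model is assumed to exist):

>>> from bagua.torch_api.algorithms.q_adam import QAdamAlgorithm, QAdamOptimizer
>>>
>>> optimizer = QAdamOptimizer(model.parameters(), lr=0.001, warmup_steps=100)
>>> algorithm = QAdamAlgorithm(optimizer)
>>> model = model.with_bagua([optimizer], algorithm)
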
Package Contents
class bagua.torch_api.algorithms.Algorithm

This is the base class that all Bagua algorithms inherit.

It provides methods that can be overridden to implement different kinds of distributed algorithms.

init_backward_hook(self, bagua_module)

Given a BaguaModule, return a hook function that will be executed on every parameter’s gradient computation completion.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A function that takes the name of a parameter (as in torch.nn.Module.named_parameters) and the parameter itself.

init_forward_pre_hook(self, bagua_module)

Given a BaguaModule, return a hook function that will be executed before the forward process.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A function that takes the model’s input.

init_operations(self, bagua_module, bucket)

Given a BaguaModule, and a BaguaBucket, register operations to be executed on the bucket.

Parameters
  • bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

  • bucket (bagua.torch_api.bucket.BaguaBucket) – The bucket to register operations on.

init_post_backward_hook(self, bagua_module)

Given a BaguaModule, return a hook function that will be executed when the backward pass is done.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A function that takes no argument.

init_post_optimizer_step_hook(self, bagua_module)

Given a BaguaModule, return a hook function that will be executed when the optimizer.step() is done.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A function that gets called after an optimizer’s step() method is called. The function takes the optimizer as its argument.

init_tensors(self, bagua_module)

Given a BaguaModule, return Bagua tensors to be used in Bagua for later operations.

Parameters

bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by with_bagua method.

Returns

A list of Bagua tensors for communication.

Return type

List[bagua.torch_api.tensor.BaguaTensor]

need_reset(self)
Returns

True if all initialization methods of the current algorithm should be called again. This is useful for algorithms that have multiple stages, where each stage needs different initializations.

Return type

bool

tensors_to_buckets(self, tensors)

Given the bucketing suggestion from Bagua, return the actual Bagua buckets. The default implementation follows the suggestion to do the bucketing.

Parameters

tensors (List[List[bagua.torch_api.tensor.BaguaTensor]]) – Bagua tensors grouped in different lists, representing Bagua’s suggestion on how to bucket the tensors.

Returns

A list of Bagua buckets.

Return type

List[bagua.torch_api.bucket.BaguaBucket]

bagua.torch_api.checkpoint
Submodules
bagua.torch_api.checkpoint.checkpointing
Module Contents
bagua.torch_api.checkpoint.checkpointing.load_checkpoint(checkpoints_path, model, optimizer=None, lr_scheduler=None, strict=True)

Load a model checkpoint and return the iteration.

Parameters
  • checkpoints_path (str) – Path of checkpoints.

  • model (BaguaModule) – The model to load on.

  • optimizer (torch.optim.Optimizer, optional) – The optimizer to load on. Default: None.

  • lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to load on. Default: None.

  • strict (bool, optional) – whether to strictly enforce that the keys in state_dict of the checkpoint match the keys returned by this module’s state_dict() function. Default: True.

bagua.torch_api.checkpoint.checkpointing.save_checkpoint(iteration, checkpoints_path, model, optimizer=None, lr_scheduler=None)

Save model checkpoint.

Parameters
  • iteration (int) – Training Iteration.

  • checkpoints_path (str) – Path of checkpoints.

  • model (BaguaModule) – The model to save.

  • optimizer (torch.optim.Optimizer, optional) – The optimizer to save. Default: None.

  • lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to save. Default: None.
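
A usage sketch (model, optimizer and lr_scheduler are assumed to exist; the checkpoint path is a placeholder):

>>> from bagua.torch_api.checkpoint import load_checkpoint, save_checkpoint
>>>
>>> save_checkpoint(1000, "/tmp/checkpoints", model, optimizer, lr_scheduler)
>>> iteration = load_checkpoint("/tmp/checkpoints", model, optimizer, lr_scheduler)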

Package Contents
bagua.torch_api.checkpoint.load_checkpoint(checkpoints_path, model, optimizer=None, lr_scheduler=None, strict=True)

Load a model checkpoint and return the iteration.

Parameters
  • checkpoints_path (str) – Path of checkpoints.

  • model (BaguaModule) – The model to load on.

  • optimizer (torch.optim.Optimizer, optional) – The optimizer to load on. Default: None.

  • lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to load on. Default: None.

  • strict (bool, optional) – whether to strictly enforce that the keys in state_dict of the checkpoint match the keys returned by this module’s state_dict() function. Default: True.

bagua.torch_api.checkpoint.save_checkpoint(iteration, checkpoints_path, model, optimizer=None, lr_scheduler=None)

Save model checkpoint.

Parameters
  • iteration (int) – Training Iteration.

  • checkpoints_path (str) – Path of checkpoints.

  • model (BaguaModule) – The model to save.

  • optimizer (torch.optim.Optimizer, optional) – The optimizer to save. Default: None.

  • lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to save. Default: None.

bagua.torch_api.contrib
Subpackages
bagua.torch_api.contrib.utils
Submodules
bagua.torch_api.contrib.utils.redis_store
Module Contents
class bagua.torch_api.contrib.utils.redis_store.RedisStore(hosts=None, cluster_mode=True, capacity_per_node=107374182400)

Bases: bagua.torch_api.contrib.utils.store.ClusterStore

A Redis-based distributed key-value store implementation, with set and get API exposed.

Parameters
  • hosts (List[Dict[str, str]]) – A list of redis servers, defined by a list of dict containing Redis host and port information like [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]. A new Redis instance will be spawned on each node if hosts=None.

  • cluster_mode (bool) – If True, data is sharded across all Redis instances. Otherwise, if there are \(m\) Redis instances, the workers on the \(n\)-th node will use the \(n % m\)-th Redis instance.

  • capacity_per_node (int) – Maximum memory limit in bytes when spawning new Redis instances. Old values will be evicted when the limit is reached. Default is 100GB.

Note

All Bagua jobs within the same node will share the same local Redis instance if hosts=None. The capacity_per_node only affects newly spawned Redis instances, and has no effect on existing ones.
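
A usage sketch: spawn a local Redis instance on each node and use the set / get API inherited from the Store base class:

>>> from bagua.torch_api.contrib.utils.redis_store import RedisStore
>>>
>>> store = RedisStore(hosts=None, cluster_mode=True)
>>> store.set("foo", "bar")
>>> value = store.get("foo")  # b"bar" or "bar", depending on the backend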

bagua.torch_api.contrib.utils.store
Module Contents
class bagua.torch_api.contrib.utils.store.ClusterStore(stores)

Bases: Store

Base class for distributed key-value stores.

In a cluster store, entries are sharded equally among multiple store instances based on their keys.

Parameters

stores (List[Store]) – A list of stores to shard entries on.

class bagua.torch_api.contrib.utils.store.Store

Base class for key-value store implementations. Entries are added to store with set or mset, and retrieved with get or mget.

clear(self)

Delete all keys in the current store.

get(self, key)

Returns the value associated with key, or None if the key doesn’t exist.

Parameters

key (str) –

Return type

Optional[Union[str, bytes]]

mget(self, keys)

Retrieve each key’s corresponding value and return them in a list with the same order as keys.

Parameters

keys (List[str]) –

Return type

List[Optional[Union[str, bytes]]]

mset(self, dictionary)

Set multiple entries at once with a dictionary. Each key-value pair in the dictionary will be set.

Parameters

dictionary (Dict[str, Union[str, bytes]]) –

num_keys(self)

Returns the number of keys in the current store.

Return type

int

set(self, key, value)

Set a key-value pair.

Parameters
  • key (str) –

  • value (Union[str, bytes]) –

shutdown(self)

Shut down the managed store instances. Unmanaged instances will not be killed.

status(self)

Returns True if the current store is alive.

Return type

bool
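
A short sketch of the batch API (store is assumed to be any Store implementation, for example the RedisStore above):

>>> store.mset({"a": "1", "b": "2", "c": "3"})
>>> values = store.mget(["a", "b", "c"])  # values are returned in the same order as the keys
>>> n = store.num_keys()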

Submodules
bagua.torch_api.contrib.cache_loader
Module Contents
class bagua.torch_api.contrib.cache_loader.CacheLoader(backend='redis', dataset_name='', writer_buffer_size=1, **kwargs)

Cache loader caches values calculated by an expensive function under their keys via get, so that the values can be retrieved faster next time.

Internally, values are indexed by "{dataset_name}_{key}" and saved in a distributed key-value store, where dataset_name is specified on initializing, and key is the argument in get.

By default, cache loader uses RedisStore as its backend distributed key-value store implementation. It supports using a list of existing redis servers or spawning new redis servers. Parameters for RedisStore can be provided here in **kwargs.

Parameters
  • backend (str) – Backend distributed key-value store implementation. Can be "redis".

  • dataset_name (str) – Name of the dataset. Default "".

  • writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.

Example::

To use a list of existing redis servers for the “redis” backend:

>>> from bagua.torch_api.contrib import CacheLoader
>>>
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> loader = CacheLoader(backend="redis", hosts=hosts, cluster_mode=True, dataset_name="test")
>>>
>>> loader.get(index, lambda x: items[x])

To spawn new redis servers for the “redis” backend:

>>> loader = CacheLoader(backend="redis", hosts=None, cluster_mode=True, capacity_per_node=100000000)

Note

Cache loaders with the same dataset_name will reuse and overwrite each other’s cache. Use a different dataset_name if this is not desired.

get(self, key, load_fn)

Returns the value associated with key in the cache, using load_fn to create the entry if the key does not exist in the cache. load_fn is a function that takes key as its argument and returns the corresponding value to be cached.

Parameters
  • key (str) –

  • load_fn (Callable[[str], None]) –

num_keys(self)

Returns the number of keys in the cache.

bagua.torch_api.contrib.cached_dataset
Module Contents
class bagua.torch_api.contrib.cached_dataset.CachedDataset(dataset, backend='redis', dataset_name='', writer_buffer_size=20, **kwargs)

Bases: torch.utils.data.dataset.Dataset

Cached dataset wraps a PyTorch dataset to cache its samples in memory, so that accessing these samples after the first time can be much faster. This is useful when samples need tedious preprocessing to produce, or reading the dataset itself is slow, which could slow down the whole training process.

Internally, the samples are indexed by a string key "{dataset_name}_{index}" and saved in a distributed key-value store, where dataset_name is specified when initializing the cached dataset, and index is the index of a specific sample (the argument of __getitem__ method in a PyTorch dataset).

Parameters
  • dataset (torch.utils.data.dataset.Dataset) – PyTorch dataset to be wrapped.

  • backend (str) – Backend distributed key-value store implementation. Can be "redis".

  • dataset_name (str) – Name of the dataset. Default "".

  • writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.

Example:

>>> from bagua.torch_api.contrib import CachedDataset
>>> cached_dataset = CachedDataset(dataset, backend="redis", dataset_name="ds")
>>> dataloader = torch.utils.data.DataLoader(cached_dataset)

Note

Cached dataset is a special case of cache loader. The parameters backend and writer_buffer_size used when initializing a cached dataset have the same meanings as those used when initializing a cache loader. You can provide additional cache loader arguments here in **kwargs. See also CacheLoader.

cache_loader

The backend cache loader instance.

bagua.torch_api.contrib.fused_optimizer
Module Contents
class bagua.torch_api.contrib.fused_optimizer.FusedOptimizer(optimizer, do_flatten=False)

Bases: torch.optim.Optimizer

Convert any optimizer into a fused optimizer.

This fused optimizer fuses multiple module parameter update kernel launches into one or a few, by flattening parameter tensors into one or more contiguous buckets.

It can be used in conjunction with the with_bagua method, in which case Bagua will do the fusion automatically. Otherwise, you need to explicitly set do_flatten=True.

Parameters
  • optimizer (torch.optim.Optimizer) – Any PyTorch optimizer.

  • do_flatten (bool) – Whether to flatten the parameters. Default: False.

Returns

Fused optimizer.

Example::

To use in conjunction with with_bagua method:

>>> optimizer = torch.optim.Adadelta(model.parameters(), ....)
>>> optimizer = bagua.torch_api.contrib.FusedOptimizer(optimizer)
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm())

To use alone or with torch.nn.parallel.DistributedDataParallel, set do_flatten=True:

>>> optimizer = torch.optim.Adadelta(model.parameters(), ....)
>>> optimizer = bagua.torch_api.contrib.FusedOptimizer(optimizer, do_flatten=True)
step(self, closure=None)

Performs a single optimization step (parameter update).

Parameters

closure (Callable) – A closure that reevaluates the model and returns the loss. Optional for most optimizers.

Note

Unless otherwise specified, this function should not modify the .grad field of the parameters.

bagua.torch_api.contrib.load_balancing_data_loader
Module Contents
class bagua.torch_api.contrib.load_balancing_data_loader.LoadBalancingDistributedBatchSampler(sampler, batch_fn, drop_last=False)

Bases: torch.utils.data.sampler.Sampler

Wraps another load balance sampler to yield variable sized mini-batches.

Parameters
  • sampler (LoadBalancingDistributedSampler) – Load balance sampler.

  • batch_fn (Callable) – Callable to yield mini-batch indices.

  • drop_last (bool) – If True, the sampler will drop the last few batches that exceed the least number of batches among replicas; otherwise, the number of batches on each replica will be padded to be the same.

batch_fn will have the signature of:

def batch_fn(indices: List[int]) -> List[List[int]]
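
For illustration, a minimal batch_fn might simply chunk the index list into fixed-size batches (a sketch; the batch size of 32 is an arbitrary choice):

>>> def batch_fn(indices):
...     batch_size = 32
...     return [indices[i:i + batch_size] for i in range(0, len(indices), batch_size)]
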
Example::
>>> from bagua.torch_api.contrib import LoadBalancingDistributedSampler, \
...     LoadBalancingDistributedBatchSampler
>>>
>>> sampler = LoadBalancingDistributedSampler(dataset, complexity_fn=complexity_fn)
>>> batch_sampler = LoadBalancingDistributedBatchSampler(sampler, batch_fn=batch_fn)
>>> loader = torch.utils.data.DataLoader(dataset, batch_sampler=batch_sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     batch_sampler.set_epoch(epoch)
...     train(loader)
set_epoch(self, epoch)

Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.

Parameters

epoch (int) – Epoch number.

Return type

None

class bagua.torch_api.contrib.load_balancing_data_loader.LoadBalancingDistributedSampler(dataset, complexity_fn, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False, random_level=0)

Bases: torch.utils.data.sampler.Sampler

Sampler that restricts data loading to a subset of the dataset.

This sampler uses a complexity_fn to calculate each sample’s computational complexity, so that each batch gets a similar total computational complexity.

This is useful in scenarios like speech and NLP, where each batch has variable length and distributed training suffers from the straggler problem.

The usage is similar to torch.utils.data.DistributedSampler, where each process loads a subset of the original dataset that is exclusive to it.

Note

Dataset is assumed to be of constant size.

Parameters
  • dataset (torch.utils.data.dataset.Dataset) – Dataset used for sampling.

  • complexity_fn (Callable) – A function whose input is a sample and output is an integer as a measure of the computational complexity of the sample.

  • num_replicas (int, optional) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.

  • rank (int, optional) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.

  • shuffle (bool, optional) – If True (default), sampler will shuffle the indices.

  • seed (int, optional) – random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group. Default: 0.

  • drop_last (bool, optional) – if True, then the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas. Default: False.

  • random_level (float, optional) – A float between 0 and 1 that controls the extent of load balance. 0 means the best load balance, while 1 means the opposite.

Warning

In distributed mode, calling the set_epoch method at the beginning of each epoch before creating the DataLoader iterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.

Example::

Define your complexity_fn, which accepts a dataset sample as its input and produces an integer as the sample’s computational complexity:

>>> dataset = torch.utils.data.TensorDataset(torch.randn(n, 2), torch.randperm(n))
>>> complexity_fn = lambda x: x[1]

Below is the usage of LoadBalancingDistributedSampler and DataLoader:

>>> sampler = bagua.torch_api.contrib.LoadBalancingDistributedSampler(
...     dataset,
...     complexity_fn=complexity_fn) if is_distributed else None
>>> loader = torch.utils.data.DataLoader(dataset,
...     shuffle=(sampler is None),
...     sampler=sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     if is_distributed:
...         sampler.set_epoch(epoch)
...     train(loader)
set_epoch(self, epoch)

Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.

Parameters

epoch (int) – Epoch number.

Return type

None

bagua.torch_api.contrib.sync_batchnorm
Module Contents
bagua.torch_api.contrib.sync_batchnorm.unused
class bagua.torch_api.contrib.sync_batchnorm.SyncBatchNorm(num_features, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)

Bases: torch.nn.modules.batchnorm._BatchNorm

Applies synchronous BatchNorm for distributed modules with N-dimensional BatchNorm layer(s). See BatchNorm for more details.

Parameters
  • num_features – Number of channels \(C\) from the shape \((N, C, ...)\).

  • eps – A value added to the denominator for numerical stability. Default: 1e-5.

  • momentum – The value used for the running_mean and running_var computation. Can be set to None for cumulative moving average (i.e. simple average). Default: 0.1.

  • affine – A boolean value that when set to True, this module has learnable affine parameters. Default: True.

  • track_running_stats – A boolean value that when set to True, this module tracks the running mean and variance, and when set to False, this module does not track such statistics and always uses batch statistics in both training and eval modes. Default: True.

Note

Only GPU input tensors are supported in the training mode.

classmethod convert_sync_batchnorm(cls, module)

Helper function to convert all BatchNorm*D layers in the model to SyncBatchNorm layers.

Parameters

module (nn.Module) – Module containing one or more BatchNorm*D layers

Returns

The original module with the converted SyncBatchNorm layers. If the original module is a BatchNorm*D layer, a new SyncBatchNorm layer object will be returned instead.

Note

This function must be called before with_bagua method.

Example::
>>> # Network with nn.BatchNorm layer
>>> model = torch.nn.Sequential(
...      torch.nn.Linear(D_in, H),
...      torch.nn.ReLU(),
...      torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...      model.parameters(),
...      lr=0.01,
...      momentum=0.9
...    )
>>> sync_bn_model = bagua.torch_api.contrib.sync_batchnorm.SyncBatchNorm.convert_sync_batchnorm(model)
>>> bagua_model = sync_bn_model.with_bagua([optimizer], GradientAllReduceAlgorithm())
forward(self, input)
Package Contents
class bagua.torch_api.contrib.CacheLoader(backend='redis', dataset_name='', writer_buffer_size=1, **kwargs)

Cache loader caches values calculated by an expensive function under their keys via get, so that the values can be retrieved faster next time.

Internally, values are indexed by "{dataset_name}_{key}" and saved in a distributed key-value store, where dataset_name is specified on initializing, and key is the argument in get.

By default, cache loader uses RedisStore as its backend distributed key-value store implementation. It supports using a list of existing redis servers or spawning new redis servers. Parameters for RedisStore can be provided here in **kwargs.

Parameters
  • backend (str) – Backend distributed key-value store implementation. Can be "redis".

  • dataset_name (str) – Name of the dataset. Default "".

  • writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.

Example::

To use a list of existing redis servers for the “redis” backend:

>>> from bagua.torch_api.contrib import CacheLoader
>>>
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> loader = CacheLoader(backend="redis", hosts=hosts, cluster_mode=True, dataset_name="test")
>>>
>>> loader.get(index, lambda x: items[x])

To spawn new redis servers for the “redis” backend:

>>> loader = CacheLoader(backend="redis", hosts=None, cluster_mode=True, capacity_per_node=100000000)

Note

Cache loaders with the same dataset_name will reuse and overwrite each other’s cache. Use a different dataset_name if this is not desired.

get(self, key, load_fn)

Returns the value associated with key in the cache, using load_fn to create the entry if the key does not exist in the cache. load_fn is a function that takes key as its argument and returns the corresponding value to be cached.

Parameters
  • key (str) –

  • load_fn (Callable[[str], None]) –

num_keys(self)

Returns the number of keys in the cache.

class bagua.torch_api.contrib.CachedDataset(dataset, backend='redis', dataset_name='', writer_buffer_size=20, **kwargs)

Bases: torch.utils.data.dataset.Dataset

Cached dataset wraps a PyTorch dataset to cache its samples in memory, so that accessing these samples after the first time can be much faster. This is useful when samples need tedious preprocessing to produce, or reading the dataset itself is slow, which could slow down the whole training process.

Internally, the samples are indexed by a string key "{dataset_name}_{index}" and saved in a distributed key-value store, where dataset_name is specified when initializing the cached dataset, and index is the index of a specific sample (the argument of __getitem__ method in a PyTorch dataset).

Parameters
  • dataset (torch.utils.data.dataset.Dataset) – PyTorch dataset to be wrapped.

  • backend (str) – Backend distributed key-value store implementation. Can be "redis".

  • dataset_name (str) – Name of the dataset. Default "".

  • writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.

Example:

>>> from bagua.torch_api.contrib import CachedDataset
>>> cached_dataset = CachedDataset(dataset, backend="redis", dataset_name="ds")
>>> dataloader = torch.utils.data.DataLoader(cached_dataset)

Note

Cached dataset is a special case of cache loader. The parameters backend and writer_buffer_size used when initializing a cached dataset have the same meanings as those used when initializing a cache loader. You can provide additional cache loader arguments here in **kwargs. See also CacheLoader.

cache_loader

The backend cache loader instance.

class bagua.torch_api.contrib.FusedOptimizer(optimizer, do_flatten=False)

Bases: torch.optim.Optimizer

Convert any optimizer into a fused optimizer.

This fused optimizer fuses multiple module parameter update kernel launches into one or a few, by flattening parameter tensors into one or more contiguous buckets.

It can be used in conjunction with the with_bagua method, in which case Bagua will do the fusion automatically. Otherwise, you need to explicitly set do_flatten=True.

Parameters
  • optimizer (torch.optim.Optimizer) – Any PyTorch optimizer.

  • do_flatten (bool) – Whether to flatten the parameters. Default: False.

Returns

Fused optimizer.

Example::

To use in conjunction with with_bagua method:

>>> optimizer = torch.optim.Adadelta(model.parameters(), ....)
>>> optimizer = bagua.torch_api.contrib.FusedOptimizer(optimizer)
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm())

To use alone or with torch.nn.parallel.DistributedDataParallel, set do_flatten=True:

>>> optimizer = torch.optim.Adadelta(model.parameters(), ....)
>>> optimizer = bagua.torch_api.contrib.FusedOptimizer(optimizer, do_flatten=True)
step(self, closure=None)

Performs a single optimization step (parameter update).

Parameters

closure (Callable) – A closure that reevaluates the model and returns the loss. Optional for most optimizers.

Note

Unless otherwise specified, this function should not modify the .grad field of the parameters.

class bagua.torch_api.contrib.LoadBalancingDistributedBatchSampler(sampler, batch_fn, drop_last=False)

Bases: torch.utils.data.sampler.Sampler

Wraps another load balance sampler to yield variable sized mini-batches.

Parameters
  • sampler (LoadBalancingDistributedSampler) – Load balance sampler.

  • batch_fn (Callable) – Callable to yield mini-batch indices.

  • drop_last (bool) – If True, the sampler will drop the last few batches that exceed the least number of batches among replicas; otherwise, the number of batches on each replica will be padded to be the same.

batch_fn will have the signature of:

def batch_fn(indices: List[int]) -> List[List[int]]
Example::
>>> from bagua.torch_api.contrib import LoadBalancingDistributedSampler, \
...     LoadBalancingDistributedBatchSampler
>>>
>>> sampler = LoadBalancingDistributedSampler(dataset, complexity_fn=complexity_fn)
>>> batch_sampler = LoadBalancingDistributedBatchSampler(sampler, batch_fn=batch_fn)
>>> loader = torch.utils.data.DataLoader(dataset, batch_sampler=batch_sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     batch_sampler.set_epoch(epoch)
...     train(loader)
set_epoch(self, epoch)

Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.

Parameters

epoch (int) – Epoch number.

Return type

None

class bagua.torch_api.contrib.LoadBalancingDistributedSampler(dataset, complexity_fn, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False, random_level=0)

Bases: torch.utils.data.sampler.Sampler

Sampler that restricts data loading to a subset of the dataset.

This sampler uses a complexity_fn to calculate each sample’s computational complexity, so that each batch gets a similar total computational complexity.

This is useful in scenarios like speech and NLP, where each batch has variable length and distributed training suffers from the straggler problem.

The usage is similar to torch.utils.data.DistributedSampler, where each process loads a subset of the original dataset that is exclusive to it.

Note

Dataset is assumed to be of constant size.

Parameters
  • dataset (torch.utils.data.dataset.Dataset) – Dataset used for sampling.

  • complexity_fn (Callable) – A function whose input is a sample and output is an integer as a measure of the computational complexity of the sample.

  • num_replicas (int, optional) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.

  • rank (int, optional) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.

  • shuffle (bool, optional) – If True (default), sampler will shuffle the indices.

  • seed (int, optional) – random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group. Default: 0.

  • drop_last (bool, optional) – if True, then the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas. Default: False.

  • random_level (float, optional) – A float between 0 and 1 that controls the extent of load balance. 0 means the best load balance, while 1 means the opposite.

Warning

In distributed mode, calling the set_epoch method at the beginning of each epoch before creating the DataLoader iterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.

Example::

Define your complexity_fn, which accepts a dataset sample as its input and produces an integer as the sample’s computational complexity:

>>> dataset = torch.utils.data.TensorDataset(torch.randn(n, 2), torch.randperm(n))
>>> complexity_fn = lambda x: x[1]

Below is the usage of LoadBalancingDistributedSampler and DataLoader:

>>> sampler = bagua.torch_api.contrib.LoadBalancingDistributedSampler(
...     dataset,
...     complexity_fn=complexity_fn) if is_distributed else None
>>> loader = torch.utils.data.DataLoader(dataset,
...     shuffle=(sampler is None),
...     sampler=sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     if is_distributed:
...         sampler.set_epoch(epoch)
...     train(loader)
set_epoch(self, epoch)

Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.

Parameters

epoch (int) – Epoch number.

Return type

None

bagua.torch_api.model_parallel
Subpackages
bagua.torch_api.model_parallel.moe
Submodules
bagua.torch_api.model_parallel.moe.experts
Module Contents
class bagua.torch_api.model_parallel.moe.experts.Experts(expert, num_local_experts=1)

Bases: torch.nn.Module

forward(self, inputs)
bagua.torch_api.model_parallel.moe.layer
Module Contents
class bagua.torch_api.model_parallel.moe.layer.MoE(hidden_size, expert, num_local_experts=1, k=1, output_dropout_prob=0.0, capacity_factor=1.0, eval_capacity_factor=1.0, min_capacity=4, noisy_gate_policy=None)

Bases: torch.nn.Module

Initialize an MoE layer.

Parameters
  • hidden_size (int) – the hidden dimension of the model; importantly, this is also the input and output dimension.

  • expert (torch.nn.Module) – the torch module that defines the expert (e.g., an MLP or torch.nn.Linear).

  • num_local_experts (int, optional) – default=1, number of local experts per gpu.

  • k (int, optional) – default=1, top-k gating value, only supports k=1 or k=2.

  • output_dropout_prob (float, optional) – default=0.0, output dropout probability.

  • capacity_factor (float, optional) – default=1.0, the capacity of the expert at training time.

  • eval_capacity_factor (float, optional) – default=1.0, the capacity of the expert at eval time.

  • min_capacity (int, optional) – default=4, the minimum capacity per expert regardless of the capacity_factor.

  • noisy_gate_policy (str, optional) – default=None, noisy gate policy, valid options are ‘Jitter’, ‘RSample’ or ‘None’.

forward(self, hidden_states, used_token=None)

MoE forward

Parameters
  • hidden_states (Tensor) – input to the layer

  • used_token (Tensor, optional) – default: None, mask only used tokens

Returns

A tuple including output, gate loss, and expert count.

  • output (Tensor): output of the model

  • l_aux (Tensor): gate loss value

  • exp_counts (int): expert count
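
A construction sketch (hidden_states is assumed to be an input tensor whose last dimension equals hidden_size; a plain linear layer stands in for the expert):

>>> import torch
>>> from bagua.torch_api.model_parallel.moe import MoE
>>>
>>> expert = torch.nn.Linear(1024, 1024)
>>> moe_layer = MoE(hidden_size=1024, expert=expert, num_local_experts=2, k=1)
>>> output, l_aux, exp_counts = moe_layer(hidden_states)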

bagua.torch_api.model_parallel.moe.sharded_moe
Module Contents
bagua.torch_api.model_parallel.moe.sharded_moe.Base
bagua.torch_api.model_parallel.moe.sharded_moe.exp_selection_uniform_map :Dict[torch.device, Callable]
bagua.torch_api.model_parallel.moe.sharded_moe.gumbel_map :Dict[torch.device, Callable]
bagua.torch_api.model_parallel.moe.sharded_moe.uniform_map :Dict[torch.device, Callable]
class bagua.torch_api.model_parallel.moe.sharded_moe.MOELayer(gate, experts, num_local_experts, group=None)

Bases: Base

MOELayer module which implements MixtureOfExperts as described in GShard.

gate = TopKGate(model_dim, num_experts)
moe = MOELayer(gate, expert)
output = moe(input)
l_aux = moe.l_aux
Parameters
  • gate (torch.nn.Module) – gate network

  • expert (torch.nn.Module) – expert network

  • experts (torch.nn.Module) –

  • num_local_experts (int) –

  • group (Optional[Any]) –

forward(self, *input, **kwargs)
Parameters
  • input (torch.Tensor) –

  • kwargs (Any) –

Return type

torch.Tensor

class bagua.torch_api.model_parallel.moe.sharded_moe.TopKGate(model_dim, num_experts, k=1, capacity_factor=1.0, eval_capacity_factor=1.0, min_capacity=4, noisy_gate_policy=None)

Bases: torch.nn.Module

Gate module which implements Top2Gating as described in GShard.

gate = TopKGate(model_dim, num_experts)
l_aux, combine_weights, dispatch_mask = gate(input)
Parameters
  • model_dim (int) – size of model embedding dimension

  • num_experts (int) – number of experts in the model

  • k (int) –

  • capacity_factor (float) –

  • eval_capacity_factor (float) –

  • min_capacity (int) –

  • noisy_gate_policy (Optional[str]) –

wg :torch.nn.Linear
forward(self, input, used_token=None)
Parameters
  • input (torch.Tensor) –

  • used_token (torch.Tensor) –

Return type

Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]

bagua.torch_api.model_parallel.moe.sharded_moe.gumbel_rsample(shape, device)
Parameters
  • shape (Tuple) –

  • device (torch.device) –

Return type

torch.Tensor

bagua.torch_api.model_parallel.moe.sharded_moe.multiplicative_jitter(x, device, epsilon=0.01)

Modified from the Switch Transformer paper and Mesh Transformers. Multiplies values by a random number between 1 - epsilon and 1 + epsilon, which makes models more resilient to rounding errors introduced by bfloat16. This seems particularly important for logits.

Parameters
  • x (torch.Tensor) – The tensor to jitter.

  • device (torch.device) – The device on which to generate the random jitter.

  • epsilon (float) – The jitter magnitude. Default: 0.01.

Returns

A jittered x.

bagua.torch_api.model_parallel.moe.sharded_moe.top1gating(logits, capacity_factor, min_capacity, used_token=None, noisy_gate_policy=None)

Implements Top1Gating on logits.

Parameters
  • logits (torch.Tensor) –

  • capacity_factor (float) –

  • min_capacity (int) –

  • used_token (torch.Tensor) –

  • noisy_gate_policy (Optional[str]) –

Return type

Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]

bagua.torch_api.model_parallel.moe.sharded_moe.top2gating(logits, capacity_factor)

Implements Top2Gating on logits.

Parameters
  • logits (torch.Tensor) –

  • capacity_factor (float) –

Return type

Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]

Package Contents
class bagua.torch_api.model_parallel.moe.MoE(hidden_size, expert, num_local_experts=1, k=1, output_dropout_prob=0.0, capacity_factor=1.0, eval_capacity_factor=1.0, min_capacity=4, noisy_gate_policy=None)

Bases: torch.nn.Module

Initialize an MoE layer.

Parameters
  • hidden_size (int) – the hidden dimension of the model; importantly, this is also the input and output dimension.

  • expert (torch.nn.Module) – the torch module that defines the expert (e.g., an MLP or torch.nn.Linear).

  • num_local_experts (int, optional) – default=1, number of local experts per gpu.

  • k (int, optional) – default=1, top-k gating value, only supports k=1 or k=2.

  • output_dropout_prob (float, optional) – default=0.0, output dropout probability.

  • capacity_factor (float, optional) – default=1.0, the capacity of the expert at training time.

  • eval_capacity_factor (float, optional) – default=1.0, the capacity of the expert at eval time.

  • min_capacity (int, optional) – default=4, the minimum capacity per expert regardless of the capacity_factor.

  • noisy_gate_policy (str, optional) – default=None, noisy gate policy, valid options are ‘Jitter’, ‘RSample’ or ‘None’.

forward(self, hidden_states, used_token=None)

MoE forward

Parameters
  • hidden_states (Tensor) – input to the layer

  • used_token (Tensor, optional) – default: None, mask only used tokens

Returns

A tuple including output, gate loss, and expert count.

  • output (Tensor): output of the model

  • l_aux (Tensor): gate loss value

  • exp_counts (int): expert count

Submodules
bagua.torch_api.bucket
Module Contents
class bagua.torch_api.bucket.BaguaBucket(tensors, name, flatten, alignment=1)

Create a Bagua bucket with a list of Bagua tensors.

Parameters
  • tensors (List[bagua.torch_api.tensor.BaguaTensor]) – A list of Bagua tensors to be put in the bucket.

  • name (str) – The unique name of the bucket.

  • flatten (bool) – If True, flatten the input tensors so that they are contiguous in memory.

  • alignment (int) – If alignment > 1, Bagua will add a padding tensor to the bucket so that the total number of elements in the bucket is divisible by the given alignment.

name

The bucket’s name.

tensors

The tensors contained within the bucket.

append_asynchronous_model_average_op(self, peer_selection_mode, group=None)

Append an asynchronous model average operation to a bucket. This operation will enable continuous model averaging between workers while training a model.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

This operation is intended to run in parallel with the computation process. It returns a reference to the op. The op features a lock to exclusively access the model. Call op.lock_weight() to acquire the lock and op.unlock_weight() to release it.

Parameters
  • peer_selection_mode (str) – The way workers communicate with each other. Currently "all" is supported. "all" means all workers’ weights are averaged during each communication.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

Returns

The asynchronous model average operation itself.
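
A sketch of how the returned op can be used to access the model exclusively (bucket is assumed to be an existing BaguaBucket):

>>> op = bucket.append_asynchronous_model_average_op(peer_selection_mode="all")
>>> op.lock_weight()
>>> # ... read or update the model weights exclusively here ...
>>> op.unlock_weight()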

append_centralized_synchronous_op(self, hierarchical=False, average=True, scattergather=False, compression=None, group=None)

Append a centralized synchronous operation to a bucket. It will sum or average the tensors in the bucket for all workers.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters
  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.

  • average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.

  • scattergather (bool) – If True, the communication between workers is done with scatter-gather instead of allreduce. This is required for using compression.

  • compression (Optional[str]) – If not None, the tensors will be compressed for communication. Currently "MinMaxUInt8" is supported.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

append_decentralized_synchronous_op(self, peer_weight, hierarchical=True, peer_selection_mode='all', group=None)

Append a decentralized synchronous operation to a bucket. It will do gossip-style model averaging among workers.

This operation is not in-place, which means the bucket weights are first copied to peer_weight, and the result of decentralized averaging will be in peer_weight. To copy peer_weight back to self, call decentralized_synchronous_op_copy_back_peer_weight.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters
  • peer_weight (BaguaTensor) – A tensor used for averaging the model with peers. It should be of the same size as the total size of the bucket’s tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor.

  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.

  • peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers’ weights are averaged in each communication step. "shift_one" means each worker selects a different peer to do weights average in each communication step.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

append_low_precision_decentralized_synchronous_op(self, weight, left_peer_weight, right_peer_weight, hierarchical=True, compression='MinMaxUInt8', group=None)

Append a low precision decentralized synchronous operation to a bucket. It will compress the difference of local models between two successive iterations and exchange them among workers.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters
  • weight (BaguaTensor) – Model replica of the current worker’s local model. It should be of the same size as the total size of the bucket’s tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor.

  • left_peer_weight (BaguaTensor) – Model replica of the current worker’s left peer. It should be of the same size as the total size of the bucket’s tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor, then copy the initial weights of the current worker’s left peer into it.

  • right_peer_weight (BaguaTensor) – Model replica of the current worker’s right peer. It should be of the same size as the total size of the bucket’s tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor, then copy the initial weights of the current worker’s right peer into it.

  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.

  • compression (str) – The way how tensors are compressed for communication. Currently "MinMaxUInt8" is supported.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

append_python_op(self, python_function)

Append a Python operation to a bucket. A Python operation is a Python function that takes the bucket’s name and returns None. It can do arbitrary things within the function body.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters

python_function (Callable[[str], None]) – The Python operation function.
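
A sketch of a Python operation (bucket is assumed to be an existing BaguaBucket); the function receives the bucket’s name and returns None:

>>> def log_ready(bucket_name):
...     print("bucket {} is ready".format(bucket_name))
>>> bucket.append_python_op(log_ready)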

bytes(self)

Returns the total number of bytes occupied by the bucket.

Return type

int

check_flatten(self)
Returns

True if the bucket’s tensors are contiguous in memory.

Return type

bool

clear_ops(self)

Clear the previously appended operations.

Return type

BaguaBucket

decentralized_synchronous_op_copy_back_peer_weight(self, peer_weight, hierarchical=True, group=None)

Copy peer_weight back to bucket weights to end a decentralized synchronous operation. See append_decentralized_synchronous_op for more information.

Parameters
  • peer_weight (BaguaTensor) – A tensor used for averaging the model with peers. It should be of the same size as the total size of the bucket’s tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor.

  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high. Must be the same as the hierarchical argument in append_decentralized_synchronous_op.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

flattened_tensor(self)

Returns a tensor contiguous in memory which contains the same data as the tensors in this bucket, plus the padding tensor (if it exists).

Return type

bagua.torch_api.tensor.BaguaTensor

bagua.torch_api.communication
Module Contents
class bagua.torch_api.communication.ReduceOp

Bases: enum.IntEnum

An enum-like class for available reduction operations: SUM, PRODUCT, MIN, MAX, BAND, BOR, BXOR and AVG.

AVG = 10
BAND = 8
BOR = 7
BXOR = 9
MAX = 3
MIN = 2
PRODUCT = 1
SUM = 0
bagua.torch_api.communication.allgather(send_tensor, recv_tensor, comm=None)

Gathers send tensors from all processes associated with the communicator into recv_tensor.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
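
A sketch in the style of the allreduce example below; rank and nranks are assumed to hold the current process rank and the number of processes:

>>> import torch
>>> from bagua.torch_api.communication import allgather
>>>
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 2 * rank
>>> recv_tensor = torch.zeros(2 * nranks, dtype=torch.int64)
>>> allgather(send_tensor, recv_tensor)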

bagua.torch_api.communication.allgather_inplace(tensor, comm=None)

The in-place version of allgather.

Parameters
  • tensor (torch.Tensor) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.communication.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes associated with the communicator in such a way that all get the final result. After the call recv_tensor is going to be bitwise identical in all processes.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size with send_tensor.

  • op (ReduceOp) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

Examples:

>>> from bagua.torch_api import allreduce
>>>
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1

>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat)
>>> send_tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
bagua.torch_api.communication.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of allreduce.

Parameters
  • tensor (torch.Tensor) –

  • op (ReduceOp) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.communication.alltoall(send_tensor, recv_tensor, comm=None)

Each process scatters send_tensor to all processes associated with the communicator and returns the gathered data in recv_tensor.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by comm.nranks.

  • recv_tensor (torch.Tensor) – Output of the collective, must have equal size with send_tensor.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.alltoall_inplace(tensor, comm=None)

The in-place version of alltoall.

Parameters
  • tensor (torch.Tensor) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.communication.barrier(comm=None)

Synchronizes all processes. This collective blocks processes until all processes associated with the communicator enter this function.

Parameters

comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.broadcast(tensor, src=0, comm=None)

Broadcasts the tensor to all processes associated with the communicator.

tensor must have the same number of elements in all processes participating in the collective.

Parameters
  • tensor (torch.Tensor) – Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise.

  • src (int) – Source rank. Default: 0.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
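
For example, a minimal sketch (assuming 2 CUDA-enabled ranks and an initialized default process group):

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> if bagua.get_rank() == 0:
...     tensor = torch.tensor([1.0, 2.0, 3.0]).cuda()
... else:
...     tensor = torch.zeros(3).cuda()
>>> bagua.broadcast(tensor, src=0)
>>> tensor
tensor([1., 2., 3.]) # identical on every rank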

bagua.torch_api.communication.from_torch_group(group, stream=None)

Convert a PyTorch process group to its equivalent Bagua process group.

Parameters
  • group – A handle of the PyTorch process group.

  • stream (Optional[torch.cuda.Stream]) – A CUDA stream used to execute NCCL operations. If None, the CUDA stream of the default group will be used. See new_group for more information.

Returns

A handle of the Bagua process group.
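
For example, a sketch of reusing a torch.distributed group (assuming torch.distributed has already been initialized, e.g. via init_process_group below):

>>> import torch.distributed as dist
>>> from bagua.torch_api.communication import from_torch_group
>>>
>>> torch_group = dist.new_group(ranks=[0, 1])
>>> bagua_group = from_torch_group(torch_group)
>>> comm = bagua_group.get_global_communicator()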

bagua.torch_api.communication.gather(send_tensor, recv_tensor, dst, comm=None)

Gathers send tensors from all processes associated with the communicator to recv_tensor in a single process.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.gather_inplace(tensor, count, dst, comm=None)

The in-place version of gather.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective. On the dst rank, it must have a size of comm.nranks * count elements. On non-dst ranks, its size must be equal to count.

  • count (int) – The per-rank data count to gather.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
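
For example, a minimal sketch of gather (assuming 2 CUDA-enabled ranks and an initialized default process group):

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> rank = bagua.get_rank()
>>> send_tensor = torch.tensor([float(rank)]).cuda()
>>> recv_tensor = torch.zeros(bagua.get_world_size()).cuda()
>>> bagua.gather(send_tensor, recv_tensor, dst=0)
>>> recv_tensor
tensor([0., 1.]) # Rank 0; other ranks do not receive the gathered data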

bagua.torch_api.communication.init_process_group(store=None)

Initializes the PyTorch built-in distributed process group. This also initializes the distributed package, and should be executed before any other Bagua API.

Parameters

store (Optional[torch.distributed.Store]) – Key/value store accessible to all workers, used to exchange connection/address information. If None, a TCP-based store will be created. Default: None.

Examples:
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())
>>> bagua.init_process_group()
>>>
>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model = model.with_bagua([optimizer], ...)
bagua.torch_api.communication.is_initialized()

Check whether the default process group has been initialized.

bagua.torch_api.communication.new_group(ranks=None, stream=None)

Creates a new process group.

This function requires that all processes in the default group (i.e. all processes that are part of the distributed job) enter this function, even if they are not going to be members of the group. Additionally, groups should be created in the same order in all processes.

Each process group will create three communicators on request: a global communicator, an inter-node communicator and an intra-node communicator. Users can access them through group.get_global_communicator(), group.get_inter_node_communicator() and group.get_intra_node_communicator() respectively.

Parameters
  • ranks (Optional[List[int]]) – List of ranks of group members. If None, will be set to all ranks. Default is None.

  • stream (Optional[torch.cuda.Stream]) – A CUDA stream used to execute NCCL operations. If None, the CUDA stream of the default group will be used. See CUDA semantics for details.

Returns

A handle of process group that can be given to collective calls.

Note

The global communicator is used for global communications involving all ranks in the process group. The inter-node communicator and the intra-node communicator are used for hierarchical communications in this process group.

Note

For a specific communicator comm, comm.rank() returns the rank of current process and comm.nranks() returns the size of the communicator.
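
For example, a sketch of creating a group and using one of its communicators with a collective (assuming a 2-rank job so that every process is a member of the new group):

>>> import torch
>>> from bagua.torch_api.communication import new_group, allreduce_inplace
>>>
>>> group = new_group(ranks=[0, 1])  # every process in the default group must call this
>>> comm = group.get_global_communicator()
>>> comm.rank(), comm.nranks()
(0, 2) # on rank 0
>>> tensor = torch.ones(2).cuda()
>>> allreduce_inplace(tensor, comm=comm)  # reduce only within this group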

bagua.torch_api.communication.recv(tensor, src, comm=None)

Receives a tensor synchronously.

Parameters
  • tensor (torch.Tensor) – Tensor to fill with received data.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes.

Only the process with rank dst receives the final result.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.

  • dst (int) – Destination rank.

  • op (ReduceOp) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
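
For example, a minimal sketch (assuming 2 CUDA-enabled ranks and an initialized default process group):

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> send_tensor = torch.ones(2).cuda()
>>> recv_tensor = torch.zeros(2).cuda()
>>> bagua.reduce(send_tensor, recv_tensor, dst=0)
>>> recv_tensor
tensor([2., 2.]) # Rank 0; other ranks do not receive the reduced result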

bagua.torch_api.communication.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)

The in-place version of reduce.

Parameters
  • tensor (torch.Tensor) –

  • dst (int) –

  • op (ReduceOp) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.communication.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces, then scatters send_tensor to all processes associated with the communicator.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • op (ReduceOp, optional) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of reduce_scatter.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective; the size must be divisible by comm.nranks.

  • op (ReduceOp, optional) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
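
For example, a minimal sketch of the reduce-then-scatter pattern of reduce_scatter (assuming 2 CUDA-enabled ranks and an initialized default process group):

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> nranks = bagua.get_world_size()  # 2 in this sketch
>>> send_tensor = torch.arange(2 * nranks, dtype=torch.float32).cuda() + 1
>>> recv_tensor = torch.zeros(2).cuda()
>>> bagua.reduce_scatter(send_tensor, recv_tensor)
>>> recv_tensor
tensor([2., 4.]) # Rank 0
tensor([6., 8.]) # Rank 1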

bagua.torch_api.communication.scatter(send_tensor, recv_tensor, src, comm=None)

Scatters send tensor to all processes associated with the communicator.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.scatter_inplace(tensor, count, src, comm=None)

The in-place version of scatter.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective. On the src rank, it must have a size of comm.nranks * count elements. On non-src ranks, its size must be equal to count.

  • count (int) – The per-rank data count to scatter.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
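
For example, a minimal sketch of scatter (assuming 2 CUDA-enabled ranks and an initialized default process group; only the send_tensor on the src rank is actually distributed):

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> nranks = bagua.get_world_size()  # 2 in this sketch
>>> send_tensor = torch.arange(2 * nranks, dtype=torch.float32).cuda()
>>> recv_tensor = torch.zeros(2).cuda()
>>> bagua.scatter(send_tensor, recv_tensor, src=0)
>>> recv_tensor
tensor([0., 1.]) # Rank 0
tensor([2., 3.]) # Rank 1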

bagua.torch_api.communication.send(tensor, dst, comm=None)

Sends a tensor to dst synchronously.

Parameters
  • tensor (torch.Tensor) – Tensor to send.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
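
For example, a minimal point-to-point sketch combining send and recv (assuming 2 CUDA-enabled ranks and an initialized default process group):

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> tensor = torch.zeros(2).cuda()
>>> if bagua.get_rank() == 0:
...     tensor += 1
...     bagua.send(tensor, dst=1)
... elif bagua.get_rank() == 1:
...     bagua.recv(tensor, src=0)
>>> tensor
tensor([1., 1.]) # on ranks 0 and 1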

bagua.torch_api.distributed
Module Contents
class bagua.torch_api.distributed.BaguaModule

This class patches torch.nn.Module with several methods to enable Bagua functionalities.

Variables
  • bagua_optimizers (List[torch.optim.Optimizer]) – The optimizers passed in by with_bagua.

  • bagua_algorithm (bagua.torch_api.algorithms.Algorithm) – The algorithm passed in by with_bagua.

  • parameters_to_ignore (List[str]) – The parameter names in "{module_name}.{param_name}" format to ignore when calling self.bagua_build_params().

  • bagua_train_step_counter (int) – Number of iterations in training mode.

  • bagua_buckets (List[bagua.torch_api.bucket.BaguaBucket]) – All Bagua buckets in a list.

bagua_build_params(self)

Build a list of (parameter_name, parameter) tuples for all parameters that require gradients and are not listed in the _bagua_params_and_buffers_to_ignore attribute.

Return type

List[Tuple[str, torch.nn.Parameter]]

with_bagua(self, optimizers, algorithm, process_group=None)

with_bagua enables easy distributed data parallel training on a torch.nn.Module.

Parameters
  • optimizers (List[torch.optim.Optimizer]) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers.

  • algorithm (bagua.torch_api.algorithms.Algorithm) – Distributed algorithm used to do the actual communication and update.

  • process_group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to be used for distributed data all-reduction. If None, the default process group, which is created by bagua.torch_api.init_process_group, will be used. (default: None)

Returns

The original module, with Bagua related environments initialized.

Return type

BaguaModule

Note

If we want to ignore some layers for communication, we can first check these layers’ corresponding keys in the module’s state_dict (they are in "{module_name}.{param_name}" format), then assign the list of keys to your_module._bagua_params_and_buffers_to_ignore.

Examples:

>>> model = torch.nn.Sequential(
...      torch.nn.Linear(D_in, H),
...      torch.nn.ReLU(),
...      torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...      model.parameters(),
...      lr=0.01,
...      momentum=0.9
...    )
>>> model = model.with_bagua(
...      [optimizer],
...      GradientAllReduce()
...    )
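
Building on the note and example above, a hypothetical sketch of excluding the first Linear layer from communication; the attribute must be set before calling with_bagua, and the exact key names (e.g. "0.weight" for a Sequential) depend on your model's state_dict:

>>> # Keys follow the "{module_name}.{param_name}" format.
>>> model._bagua_params_and_buffers_to_ignore = ["0.weight", "0.bias"]
>>> model = model.with_bagua(
...      [optimizer],
...      GradientAllReduce()
...    )
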
bagua.torch_api.env
Module Contents
bagua.torch_api.env.get_default_bucket_size()

Get default communication bucket byte size.

Returns

The default bucket size.

Return type

int

bagua.torch_api.env.get_local_rank()

Get the local rank of the current process.

Local rank is a unique identifier assigned to each process within a node. They are always consecutive integers ranging from 0 to local_size - 1.

Returns

The local rank of the current process.

Return type

int

bagua.torch_api.env.get_local_size()

Get the number of processes in the node.

Returns

The local size of the node.

Return type

int

bagua.torch_api.env.get_rank()

Get the rank of the current process in the current process group.

Rank is a unique identifier assigned to each process within a distributed process group. They are always consecutive integers ranging from 0 to world_size - 1.

Returns

The rank of the current process.

Return type

int

bagua.torch_api.env.get_world_size()

Get the number of processes in the current process group.

Returns

The world size of the process group.

Return type

int
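
A typical pattern combining these helpers at startup (a sketch, assuming the process was launched by a distributed launcher that sets the usual environment variables):

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())  # one GPU per local process
>>> bagua.init_process_group()
>>> print(f"rank {bagua.get_rank()} of {bagua.get_world_size()}, "
...       f"local rank {bagua.get_local_rank()} of {bagua.get_local_size()}")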

bagua.torch_api.tensor
Module Contents
class bagua.torch_api.tensor.BaguaTensor

This class patches torch.Tensor with additional methods.

bagua_backend_tensor(self)
Returns

The raw Bagua backend tensor.

Return type

bagua_core.BaguaTensorPy

bagua_ensure_grad(self)

Return the gradient of the current parameter. Create a zero gradient tensor if it does not exist.

Return type

torch.Tensor

bagua_mark_communication_ready(self)

Mark a Bagua tensor ready for scheduled operations execution.

bagua_mark_communication_ready_without_synchronization(self)

Mark a Bagua tensor ready immediately, without CUDA event synchronization.

bagua_set_storage(self, storage, storage_offset=0)

Sets the underlying storage using an existing torch.Storage.

Parameters
  • storage (torch.Storage) – The storage to use.

  • storage_offset (int) – The offset in the storage.

ensure_bagua_tensor(self, name=None, module_name=None)

Convert a PyTorch tensor or parameter to a Bagua tensor in place and return it. A Bagua tensor is required to use Bagua’s communication algorithms.

Parameters
  • name (Optional[str]) – The unique name of the tensor.

  • module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.

Returns

The original tensor with Bagua tensor attributes initialized.

is_bagua_tensor(self)
Return type

bool

to_bagua_tensor(self, name=None, module_name=None)

Create a new Bagua tensor from a PyTorch tensor or parameter and return it. The original tensor is not changed. A Bagua tensor is required to use Bagua’s communication algorithms.

Parameters
  • name (Optional[str]) – The unique name of the tensor.

  • module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.

Returns

The new Bagua tensor sharing the same storage with the original tensor.
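
A minimal sketch contrasting to_bagua_tensor with ensure_bagua_tensor (the tensor name is arbitrary; importing bagua.torch_api applies the patch to torch.Tensor):

>>> import torch
>>> import bagua.torch_api  # patches torch.Tensor with the methods above
>>>
>>> t = torch.zeros(4).cuda()
>>> b = t.to_bagua_tensor(name="my_tensor")   # new tensor sharing storage; t is unchanged
>>> b.is_bagua_tensor(), t.is_bagua_tensor()
(True, False)
>>> t = t.ensure_bagua_tensor(name="my_tensor")  # converts t in place and returns it
>>> t.is_bagua_tensor()
True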

Package Contents
bagua.torch_api.version
class bagua.torch_api.BaguaModule

This class patches torch.nn.Module with several methods to enable Bagua functionalities.

Variables
  • bagua_optimizers (List[torch.optim.Optimizer]) – The optimizers passed in by with_bagua.

  • bagua_algorithm (bagua.torch_api.algorithms.Algorithm) – The algorithm passed in by with_bagua.

  • parameters_to_ignore (List[str]) – The parameter names in "{module_name}.{param_name}" format to ignore when calling self.bagua_build_params().

  • bagua_train_step_counter (int) – Number of iterations in training mode.

  • bagua_buckets (List[bagua.torch_api.bucket.BaguaBucket]) – All Bagua buckets in a list.

bagua_build_params(self)

Build a list of (parameter_name, parameter) tuples for all parameters that require gradients and are not listed in the _bagua_params_and_buffers_to_ignore attribute.

Return type

List[Tuple[str, torch.nn.Parameter]]

with_bagua(self, optimizers, algorithm, process_group=None)

with_bagua enables easy distributed data parallel training on a torch.nn.Module.

Parameters
  • optimizers (List[torch.optim.Optimizer]) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers.

  • algorithm (bagua.torch_api.algorithms.Algorithm) – Distributed algorithm used to do the actual communication and update.

  • process_group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to be used for distributed data all-reduction. If None, the default process group, which is created by bagua.torch_api.init_process_group, will be used. (default: None)

Returns

The original module, with Bagua related environments initialized.

Return type

BaguaModule

Note

If we want to ignore some layers for communication, we can first check these layers’ corresponding keys in the module’s state_dict (they are in "{module_name}.{param_name}" format), then assign the list of keys to your_module._bagua_params_and_buffers_to_ignore.

Examples:

>>> model = torch.nn.Sequential(
...      torch.nn.Linear(D_in, H),
...      torch.nn.ReLU(),
...      torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...      model.parameters(),
...      lr=0.01,
...      momentum=0.9
...    )
>>> model = model.with_bagua(
...      [optimizer],
...      GradientAllReduce()
...    )
class bagua.torch_api.BaguaTensor

This class patches torch.Tensor with additional methods.

bagua_backend_tensor(self)
Returns

The raw Bagua backend tensor.

Return type

bagua_core.BaguaTensorPy

bagua_ensure_grad(self)

Return the gradient of the current parameter. Create a zero gradient tensor if it does not exist.

Return type

torch.Tensor

bagua_mark_communication_ready(self)

Mark a Bagua tensor ready for scheduled operations execution.

bagua_mark_communication_ready_without_synchronization(self)

Mark a Bagua tensor ready immediately, without CUDA event synchronization.

bagua_set_storage(self, storage, storage_offset=0)

Sets the underlying storage using an existing torch.Storage.

Parameters
  • storage (torch.Storage) – The storage to use.

  • storage_offset (int) – The offset in the storage.

ensure_bagua_tensor(self, name=None, module_name=None)

Convert a PyTorch tensor or parameter to a Bagua tensor in place and return it. A Bagua tensor is required to use Bagua’s communication algorithms.

Parameters
  • name (Optional[str]) – The unique name of the tensor.

  • module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.

Returns

The original tensor with Bagua tensor attributes initialized.

is_bagua_tensor(self)
Return type

bool

to_bagua_tensor(self, name=None, module_name=None)

Create a new Bagua tensor from a PyTorch tensor or parameter and return it. The original tensor is not changed. A Bagua tensor is required to use Bagua’s communication algorithms.

Parameters
  • name (Optional[str]) – The unique name of the tensor.

  • module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.

Returns

The new Bagua tensor sharing the same storage with the original tensor.

class bagua.torch_api.ReduceOp

Bases: enum.IntEnum

An enum-like class for available reduction operations: SUM, PRODUCT, MIN, MAX, BAND, BOR, BXOR and AVG.

AVG = 10
BAND = 8
BOR = 7
BXOR = 9
MAX = 3
MIN = 2
PRODUCT = 1
SUM = 0
bagua.torch_api.allgather(send_tensor, recv_tensor, comm=None)

Gathers send tensors from all processes associated with the communicator into recv_tensor.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.allgather_inplace(tensor, comm=None)

The in-place version of allgather.

Parameters
  • tensor (torch.Tensor) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes associated with the communicator in such a way that all get the final result. After the call recv_tensor is going to be bitwise identical in all processes.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.

  • op (ReduceOp) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

Examples:

>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api import allreduce
>>>
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 ranks.
>>> rank = bagua.get_rank()
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1

>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat)
>>> send_tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
bagua.torch_api.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of allreduce.

Parameters
  • tensor (torch.Tensor) –

  • op (ReduceOp) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.alltoall(send_tensor, recv_tensor, comm=None)

Each process scatters send_tensor to all processes associated with the communicator and returns the gathered data in recv_tensor.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by comm.nranks.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.alltoall_inplace(tensor, comm=None)

The in-place version of alltoall.

Parameters
  • tensor (torch.Tensor) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.broadcast(tensor, src=0, comm=None)

Broadcasts the tensor to all processes associated with the communicator.

tensor must have the same number of elements in all processes participating in the collective.

Parameters
  • tensor (torch.Tensor) – Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise.

  • src (int) – Source rank. Default: 0.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.gather(send_tensor, recv_tensor, dst, comm=None)

Gathers send tensors from all processes associated with the communicator to recv_tensor in a single process.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.gather_inplace(tensor, count, dst, comm=None)

The in-place version of gather.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective. On the dst rank, it must have a size of comm.nranks * count elements. On non-dst ranks, its size must be equal to count.

  • count (int) – The per-rank data count to gather.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.get_backend(model_name)
Parameters

model_name (str) –

bagua.torch_api.get_local_rank()

Get the local rank of the current process.

Local rank is a unique identifier assigned to each process within a node. They are always consecutive integers ranging from 0 to local_size - 1.

Returns

The local rank of the current process.

Return type

int

bagua.torch_api.get_local_size()

Get the number of processes in the node.

Returns

The local size of the node.

Return type

int

bagua.torch_api.get_rank()

Get the rank of the current process in the current process group.

Rank is a unique identifier assigned to each process within a distributed process group. They are always consecutive integers ranging from 0 to world_size - 1.

Returns

The rank of the current process.

Return type

int

bagua.torch_api.get_world_size()

Get the number of processes in the current process group.

Returns

The world size of the process group.

Return type

int

bagua.torch_api.init_process_group(store=None)

Initializes the PyTorch built-in distributed process group. This also initializes the distributed package, and should be executed before any other Bagua API.

Parameters

store (Optional[torch.distributed.Store]) – Key/value store accessible to all workers, used to exchange connection/address information. If None, a TCP-based store will be created. Default: None.

Examples:
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())
>>> bagua.init_process_group()
>>>
>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model = model.with_bagua([optimizer], ...)
bagua.torch_api.recv(tensor, src, comm=None)

Receives a tensor synchronously.

Parameters
  • tensor (torch.Tensor) – Tensor to fill with received data.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes.

Only the process with rank dst receives the final result.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.

  • dst (int) – Destination rank.

  • op (ReduceOp) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)

The in-place version of reduce.

Parameters
  • tensor (torch.Tensor) –

  • dst (int) –

  • op (ReduceOp) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces, then scatters send_tensor to all processes associated with the communicator.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • op (ReduceOp, optional) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of reduce_scatter.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective; the size must be divisible by comm.nranks.

  • op (ReduceOp, optional) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.scatter(send_tensor, recv_tensor, src, comm=None)

Scatters send tensor to all processes associated with the communicator.

Parameters
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.scatter_inplace(tensor, count, src, comm=None)

The in-place version of scatter.

Parameters
  • tensor (torch.Tensor) – Input and output of the collective. On the src rank, it must have a size of comm.nranks * count elements. On non-src ranks, its size must be equal to count.

  • count (int) – The per-rank data count to scatter.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.send(tensor, dst, comm=None)

Sends a tensor to dst synchronously.

Parameters
  • tensor (torch.Tensor) – Tensor to send.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.