Bagua

This website contains the Bagua API documentation. See the tutorials if you need step-by-step instructions on how to use Bagua.

bagua

Bagua is a communication library developed by Kuaishou Technology and DS3 Lab for deep learning.

See tutorials for Bagua’s rationale and benchmark.

Subpackages

bagua.torch_api

The Bagua communication library PyTorch interface.

Subpackages
bagua.torch_api.algorithms
Submodules
bagua.torch_api.algorithms.async_model_average
Module Contents
class bagua.torch_api.algorithms.async_model_average.AsyncModelAverageAlgorithm(peer_selection_mode='all', sync_interval_ms=500, warmup_steps=0)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the AsyncModelAverage algorithm.

The asynchronous implementation is experimental and imposes some restrictions. With an asynchronous algorithm, the number of iterations performed on each worker may differ. Therefore, the current implementation assumes that the dataset is an endless stream, and all workers continuously synchronize with each other.

Users should call abort to manually stop the algorithm’s continuous synchronization process. For example, for a model wrapped with .with_bagua(…), you can abort with model.bagua_algorithm.abort(model), and resume with model.bagua_algorithm.resume(model).

Parameters:
  • peer_selection_mode (str) – The way workers communicate with each other. Currently only "all" is supported, which means all workers’ weights are synchronized during each communication.

  • sync_interval_ms (int) – Number of milliseconds between model synchronizations.

  • warmup_steps (int) – Number of steps to warm up by doing gradient allreduce before doing asynchronous model averaging. Use 0 to disable.
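
Example::

A minimal usage sketch (not from the original documentation), assuming a distributed environment has been initialized and model and optimizer are already defined:

>>> from bagua.torch_api.algorithms.async_model_average import AsyncModelAverageAlgorithm
>>> algorithm = AsyncModelAverageAlgorithm(sync_interval_ms=500, warmup_steps=100)
>>> model = model.with_bagua([optimizer], algorithm)
>>> # ... training loop ...
>>> model.bagua_algorithm.abort(model)  # stop background synchronization after training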

class bagua.torch_api.algorithms.async_model_average.AsyncModelAverageAlgorithmImpl(process_group, peer_selection_mode='all', sync_interval_ms=500, warmup_steps=0)

Bases: bagua.torch_api.algorithms.AlgorithmImpl

Implementation of the AsyncModelAverage algorithm.

The asynchronous implementation is experimental and imposes some restrictions. With an asynchronous algorithm, the number of iterations performed on each worker may differ. Therefore, the current implementation assumes that the dataset is an endless stream, and all workers continuously synchronize with each other.

Users should call abort to manually stop the algorithm’s continuous synchronization process. For example, for a model wrapped with .with_bagua(…), you can abort with model.bagua_algorithm.abort(model), and resume with model.bagua_algorithm.resume(model).

Parameters:
  • process_group (BaguaProcessGroup) – The process group to work on.

  • peer_selection_mode (str) – The way workers communicate with each other. Currently only "all" is supported, which means all workers’ weights are synchronized during each communication.

  • sync_interval_ms (int) – Number of milliseconds between model synchronizations.

  • warmup_steps (int) – Number of steps to warm up by doing gradient allreduce before doing asynchronous model averaging. Use 0 to disable.

abort(bagua_ddp)

Stop background asynchronous communications. Should be called after training.

Parameters:

bagua_ddp (Union[BaguaDistributedDataParallel, bagua.torch_api.distributed.BaguaModule]) – Bagua distributed data parallel module.

resume(bagua_ddp)

Resume aborted background asynchronous communications (see abort). Should be called before training.

Parameters:

bagua_ddp (Union[BaguaDistributedDataParallel, bagua.torch_api.distributed.BaguaModule]) – Bagua distributed data parallel module.

bagua.torch_api.algorithms.base
Module Contents
class bagua.torch_api.algorithms.base.Algorithm

This is the base class that all Bagua algorithms inherit.

classmethod init(name, **kwargs)

Helper class method to initialize a registered Bagua algorithm.

Parameters:
  • name – Name of the registered Bagua algorithm.

  • kwargs – Arguments to initialize the registered Bagua algorithm.

Returns:

An instance of a registered Bagua algorithm.

Return type:

Algorithm

Example::
>>> from bagua.torch_api.algorithms import Algorithm
>>> algorithm = Algorithm.init("gradient_allreduce", hierarchical=True)

Note

Call str(bagua.torch_api.algorithms.GlobalAlgorithmRegistry) to see all registered Bagua algorithms.

reify(process_group)

Create an algorithm implementation instance. See AlgorithmImpl.

Parameters:

process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group to work on.

Returns:

An instance of Bagua algorithm implementation.

class bagua.torch_api.algorithms.base.AlgorithmImpl(process_group)

This is the base class that all Bagua algorithm implementations inherit.

It provides methods that can be overridden to implement different kinds of distributed algorithms.

Parameters:

process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group to work on.

init_backward_hook(bagua_ddp)

Given a BaguaDistributedDataParallel, return a hook function that will be executed on every parameter’s gradient computation completion.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A function that takes the name of a parameter (as in torch.nn.Module.named_parameters) and the parameter itself.

init_forward_pre_hook(bagua_ddp)

Given a BaguaDistributedDataParallel, return a hook function that will be executed before the forward process.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A function that takes the model’s input.

init_operations(bagua_ddp, bucket)

Given a BaguaDistributedDataParallel, and a BaguaBucket, register operations to be executed on the bucket.

Parameters:
  • bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

  • bucket (bagua.torch_api.bucket.BaguaBucket) – A single bucket to register operations.

init_post_backward_hook(bagua_ddp)

Given a BaguaDistributedDataParallel, return a hook function that will be executed when the backward pass is done.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A function that takes no argument.

init_post_optimizer_step_hook(bagua_ddp)

Given a BaguaDistributedDataParallel, return a hook function that will be executed when the optimizer.step() is done.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A function that gets called after an optimizer’s step() method is called. The function takes the optimizer as its argument.

init_tensors(bagua_ddp)

Given a BaguaDistributedDataParallel, return Bagua tensors to be used in Bagua for later operations.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A list of Bagua tensors for communication.

Return type:

List[bagua.torch_api.tensor.BaguaTensor]

need_reset()
Returns:

True if all initialization methods of the current algorithm should be called again. This is useful for algorithms that have multiple stages, where each stage needs different initializations.

Return type:

bool

tensors_to_buckets(tensors, do_flatten)

Given the bucketing suggestion from Bagua, return the actual Bagua buckets. The default implementation follows the suggestion to do the bucketing.

Parameters:
  • tensors (List[List[bagua.torch_api.tensor.BaguaTensor]]) – Bagua tensors grouped in different lists, representing Bagua’s suggestion on how to bucket the tensors.

  • do_flatten (bool) – Whether to flatten the Bagua buckets.

Returns:

A list of Bagua buckets.

Return type:

List[bagua.torch_api.bucket.BaguaBucket]
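
Example::

A minimal sketch of defining a custom algorithm with these base classes (not from the original documentation; the class names below are hypothetical). It only illustrates the hook structure and does not perform any gradient communication by itself:

>>> from bagua.torch_api.algorithms.base import Algorithm, AlgorithmImpl
>>>
>>> class PrintingAlgorithmImpl(AlgorithmImpl):
...     def init_post_backward_hook(self, bagua_ddp):
...         def hook():
...             # Called once per iteration, after the backward pass is done.
...             print("backward pass finished")
...         return hook
...
>>> class PrintingAlgorithm(Algorithm):
...     def reify(self, process_group):
...         return PrintingAlgorithmImpl(process_group)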

bagua.torch_api.algorithms.bytegrad
Module Contents
class bagua.torch_api.algorithms.bytegrad.ByteGradAlgorithm(hierarchical=True, average=True)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the ByteGrad algorithm.

Parameters:
  • hierarchical (bool) – Enable hierarchical communication.

  • average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
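
Example::

A minimal usage sketch (not from the original documentation), assuming the distributed environment, model and optimizer are already set up:

>>> from bagua.torch_api.algorithms import bytegrad
>>> algorithm = bytegrad.ByteGradAlgorithm(average=True)
>>> model = model.with_bagua([optimizer], algorithm)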

class bagua.torch_api.algorithms.bytegrad.ByteGradAlgorithmImpl(process_group, hierarchical=True, average=True)

Bases: bagua.torch_api.algorithms.AlgorithmImpl

Implementation of the ByteGrad algorithm.

Parameters:
  • process_group (BaguaProcessGroup) – The process group to work on.

  • hierarchical (bool) – Enable hierarchical communication.

  • average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.

bagua.torch_api.algorithms.decentralized
Module Contents
class bagua.torch_api.algorithms.decentralized.DecentralizedAlgorithm(hierarchical=True, peer_selection_mode='all', communication_interval=1)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the Decentralized SGD algorithm.

Parameters:
  • hierarchical (bool) – Enable hierarchical communication.

  • peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers’ weights are averaged in each communication step. "shift_one" means each worker selects a different peer to do weights average in each communication step.

  • communication_interval (int) – Number of iterations between two communication steps.
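
Example::

A minimal usage sketch (not from the original documentation), assuming the distributed environment, model and optimizer are already set up:

>>> from bagua.torch_api.algorithms import decentralized
>>> algorithm = decentralized.DecentralizedAlgorithm(peer_selection_mode="shift_one")
>>> model = model.with_bagua([optimizer], algorithm)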

class bagua.torch_api.algorithms.decentralized.DecentralizedAlgorithmImpl(process_group, hierarchical=True, peer_selection_mode='all', communication_interval=1)

Bases: bagua.torch_api.algorithms.AlgorithmImpl

Implementation of the Decentralized SGD algorithm.

Parameters:
  • process_group (BaguaProcessGroup) – The process group to work on.

  • hierarchical (bool) – Enable hierarchical communication.

  • peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers’ weights are averaged in each communication step. "shift_one" means each worker selects a different peer to do weights average in each communication step.

  • communication_interval (int) – Number of iterations between two communication steps.

class bagua.torch_api.algorithms.decentralized.LowPrecisionDecentralizedAlgorithm(hierarchical=True, communication_interval=1)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the Low Precision Decentralized SGD algorithm.

Parameters:
  • hierarchical (bool) – Enable hierarchical communication.

  • communication_interval (int) – Number of iterations between two communication steps.

class bagua.torch_api.algorithms.decentralized.LowPrecisionDecentralizedAlgorithmImpl(process_group, hierarchical=True, communication_interval=1)

Bases: bagua.torch_api.algorithms.AlgorithmImpl

Implementation of the Low Precision Decentralized SGD algorithm.

Parameters:
  • process_group (BaguaProcessGroup) – The process group to work on.

  • hierarchical (bool) – Enable hierarchical communication.

  • communication_interval (int) – Number of iterations between two communication steps.

bagua.torch_api.algorithms.gradient_allreduce
Module Contents
class bagua.torch_api.algorithms.gradient_allreduce.GradientAllReduceAlgorithm(hierarchical=False, average=True)

Bases: bagua.torch_api.algorithms.base.Algorithm

Create an instance of the GradientAllReduce algorithm.

Parameters:
  • hierarchical (bool) – Enable hierarchical communication.

  • average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.

class bagua.torch_api.algorithms.gradient_allreduce.GradientAllReduceAlgorithmImpl(process_group, hierarchical=False, average=True)

Bases: bagua.torch_api.algorithms.base.AlgorithmImpl

Implementation of the GradientAllReduce algorithm.

Parameters:
  • process_group (BaguaProcessGroup) – The process group to work on.

  • hierarchical (bool) – Enable hierarchical communication.

  • average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.

bagua.torch_api.algorithms.q_adam
Module Contents
class bagua.torch_api.algorithms.q_adam.QAdamAlgorithm(q_adam_optimizer, hierarchical=True)

Bases: bagua.torch_api.algorithms.Algorithm

Create an instance of the QAdam algorithm.

Parameters:
  • q_adam_optimizer (QAdamOptimizer) – A QAdamOptimizer initialized with model parameters.

  • hierarchical (bool) – Enable hierarchical communication.
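
Example::

A minimal usage sketch (not from the original documentation), assuming the distributed environment and model are already set up:

>>> from bagua.torch_api.algorithms.q_adam import QAdamAlgorithm, QAdamOptimizer
>>> optimizer = QAdamOptimizer(model.parameters(), lr=1e-3, warmup_steps=100)
>>> algorithm = QAdamAlgorithm(optimizer)
>>> model = model.with_bagua([optimizer], algorithm)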

class bagua.torch_api.algorithms.q_adam.QAdamAlgorithmImpl(process_group, q_adam_optimizer, hierarchical=True)

Bases: bagua.torch_api.algorithms.AlgorithmImpl

Implementation of the QAdam algorithm.

Parameters:
  • process_group (BaguaProcessGroup) – The process group to work on.

  • q_adam_optimizer (QAdamOptimizer) – A QAdamOptimizer initialized with model parameters.

  • hierarchical (bool) – Enable hierarchical communication.

class bagua.torch_api.algorithms.q_adam.QAdamOptimizer(params, lr=0.001, warmup_steps=100, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.0)

Bases: torch.optim.optimizer.Optimizer

Create a dedicated optimizer used for QAdam algorithm.

Parameters:
  • params (iterable) – Iterable of parameters to optimize or dicts defining parameter groups.

  • lr (float) – Learning rate.

  • warmup_steps (int) – Number of warm-up steps during which gradients are synchronized with full-precision allreduce (as in vanilla Adam) before switching to compressed communication. Use 0 to disable.

  • betas (Tuple[float, float]) – Coefficients used for computing running averages of gradient and its square.

  • eps (float) – Term added to the denominator to improve numerical stability.

  • weight_decay (float) – Weight decay (L2 penalty).

step(closure=None)
Package Contents
class bagua.torch_api.algorithms.Algorithm

This is the base class that all Bagua algorithms inherit.

classmethod init(name, **kwargs)

Helper class method to initialize a registered Bagua algorithm.

Parameters:
  • name – Name of the registered Bagua algorithm.

  • kwargs – Arguments to initialize the registered Bagua algorithm.

Returns:

An instance of a registered Bagua algorithm.

Return type:

Algorithm

Example::
>>> from bagua.torch_api.algorithms import Algorithm
>>> algorithm = Algorithm.init("gradient_allreduce", hierarchical=True)

Note

Call str(bagua.torch_api.algorithms.GlobalAlgorithmRegistry) to see all registered Bagua algorithms.

reify(process_group)

Create an algorithm implementation instance. See AlgorithmImpl.

Parameters:

process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group to work on.

Returns:

An instance of Bagua algorithm implementation.

class bagua.torch_api.algorithms.AlgorithmImpl(process_group)

This is the base class that all Bagua algorithm implementations inherit.

It provides methods that can be overridden to implement different kinds of distributed algorithms.

Parameters:

process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group to work on.

init_backward_hook(bagua_ddp)

Given a BaguaDistributedDataParallel, return a hook function that will be executed on every parameter’s gradient computation completion.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A function that takes the name of a parameter (as in torch.nn.Module.named_parameters) and the parameter itself.

init_forward_pre_hook(bagua_ddp)

Given a BaguaDistributedDataParallel, return a hook function that will be executed before the forward process.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A function that takes the model’s input.

init_operations(bagua_ddp, bucket)

Given a BaguaDistributedDataParallel, and a BaguaBucket, register operations to be executed on the bucket.

Parameters:
  • bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

  • bucket (bagua.torch_api.bucket.BaguaBucket) – A single bucket to register operations.

init_post_backward_hook(bagua_ddp)

Given a BaguaDistributedDataParallel, return a hook function that will be executed when the backward pass is done.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A function that takes no argument.

init_post_optimizer_step_hook(bagua_ddp)

Given a BaguaDistributedDataParallel, return a hook function that will be executed when the optimizer.step() is done.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A function that gets called after an optimizer’s step() method is called. The function takes the optimizer as its argument.

init_tensors(bagua_ddp)

Given a BaguaDistributedDataParallel, return Bagua tensors to be used in Bagua for later operations.

Parameters:

bagua_ddp (bagua.torch_api.data_parallel.bagua_distributed.BaguaDistributedDataParallel) – bagua.torch_api.data_parallel.BaguaDistributedDataParallel.

Returns:

A list of Bagua tensors for communication.

Return type:

List[bagua.torch_api.tensor.BaguaTensor]

need_reset()
Returns:

True if all initialization methods of the current algorithm should be called again. This is useful for algorithms that have multiple stages, where each stage needs different initializations.

Return type:

bool

tensors_to_buckets(tensors, do_flatten)

Given the bucketing suggestion from Bagua, return the actual Bagua buckets. The default implementation follows the suggestion to do the bucketing.

Parameters:
  • tensors (List[List[bagua.torch_api.tensor.BaguaTensor]]) – Bagua tensors grouped in different lists, representing Bagua’s suggestion on how to bucket the tensors.

  • do_flatten (bool) – Whether to flatten the Bagua buckets.

Returns:

A list of Bagua buckets.

Return type:

List[bagua.torch_api.bucket.BaguaBucket]

bagua.torch_api.algorithms.GlobalAlgorithmRegistry
bagua.torch_api.checkpoint
Submodules
bagua.torch_api.checkpoint.checkpointing
Module Contents
bagua.torch_api.checkpoint.checkpointing.load_checkpoint(checkpoints_path, model, optimizer=None, lr_scheduler=None, strict=True)

Load a model checkpoint and return the iteration.

Parameters:
  • checkpoints_path (str) – Path of checkpoints.

  • model (BaguaModule) – The model to load on.

  • optimizer (torch.optim.Optimizer, optional) – The optimizer to load on. Default: None.

  • lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to load on. Default: None.

  • strict (bool, optional) – Whether to strictly enforce that the keys in state_dict of the checkpoint match the keys returned by this module’s state_dict() function. Default: True.

bagua.torch_api.checkpoint.checkpointing.save_checkpoint(iteration, checkpoints_path, model, optimizer=None, lr_scheduler=None)

Save model checkpoint.

Parameters:
  • iteration (int) – Training Iteration.

  • checkpoints_path (str) – Path of checkpoints.

  • model (BaguaModule) – The model to save.

  • optimizer (torch.optim.Optimizer, optional) – The optimizer to save. Default: None.

  • lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to save. Default: None.
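
Example::

A minimal usage sketch (not from the original documentation); model, optimizer and lr_scheduler are assumed to exist and the checkpoint path is a placeholder:

>>> from bagua.torch_api.checkpoint import save_checkpoint, load_checkpoint
>>> save_checkpoint(iteration, "/path/to/checkpoints", model, optimizer, lr_scheduler)
>>> iteration = load_checkpoint("/path/to/checkpoints", model, optimizer, lr_scheduler)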

Package Contents
bagua.torch_api.checkpoint.load_checkpoint(checkpoints_path, model, optimizer=None, lr_scheduler=None, strict=True)

Load a model checkpoint and return the iteration.

Parameters:
  • checkpoints_path (str) – Path of checkpoints.

  • model (BaguaModule) – The model to load on.

  • optimizer (torch.optim.Optimizer, optional) – The optimizer to load on. Default: None.

  • lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to load on. Default: None.

  • strict (bool, optional) – Whether to strictly enforce that the keys in state_dict of the checkpoint match the keys returned by this module’s state_dict() function. Default: True.

bagua.torch_api.checkpoint.save_checkpoint(iteration, checkpoints_path, model, optimizer=None, lr_scheduler=None)

Save model checkpoint.

Parameters:
  • iteration (int) – Training Iteration.

  • checkpoints_path (str) – Path of checkpoints.

  • model (BaguaModule) – The model to save.

  • optimizer (torch.optim.Optimizer, optional) – The optimizer to save. Default: None.

  • lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to save. Default: None.

bagua.torch_api.contrib
Subpackages
bagua.torch_api.contrib.fuse
Submodules
bagua.torch_api.contrib.fuse.optimizer
Module Contents
bagua.torch_api.contrib.fuse.optimizer.fuse_optimizer(optimizer, do_flatten=True, check_flatten=True)

Convert any optimizer into a fused optimizer.

A fused optimizer can fuse multiple parameter updates into one or a few updates. To achieve this, users need to:

1) flatten multiple parameters in the same group into a fused parameter by setting do_flatten=True, which is also the default behavior of a fused optimizer;
2) perform a fused parameter update by calling fuse_step.

This fused optimizer is implemented for general use. It can be used in conjunction with a BaguaModule, with a torch.nn.parallel.DistributedDataParallel wrapped module, or in other cases (not listed here).

Parameters:
  • optimizer (torch.optim.Optimizer) – Any PyTorch optimizer.

  • do_flatten (bool) – Whether to flatten the parameters. The flatten operation will reset data pointers of parameter tensors so that they can be fused together. Default: True.

  • check_flatten (bool) – When set to True, the fused optimizer automatically checks whether parameter tensors remain contiguous as flattened. Only effective with do_flatten=True. Default: True.

Returns:

A Fused optimizer.

Example::
>>> optimizer = torch.optim.Adadelta(model.parameters(), ....)
>>> optimizer = bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True)
>>>
>>> optimizer.fuse_step()

When used in conjunction with a BaguaModule, set do_flatten=False in with_bagua explicitly:

>>> optimizer = bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True)
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm(), do_flatten=False)
>>>
>>> optimizer.fuse_step()

Note

This function and the with_bagua method both reset data pointers of module parameters by default. In order to perform a more effective fused parameter update, users need to disable bucket flattening in with_bagua by setting its do_flatten to False.

Note

A fused optimizer does not change the original behavior of the optimizer; it additionally enables performing a fused parameter update through fuse_step. Users can still perform a normal parameter update through step.

bagua.torch_api.contrib.fuse.optimizer.fuse_step(optimizer, closure=None)

Perform a fused parameter update.

This operation will fuse multiple contiguous parameters into a fused parameter by creating a tensor view that shares the same underlying storage with them, and then perform the parameter update on the fused parameters. If none of the parameter tensors are contiguous, this operation is equivalent to step.

Parameters:
  • optimizer (torch.optim.Optimizer) – A fused optimizer.

  • closure (Callable) – A closure that reevaluates the model and returns the loss. Optional for most optimizers.

Note

This function will not modify the storage of parameter tensors.

bagua.torch_api.contrib.fuse.optimizer.is_fused_optimizer(optimizer)

Check whether optimizer is a fused optimizer.

Parameters:

optimizer (torch.optim.Optimizer) –

bagua.torch_api.contrib.utils
Submodules
bagua.torch_api.contrib.utils.redis_store
Module Contents
class bagua.torch_api.contrib.utils.redis_store.RedisStore(hosts=None, cluster_mode=True, capacity_per_node=107374182400)

Bases: bagua.torch_api.contrib.utils.store.ClusterStore

A Redis-based distributed key-value store implementation, with set and get API exposed.

Parameters:
  • hosts (List[Dict[str, str]]) – A list of redis servers, defined by a list of dict containing Redis host and port information like [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]. A new Redis instance will be spawned on each node if hosts=None.

  • cluster_mode (bool) – If True, data is sharded across all Redis instances. Otherwise, if there are \(m\) Redis instances, the workers on the \(n\)-th node will use the \(n % m\)-th Redis instance.

  • capacity_per_node (int) – Maximum memory limit in bytes when spawning new Redis instances. Old values will be evicted when the limit is reached. Default is 100GB.

Note

All Bagua jobs within the same node will share the same local Redis instance if hosts=None. The capacity_per_node only affects newly spawned Redis instances, and has no effect on existing ones.
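
Example::

A minimal usage sketch (not from the original documentation); the host addresses are placeholders:

>>> from bagua.torch_api.contrib.utils.redis_store import RedisStore
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> store = RedisStore(hosts=hosts, cluster_mode=True)
>>> store.set("foo", "bar")
>>> value = store.get("foo")
>>> store.mset({"a": "1", "b": "2"})
>>> values = store.mget(["a", "b"])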

bagua.torch_api.contrib.utils.store
Module Contents
class bagua.torch_api.contrib.utils.store.ClusterStore(stores)

Bases: Store

Base class for distributed key-value stores.

In a cluster store, entries will be sharded equally among multiple store instances based on their keys.

Parameters:

stores (List[Store]) – A list of stores to shard entries on.

class bagua.torch_api.contrib.utils.store.Store

Base class for key-value store implementations. Entries are added to store with set or mset, and retrieved with get or mget.

clear()

Delete all keys in the current store.

get(key)

Returns the value associated with key, or None if the key doesn’t exist.

Parameters:

key (str) –

Return type:

Optional[Union[str, bytes]]

mget(keys)

Retrieve each key’s corresponding value and return them in a list with the same order as keys.

Parameters:

keys (List[str]) –

Return type:

List[Optional[Union[str, bytes]]]

mset(dictionary)

Set multiple entries at once with a dictionary. Each key-value pair in the dictionary will be set.

Parameters:

dictionary (Dict[str, Union[str, bytes]]) –

num_keys()

Returns the number of keys in the current store.

Return type:

int

set(key, value)

Set a key-value pair.

Parameters:
  • key (str) –

  • value (Union[str, bytes]) –

shutdown()

Shutdown the managed store instances. Unmanaged instances will not be killed.

status()

Returns True if the current store is alive.

Return type:

bool

Submodules
bagua.torch_api.contrib.cache_loader
Module Contents
class bagua.torch_api.contrib.cache_loader.CacheLoader(backend='redis', dataset_name='', writer_buffer_size=1, **kwargs)

Cache loader caches values calculated by an expensive function by their keys via get, so that the values can be retrieved faster next time.

Internally, values are indexed by "{dataset_name}_{key}" and saved in a distributed key-value store, where dataset_name is specified at initialization, and key is the argument passed to get.

By default, cache loader uses RedisStore as its backend distributed key-value store implementation. It supports using a list of existing redis servers or spawning new redis servers. Parameters for RedisStore can be provided here in **kwargs.

Parameters:
  • backend (str) – Backend distributed key-value store implementation. Can be "redis".

  • dataset_name (str) – Name of the dataset. Default "".

  • writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.

Example::

To use a list of existing redis servers for the “redis” backend:

>>> from bagua.torch_api.contrib import CacheLoader
>>>
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> loader = CacheLoader(backend="redis", hosts=hosts, cluster_mode=True, dataset_name="test")
>>>
>>> loader.get(index, lambda x: items[x])

To spawn new redis servers on training nodes for the “redis” backend, each node with a maximum memory limit of 100000000 bytes:

>>> loader = CacheLoader(backend="redis", hosts=None, cluster_mode=True, capacity_per_node=100000000)

Note

Cache loaders with the same dataset_name will reuse and overwrite each other’s cache. Use a different dataset_name if this is not desired.

get(key, load_fn)

Returns the value associated with key in the cache, using load_fn to create the entry if the key does not exist in the cache. load_fn is a function that takes key as its argument and returns the corresponding value to be cached.

Parameters:
  • key (str) –

  • load_fn (Callable[[str], None]) –

num_keys()

Returns the number of keys in the cache.

bagua.torch_api.contrib.cached_dataset
Module Contents
class bagua.torch_api.contrib.cached_dataset.CachedDataset(dataset, backend='redis', dataset_name='', writer_buffer_size=20, **kwargs)

Bases: torch.utils.data.dataset.Dataset

Cached dataset wraps a PyTorch dataset to cache its samples in memory, so that accessing these samples after the first time can be much faster. This is useful when samples need tedious preprocessing to produce, or reading the dataset itself is slow, which could slow down the whole training process.

Internally, the samples are indexed by a string key "{dataset_name}_{index}" and saved in a distributed key-value store, where dataset_name is specified when initializing the cached dataset, and index is the index of a specific sample (the argument of __getitem__ method in a PyTorch dataset).

Parameters:
  • dataset (torch.utils.data.dataset.Dataset) – PyTorch dataset to be wrapped.

  • backend (str) – Backend distributed key-value store implementation. Can be "redis".

  • dataset_name (str) – Name of the dataset. Default "".

  • writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.

Example:

>>> from bagua.torch_api.contrib import CachedDataset
>>> cached_dataset = CachedDataset(dataset, backend="redis", dataset_name="ds")
>>> dataloader = torch.utils.data.DataLoader(cached_dataset)

Note

Cached dataset is a special case of cache loader. The parameters backend and writer_buffer_size used to initialize a cached dataset have the same meanings as those used to initialize a cache loader. You can provide additional cache loader arguments here in **kwargs. See also CacheLoader.

cache_loader

The backend cache loader instance.

bagua.torch_api.contrib.load_balancing_data_loader
Module Contents
class bagua.torch_api.contrib.load_balancing_data_loader.LoadBalancingDistributedBatchSampler(sampler, batch_fn, drop_last=False)

Bases: torch.utils.data.sampler.Sampler

Wraps another load-balancing sampler to yield variable-sized mini-batches.

Parameters:
  • sampler (LoadBalancingDistributedSampler) – Load balance sampler.

  • batch_fn (Callable) – Callable to yield mini-batch indices.

  • drop_last (bool) – If True, the sampler will drop the last few batches that exceed the least common number of batches among replicas; otherwise, the number of batches on each replica will be padded to be the same.

batch_fn will have the signature of:

def batch_fn(indices: List[int]) -> List[List[int]]
Example::
>>> from bagua.torch_api.contrib import LoadBalancingDistributedSampler, \
...     LoadBalancingDistributedBatchSampler
>>>
>>> sampler = LoadBalancingDistributedSampler(dataset, complexity_fn=complexity_fn)
>>> batch_sampler = LoadBalancingDistributedBatchSampler(sampler, batch_fn=batch_fn)
>>> loader = torch.utils.data.DataLoader(dataset, batch_sampler=batch_sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     batch_sampler.set_epoch(epoch)
...     train(loader)
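
The batch_fn above is user-defined: it maps the list of sample indices produced by the sampler to a list of mini-batches, following the signature shown earlier. A minimal, hypothetical batch_fn that simply chunks the indices into fixed-size batches could look like:

>>> def batch_fn(indices):
...     batch_size = 32  # hypothetical fixed size; in practice batch sizes may vary with sample complexity
...     return [indices[i:i + batch_size] for i in range(0, len(indices), batch_size)]
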
set_epoch(epoch)

Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.

Parameters:

epoch (int) – Epoch number.

Return type:

None

class bagua.torch_api.contrib.load_balancing_data_loader.LoadBalancingDistributedSampler(dataset, complexity_fn, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False, random_level=0)

Bases: torch.utils.data.sampler.Sampler

Sampler that restricts data loading to a subset of the dataset.

This sampler uses a complexity_fn to calculate each sample’s computational complexity, so that each batch gets a similar total computational complexity.

This is useful in scenarios like speech and NLP, where each batch has variable length and distributed training suffers from the straggler problem.

The usage is similar to torch.utils.data.DistributedSampler, where each process loads a subset of the original dataset that is exclusive to it.

Note

Dataset is assumed to be of constant size.

Parameters:
  • dataset (torch.utils.data.dataset.Dataset) – Dataset used for sampling.

  • complexity_fn (Callable) – A function whose input is a sample and output is an integer as a measure of the computational complexity of the sample.

  • num_replicas (int, optional) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.

  • rank (int, optional) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.

  • shuffle (bool, optional) – If True (default), sampler will shuffle the indices.

  • seed (int, optional) – random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group. Default: 0.

  • drop_last (bool, optional) – if True, then the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas. Default: False.

  • random_level (float, optional) – A float between 0 and 1 that controls the extent of load balancing. 0 means the best load balance, while 1 means the opposite.

Warning

In distributed mode, calling the set_epoch method at the beginning of each epoch before creating the DataLoader iterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.

Example::

Define your complexity_fn, which accepts a dataset sample as its input and produces an integer as the sample’s computational complexity:

>>> dataset = torch.utils.data.TensorDataset(torch.randn(n, 2), torch.randperm(n))
>>> complexity_fn = lambda x: x[1]

Below is the usage of LoadBalancingDistributedSampler and DataLoader:

>>> sampler = bagua.torch_api.contrib.LoadBalancingDistributedSampler(
...     dataset,
...     complexity_fn=complexity_fn) if is_distributed else None
>>> loader = torch.utils.data.DataLoader(dataset,
...     shuffle=(sampler is None),
...     sampler=sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     if is_distributed:
...         sampler.set_epoch(epoch)
...     train(loader)
set_epoch(epoch)

Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.

Parameters:

epoch (int) – Epoch number.

Return type:

None

bagua.torch_api.contrib.sync_batchnorm
Module Contents
class bagua.torch_api.contrib.sync_batchnorm.SyncBatchNorm(num_features, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)

Bases: torch.nn.modules.batchnorm._BatchNorm

Applies synchronous BatchNorm for distributed module with N-dimensional BatchNorm layer(s). See BatchNorm for more details.

Parameters:
  • num_features – Number of channels \(C\) from the shape \((N, C, ...)\).

  • eps – A value added to the denominator for numerical stability. Default: 1e-5.

  • momentum – The value used for the running_mean and running_var computation. Can be set to None for cumulative moving average (i.e. simple average). Default: 0.1.

  • affine – A boolean value that when set to True, this module has learnable affine parameters. Default: True.

  • track_running_stats – A boolean value that when set to True, this module tracks the running mean and variance, and when set to False, this module does not track such statistics and always uses batch statistics in both training and eval modes. Default: True.

Note

Only GPU input tensors are supported in the training mode.

classmethod convert_sync_batchnorm(module)

Helper function to convert all BatchNorm*D layers in the model to torch.nn.SyncBatchNorm layers.

Parameters:

module (nn.Module) – Module containing one or more BatchNorm*D layers

Returns:

The original module with the converted torch.nn.SyncBatchNorm layers. If the original module is a BatchNorm*D layer, a new torch.nn.SyncBatchNorm layer object will be returned instead.

Note

This function must be called before with_bagua method.

Example::
>>> # Network with nn.BatchNorm layer
>>> model = torch.nn.Sequential(
...      torch.nn.Linear(D_in, H),
...      torch.nn.ReLU(),
...      torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...      model.parameters(),
...      lr=0.01,
...      momentum=0.9
...    )
>>> sync_bn_model = bagua.torch_api.contrib.sync_batchnorm.SyncBatchNorm.convert_sync_batchnorm(model)
>>> bagua_model = sync_bn_model.with_bagua([optimizer], GradientAllReduceAlgorithm())
forward(input)
Package Contents
class bagua.torch_api.contrib.CacheLoader(backend='redis', dataset_name='', writer_buffer_size=1, **kwargs)

Cache loader caches values calculated by an expensive function by their keys via get, so that the values can be retrieved faster next time.

Internally, values are indexed by "{dataset_name}_{key}" and saved in a distributed key-value store, where dataset_name is specified at initialization, and key is the argument passed to get.

By default, cache loader uses RedisStore as its backend distributed key-value store implementation. It supports using a list of existing redis servers or spawning new redis servers. Parameters for RedisStore can be provided here in **kwargs.

Parameters:
  • backend (str) – Backend distributed key-value store implementation. Can be "redis".

  • dataset_name (str) – Name of the dataset. Default "".

  • writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.

Example::

To use a list of existing redis servers for the “redis” backend:

>>> from bagua.torch_api.contrib import CacheLoader
>>>
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> loader = CacheLoader(backend="redis", hosts=hosts, cluster_mode=True, dataset_name="test")
>>>
>>> loader.get(index, lambda x: items[x])

To spawn new redis servers on training nodes for the “redis” backend, each node with a maximum memory limit of 100000000 bytes:

>>> loader = CacheLoader(backend="redis", hosts=None, cluster_mode=True, capacity_per_node=100000000)

Note

Cache loaders with the same dataset_name will reuse and overwrite each other’s cache. Use a different dataset_name if this is not desired.

get(key, load_fn)

Returns the value associated with key in the cache, using load_fn to create the entry if the key does not exist in the cache. load_fn is a function that takes key as its argument and returns the corresponding value to be cached.

Parameters:
  • key (str) –

  • load_fn (Callable[[str], None]) –

num_keys()

Returns the number of keys in the cache.

class bagua.torch_api.contrib.CachedDataset(dataset, backend='redis', dataset_name='', writer_buffer_size=20, **kwargs)

Bases: torch.utils.data.dataset.Dataset

Cached dataset wraps a PyTorch dataset to cache its samples in memory, so that accessing these samples after the first time can be much faster. This is useful when samples need tedious preprocessing to produce, or reading the dataset itself is slow, which could slow down the whole training process.

Internally, the samples are indexed by a string key "{dataset_name}_{index}" and saved in a distributed key-value store, where dataset_name is specified when initializing the cached dataset, and index is the index of a specific sample (the argument of __getitem__ method in a PyTorch dataset).

Parameters:
  • dataset (torch.utils.data.dataset.Dataset) – PyTorch dataset to be wrapped.

  • backend (str) – Backend distributed key-value store implementation. Can be "redis".

  • dataset_name (str) – Name of the dataset. Default "".

  • writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.

Example:

>>> from bagua.torch_api.contrib import CachedDataset
>>> cached_dataset = CachedDataset(dataset, backend="redis", dataset_name="ds")
>>> dataloader = torch.utils.data.DataLoader(cached_dataset)

Note

Cached dataset is a special case of cache loader. The parameters backend and writer_buffer_size used to initialize a cached dataset have the same meanings as those used to initialize a cache loader. You can provide additional cache loader arguments here in **kwargs. See also CacheLoader.

cache_loader

The backend cache loader instance.

class bagua.torch_api.contrib.LoadBalancingDistributedBatchSampler(sampler, batch_fn, drop_last=False)

Bases: torch.utils.data.sampler.Sampler

Wraps another load-balancing sampler to yield variable-sized mini-batches.

Parameters:
  • sampler (LoadBalancingDistributedSampler) – Load balance sampler.

  • batch_fn (Callable) – Callable to yield mini-batch indices.

  • drop_last (bool) – If True, the sampler will drop the last few batches that exceed the least common number of batches among replicas; otherwise, the number of batches on each replica will be padded to be the same.

batch_fn will have the signature of:

def batch_fn(indices: List[int]) -> List[List[int]]
Example::
>>> from bagua.torch_api.contrib import LoadBalancingDistributedSampler, \
...     LoadBalancingDistributedBatchSampler
>>>
>>> sampler = LoadBalancingDistributedSampler(dataset, complexity_fn=complexity_fn)
>>> batch_sampler = LoadBalancingDistributedBatchSampler(sampler, batch_fn=batch_fn)
>>> loader = torch.utils.data.DataLoader(dataset, batch_sampler=batch_sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     batch_sampler.set_epoch(epoch)
...     train(loader)
set_epoch(epoch)

Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.

Parameters:

epoch (int) – Epoch number.

Return type:

None

class bagua.torch_api.contrib.LoadBalancingDistributedSampler(dataset, complexity_fn, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False, random_level=0)

Bases: torch.utils.data.sampler.Sampler

Sampler that restricts data loading to a subset of the dataset.

This sampler uses a complexity_fn to calculate each sample’s computational complexity, so that each batch gets a similar total computational complexity.

This is useful in scenarios like speech and NLP, where each batch has variable length and distributed training suffers from the straggler problem.

The usage is similar to torch.utils.data.DistributedSampler, where each process loads a subset of the original dataset that is exclusive to it.

Note

Dataset is assumed to be of constant size.

Parameters:
  • dataset (torch.utils.data.dataset.Dataset) – Dataset used for sampling.

  • complexity_fn (Callable) – A function whose input is a sample and output is an integer as a measure of the computational complexity of the sample.

  • num_replicas (int, optional) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.

  • rank (int, optional) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.

  • shuffle (bool, optional) – If True (default), sampler will shuffle the indices.

  • seed (int, optional) – random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group. Default: 0.

  • drop_last (bool, optional) – if True, then the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas. Default: False.

  • random_level (float, optional) – A float between 0 and 1 that controls the extent of load balancing. 0 means the best load balance, while 1 means the opposite.

Warning

In distributed mode, calling the set_epoch method at the beginning of each epoch before creating the DataLoader iterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.

Example::

Define your complexity_fn, which accepts a dataset sample as its input and produces an integer as the sample’s computational complexity:

>>> dataset = torch.utils.data.TensorDataset(torch.randn(n, 2), torch.randperm(n))
>>> complexity_fn = lambda x: x[1]

Below is the usage of LoadBalancingDistributedSampler and DataLoader:

>>> sampler = bagua.torch_api.contrib.LoadBalancingDistributedSampler(
...     dataset,
...     complexity_fn=complexity_fn) if is_distributed else None
>>> loader = torch.utils.data.DataLoader(dataset,
...     shuffle=(sampler is None),
...     sampler=sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     if is_distributed:
...         sampler.set_epoch(epoch)
...     train(loader)
set_epoch(epoch)

Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.

Parameters:

epoch (int) – Epoch number.

Return type:

None

bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True, check_flatten=True)

Convert any optimizer into a fused optimizer.

A fused optimizer can fuse multiple parameter updates into one or a few updates. To achieve this, users need to:

1) flatten multiple parameters in the same group into a fused parameter by setting do_flatten=True, which is also the default behavior of a fused optimizer;
2) perform a fused parameter update by calling fuse_step.

This fused optimizer is implemented for general use. It can be used in conjunction with a BaguaModule, with a torch.nn.parallel.DistributedDataParallel wrapped module, or in other cases (not listed here).

Parameters:
  • optimizer (torch.optim.Optimizer) – Any PyTorch optimizer.

  • do_flatten (bool) – Whether to flatten the parameters. The flatten operation will reset data pointers of parameter tensors so that they can be fused together. Default: True.

  • check_flatten (bool) – When set to True, the fused optimizer automatically checks whether parameter tensors remain contiguous as flattened. Only effective with do_flatten=True. Default: True.

Returns:

A Fused optimizer.

Example::
>>> optimizer = torch.optim.Adadelta(model.parameters(), ....)
>>> optimizer = bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True)
>>>
>>> optimizer.fuse_step()

When used in conjunction with a BaguaModule, set do_flatten=False in with_bagua explicitly:

>>> optimizer = bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True)
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm(), do_flatten=False)
>>>
>>> optimizer.fuse_step()

Note

This function and the with_bagua method both reset data pointers of module parameters by default. In order to perform a more effective fused parameter update, users need to disable bucket flattening in with_bagua by setting its do_flatten to False.

Note

A fused optimizer does not change the original behavior of the optimizer; it additionally enables performing a fused parameter update through fuse_step. Users can still perform a normal parameter update through step.

bagua.torch_api.data_parallel
Submodules
bagua.torch_api.data_parallel.bagua_distributed
bagua.torch_api.data_parallel.distributed
Module Contents
bagua.torch_api.data_parallel.distributed.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=True, optimizers=[], algorithm=GradientAllReduceAlgorithm())

This function provides a PyTorch DDP compatible interface plus several Bagua specific parameters.

Parameters:
  • module (Module) – module to be parallelized

  • device_ids (Optional[List[Union[int, torch.device]]], optional) –

    CUDA devices.

    1) For single-device modules, device_ids can contain exactly one device id, which represents the only CUDA device where the input module corresponding to this process resides. Alternatively, device_ids can also be None.

    2) For multi-device modules and CPU modules, device_ids must be None.

    When device_ids is None for both cases, both the input data for the forward pass and the actual module must be placed on the correct device. (default: None)

  • output_device (Union[int, torch.device], optional) – Device location of output for single-device CUDA modules. For multi-device modules and CPU modules, it must be None, and the module itself dictates the output location. (default: device_ids[0] for single-device modules)

  • dim (int, optional) – Dimension along which inputs are scattered across devices, as in torch.nn.parallel.DistributedDataParallel. (default: 0)

  • broadcast_buffers (bool, optional) – Flag that enables syncing (broadcasting) buffers of the module at beginning of the forward function. (default: True)

  • process_group (Union[None, TorchProcessGroup], optional) – The process group to be used for distributed data all-reduction. If None, the default process group, which is created by torch.distributed.init_process_group, will be used. (default: None)

  • bucket_cap_mb (int, optional) – DistributedDataParallel will bucket parameters into multiple buckets so that gradient reduction of each bucket can potentially overlap with backward computation. bucket_cap_mb controls the bucket size in MegaBytes (MB). (default: 25)

  • find_unused_parameters (bool, optional) – Traverse the autograd graph from all tensors contained in the return value of the wrapped module’s forward function. Parameters that don’t receive gradients as part of this graph are preemptively marked as being ready to be reduced. In addition, parameters that may have been used in the wrapped module’s forward function but were not part of loss computation and thus would also not receive gradients are preemptively marked as ready to be reduced. (default: False)

  • check_reduction (bool, optional) – This argument is deprecated.

  • gradient_as_bucket_view (bool, optional) – When set to True, gradients will be views pointing to different offsets of allreduce communication buckets. This can reduce peak memory usage, where the saved memory size will be equal to the total gradients size. Moreover, it avoids the overhead of copying between gradients and allreduce communication buckets. When gradients are views, detach_() cannot be called on the gradients. If hitting such errors, please fix it by referring to the zero_grad function in torch/optim/optimizer.py as a solution.

  • optimizers (List[torch.optim.Optimizer], optional) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers. Defaults to [].

  • algorithm (bagua.torch_api.algorithms.Algorithm, optional) – Data parallel distributed algorithm, which decides the communication mode and the way the model is updated. Defaults to GradientAllReduceAlgorithm.

Returns:

Bagua distributed data parallel instance used for distributed training.

Return type:

Union[TorchDistributedDataParallel, DistributedDataParallel_V1_9_0]

Example:

>>> bagua.init_process_group()
>>> net = bagua.data_parallel.DistributedDataParallel(model)

Example using faster algorithms in Bagua:

>>> from bagua.torch_api.algorithms import bytegrad
>>> bagua.init_process_group()
>>> net = bagua.data_parallel.DistributedDataParallel(model, algorithm=bytegrad.ByteGradAlgorithm())
>>> # For more possible algorithms, see https://tutorials.baguasys.com/algorithms/.
bagua.torch_api.data_parallel.distributed.to_bagua_process_group(process_group=None)

Convert a PyTorch process group to a Bagua process group.

Parameters:

process_group (Union[TorchProcessGroup, BaguaProcessGroup, None], optional) – PyTorch process group or Bagua process group. The default PyTorch process group is used if None is passed in.

Raises:

Exception – Raised if the input is not a TorchProcessGroup, BaguaProcessGroup, or None.

Returns:

A process group for communication in Bagua.

Return type:

BaguaProcessGroup
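
Example::

A minimal sketch (not from the original documentation), assuming torch.distributed has already been initialized:

>>> import torch.distributed as dist
>>> from bagua.torch_api.data_parallel.distributed import to_bagua_process_group
>>> bagua_group = to_bagua_process_group(dist.group.WORLD)
>>> default_group = to_bagua_process_group()  # uses the default PyTorch process group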

bagua.torch_api.data_parallel.functional
Module Contents
class bagua.torch_api.data_parallel.functional.ReduceOp

Bases: enum.IntEnum

An enum-like class for available reduction operations: SUM, PRODUCT, MIN, MAX, BAND, BOR, BXOR and AVG.


AVG = 10
BAND = 8
BOR = 7
BXOR = 9
MAX = 3
MIN = 2
PRODUCT = 1
SUM = 0
bagua.torch_api.data_parallel.functional.all_reduce(tensor, op=dist.ReduceOp.SUM, group=dist.group.WORLD)

Reduces the tensor data across all machines in such a way that all get the final result.

After the call the returned tensor is going to be bitwise identical in all processes.

Parameters:
  • tensor (Tensor) – Input of the collective.

  • op (optional) – One of the values from torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.

  • group (ProcessGroup, optional) – The process group to work on.

Returns:

Output of the collective

Return type:

Tensor
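
Example::

A minimal sketch (not from the original documentation), assuming the distributed environment is initialized and each process has set its CUDA device:

>>> import torch
>>> import torch.distributed as dist
>>> from bagua.torch_api.data_parallel.functional import all_reduce
>>> tensor = torch.ones(4, device="cuda")
>>> result = all_reduce(tensor, op=dist.ReduceOp.SUM)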

Package Contents
bagua.torch_api.data_parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=True, optimizers=[], algorithm=GradientAllReduceAlgorithm())

This function provides a PyTorch DDP compatible interface plus several Bagua specific parameters.

Parameters:
  • module (Module) – module to be parallelized

  • device_ids (Optional[List[Union[int, torch.device]]], optional) –

    CUDA devices.

    1) For single-device modules, device_ids can contain exactly one device id, which represents the only CUDA device where the input module corresponding to this process resides. Alternatively, device_ids can also be None.

    2) For multi-device modules and CPU modules, device_ids must be None.

    When device_ids is None for both cases, both the input data for the forward pass and the actual module must be placed on the correct device. (default: None)

  • output_device (Union[int, torch.device], optional) – Device location of output for single-device CUDA modules. For multi-device modules and CPU modules, it must be None, and the module itself dictates the output location. (default: device_ids[0] for single-device modules)

  • dim (int, optional) – Dimension along which inputs are scattered across devices, as in torch.nn.parallel.DistributedDataParallel. (default: 0)

  • broadcast_buffers (bool, optional) – Flag that enables syncing (broadcasting) buffers of the module at beginning of the forward function. (default: True)

  • process_group (Union[None, TorchProcessGroup], optional) – The process group to be used for distributed data all-reduction. If None, the default process group, which is created by torch.distributed.init_process_group, will be used. (default: None)

  • bucket_cap_mb (int, optional) – DistributedDataParallel will bucket parameters into multiple buckets so that gradient reduction of each bucket can potentially overlap with backward computation. bucket_cap_mb controls the bucket size in MegaBytes (MB). (default: 25)

  • find_unused_parameters (bool, optional) – Traverse the autograd graph from all tensors contained in the return value of the wrapped module’s forward function. Parameters that don’t receive gradients as part of this graph are preemptively marked as being ready to be reduced. In addition, parameters that may have been used in the wrapped module’s forward function but were not part of loss computation and thus would also not receive gradients are preemptively marked as ready to be reduced. (default: False)

  • check_reduction (bool, optional) – This argument is deprecated.

  • gradient_as_bucket_view (bool, optional) – When set to True, gradients will be views pointing to different offsets of allreduce communication buckets. This can reduce peak memory usage, where the saved memory size will be equal to the total gradients size. Moreover, it avoids the overhead of copying between gradients and allreduce communication buckets. When gradients are views, detach_() cannot be called on the gradients. If hitting such errors, please fix it by referring to the zero_grad function in torch/optim/optimizer.py as a solution.

  • optimizers (List[torch.optim.Optimizer], optional) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers. Defaults to [].

  • algorithm (bagua.torch_api.algorithms.Algorithm, optional) – Data parallel distributed algorithm, which decides the communication mode and the way the model is updated. Defaults to GradientAllReduceAlgorithm.

Returns:

Bagua distributed data parallel instance used for distributed training.

Return type:

Union[TorchDistributedDataParallel, DistributedDataParallel_V1_9_0]

Example:

>>> bagua.init_process_group()
>>> net = bagua.data_parallel.DistributedDataParallel(model)

Example using faster algorithms in Bagua:

>>> from bagua.torch_api.algorithms import bytegrad
>>> bagua.init_process_group()
>>> net = bagua.data_parallel.DistributedDataParallel(model, algorithm=bytegrad.ByteGradAlgorithm())
>>> # For more possible algorithms, see https://tutorials.baguasys.com/algorithms/.
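
A typical training step with the wrapper is sketched below; the dataloader and loss function are hypothetical placeholders, and the loop assumes the process group and CUDA device have already been set up as in the first example:

>>> optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
>>> net = bagua.data_parallel.DistributedDataParallel(model, optimizers=[optimizer])
>>> for data, target in dataloader:                      # hypothetical dataloader
...     optimizer.zero_grad()
...     loss = loss_fn(net(data.cuda()), target.cuda())  # hypothetical loss function
...     loss.backward()
...     optimizer.step()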
bagua.torch_api.model_parallel
Subpackages
bagua.torch_api.model_parallel.moe
Submodules
bagua.torch_api.model_parallel.moe.experts
Module Contents
class bagua.torch_api.model_parallel.moe.experts.Experts(expert, num_local_experts=1)

Bases: torch.nn.Module

forward(inputs)
bagua.torch_api.model_parallel.moe.layer
Module Contents
class bagua.torch_api.model_parallel.moe.layer.MoE(hidden_size, expert, num_local_experts=1, k=1, output_dropout_prob=0.0, capacity_factor=1.0, eval_capacity_factor=1.0, min_capacity=4, noisy_gate_policy=None)

Bases: torch.nn.Module

Initialize an MoE layer.

Parameters:
  • hidden_size (int) – the hidden dimension of the model, importantly this is also the input and output dimension.

  • expert (torch.nn.Module) – the torch module that defines the expert (e.g., an MLP, torch.nn.Linear).

  • num_local_experts (int, optional) – default=1, number of local experts per gpu.

  • k (int, optional) – default=1, top-k gating value, only supports k=1 or k=2.

  • output_dropout_prob (float, optional) – default=0.0, output dropout probability.

  • capacity_factor (float, optional) – default=1.0, the capacity of the expert at training time.

  • eval_capacity_factor (float, optional) – default=1.0, the capacity of the expert at eval time.

  • min_capacity (int, optional) – default=4, the minimum capacity per expert regardless of the capacity_factor.

  • noisy_gate_policy (str, optional) – default=None, noisy gate policy, valid options are ‘Jitter’, ‘RSample’ or ‘None’.

forward(hidden_states, used_token=None)

MoE forward

Parameters:
  • hidden_states (Tensor) – input to the layer

  • used_token (Tensor, optional) – default: None, mask only used tokens

Returns:

A tuple including output, gate loss, and expert count.

  • output (Tensor): output of the model

  • l_aux (Tensor): gate loss value

  • exp_counts (int): expert count
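
Example (a minimal sketch of constructing and calling an MoE layer; the hidden size, expert definition, and input shape are illustrative, and the default process group is assumed to be initialized):

>>> import torch
>>> from bagua.torch_api.model_parallel.moe import MoE
>>>
>>> hidden = 512
>>> expert = torch.nn.Sequential(                       # a simple MLP expert
...     torch.nn.Linear(hidden, 4 * hidden),
...     torch.nn.ReLU(),
...     torch.nn.Linear(4 * hidden, hidden),
... )
>>> moe = MoE(hidden_size=hidden, expert=expert, num_local_experts=2, k=1).cuda()
>>> x = torch.randn(8, 16, hidden, device="cuda")       # (batch, sequence, hidden)
>>> output, l_aux, exp_counts = moe(x)
>>> # l_aux is the gate loss; it is typically scaled and added to the task loss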

bagua.torch_api.model_parallel.moe.sharded_moe
Module Contents
class bagua.torch_api.model_parallel.moe.sharded_moe.MOELayer(gate, experts, num_local_experts, group=None)

Bases: Base

MOELayer module which implements MixtureOfExperts as described in Gshard_.

gate = TopKGate(model_dim, num_experts)
moe = MOELayer(gate, expert)
output = moe(input)
l_aux = moe.l_aux
Parameters:
  • gate (torch.nn.Module) – gate network

  • experts (torch.nn.Module) – expert network

  • num_local_experts (int) –

  • group (Optional[Any]) –

forward(*input, **kwargs)
Parameters:
  • input (torch.Tensor) –

  • kwargs (Any) –

Return type:

torch.Tensor

class bagua.torch_api.model_parallel.moe.sharded_moe.TopKGate(model_dim, num_experts, k=1, capacity_factor=1.0, eval_capacity_factor=1.0, min_capacity=4, noisy_gate_policy=None)

Bases: torch.nn.Module

Gate module which implements Top2Gating as described in Gshard_.

gate = TopKGate(model_dim, num_experts)
l_aux, combine_weights, dispatch_mask = gate(input)
Parameters:
  • model_dim (int) – size of model embedding dimension

  • num_experts (int) – number of experts in model

  • k (int) –

  • capacity_factor (float) –

  • eval_capacity_factor (float) –

  • min_capacity (int) –

  • noisy_gate_policy (Optional[str]) –

wg :torch.nn.Linear
forward(input, used_token=None)
Parameters:
  • input (torch.Tensor) –

  • used_token (torch.Tensor) –

Return type:

Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]

bagua.torch_api.model_parallel.moe.sharded_moe.gumbel_rsample(shape, device)
Parameters:
  • shape (Tuple) –

  • device (torch.device) –

Return type:

torch.Tensor

bagua.torch_api.model_parallel.moe.sharded_moe.multiplicative_jitter(x, device, epsilon=0.01)

Modified from the Switch Transformer paper and Mesh TensorFlow transformers: multiplies values by a random number between 1 - epsilon and 1 + epsilon. This makes models more resilient to rounding errors introduced by bfloat16, which seems particularly important for logits.

Parameters:
  • x (torch.Tensor) – a torch tensor.

  • device (torch.device) –

  • epsilon (float) – a floating point value.

Returns:

A jittered x.

bagua.torch_api.model_parallel.moe.sharded_moe.top1gating(logits, capacity_factor, min_capacity, used_token=None, noisy_gate_policy=None)

Implements Top1Gating on logits.

Parameters:
  • logits (torch.Tensor) –

  • capacity_factor (float) –

  • min_capacity (int) –

  • used_token (torch.Tensor) –

  • noisy_gate_policy (Optional[str]) –

Return type:

Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]

bagua.torch_api.model_parallel.moe.sharded_moe.top2gating(logits, capacity_factor)

Implements Top2Gating on logits.

Parameters:
  • logits (torch.Tensor) –

  • capacity_factor (float) –

Return type:

Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]

bagua.torch_api.model_parallel.moe.sharded_moe.Base
bagua.torch_api.model_parallel.moe.sharded_moe.exp_selection_uniform_map :Dict[torch.device, Callable]
bagua.torch_api.model_parallel.moe.sharded_moe.gumbel_map :Dict[torch.device, Callable]
bagua.torch_api.model_parallel.moe.sharded_moe.uniform_map :Dict[torch.device, Callable]
bagua.torch_api.model_parallel.moe.utils
Module Contents
bagua.torch_api.model_parallel.moe.utils.is_moe_param(param)
Parameters:

param (torch.Tensor) –

Return type:

bool
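
For illustration, is_moe_param can be used to place MoE expert parameters into a separate optimizer parameter group (a minimal sketch; the model and learning rates are hypothetical):

>>> import torch
>>> from bagua.torch_api.model_parallel.moe.utils import is_moe_param
>>>
>>> moe_params = [p for p in model.parameters() if is_moe_param(p)]
>>> dense_params = [p for p in model.parameters() if not is_moe_param(p)]
>>> optimizer = torch.optim.SGD(
...     [{"params": dense_params}, {"params": moe_params, "lr": 0.005}],
...     lr=0.01,
... )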

Package Contents
class bagua.torch_api.model_parallel.moe.MoE(hidden_size, expert, num_local_experts=1, k=1, output_dropout_prob=0.0, capacity_factor=1.0, eval_capacity_factor=1.0, min_capacity=4, noisy_gate_policy=None)

Bases: torch.nn.Module

Initialize an MoE layer.

Parameters:
  • hidden_size (int) – the hidden dimension of the model, importantly this is also the input and output dimension.

  • expert (torch.nn.Module) – the torch module that defines the expert (e.g., an MLP, torch.nn.Linear).

  • num_local_experts (int, optional) – default=1, number of local experts per gpu.

  • k (int, optional) – default=1, top-k gating value, only supports k=1 or k=2.

  • output_dropout_prob (float, optional) – default=0.0, output dropout probability.

  • capacity_factor (float, optional) – default=1.0, the capacity of the expert at training time.

  • eval_capacity_factor (float, optional) – default=1.0, the capacity of the expert at eval time.

  • min_capacity (int, optional) – default=4, the minimum capacity per expert regardless of the capacity_factor.

  • noisy_gate_policy (str, optional) – default=None, noisy gate policy, valid options are ‘Jitter’, ‘RSample’ or ‘None’.

forward(hidden_states, used_token=None)

MoE forward

Parameters:
  • hidden_states (Tensor) – input to the layer

  • used_token (Tensor, optional) – default: None, mask only used tokens

Returns:

A tuple including output, gate loss, and expert count.

  • output (Tensor): output of the model

  • l_aux (Tensor): gate loss value

  • exp_counts (int): expert count

bagua.torch_api.model_parallel.moe.is_moe_param(param)
Parameters:

param (torch.Tensor) –

Return type:

bool

Submodules
bagua.torch_api.bucket
Module Contents
class bagua.torch_api.bucket.BaguaBucket(tensors, name, flatten, alignment=1)

Create a Bagua bucket with a list of Bagua tensors.

Parameters:
  • tensors (List[bagua.torch_api.tensor.BaguaTensor]) – A list of Bagua tensors to be put in the bucket.

  • name (str) – The unique name of the bucket.

  • flatten (bool) – If True, flatten the input tensors so that they are contiguous in memory.

  • alignment (int) – If alignment > 1, Bagua will add a padding tensor to the bucket so that the total number of elements in the bucket is divisible by the given alignment.

name

The bucket’s name.

tensors

The Bagua tensors contained in the bucket.

append_asynchronous_model_average_op(peer_selection_mode, group=None)

Append an asynchronous model average operation to a bucket. This operation will enable continuous model averaging between workers while training a model.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

This operation is intended to run in parallel with the computation process. It returns a reference to the op. The op features a lock to exclusively access the model. Call op.lock_weight() to acquire the lock and op.unlock_weight() to release it.

Parameters:
  • peer_selection_mode (str) – How workers communicate with each other. Currently "all" is supported. "all" means all workers’ weights are averaged during each communication.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

Returns:

The asynchronous model average operation itself.

append_centralized_synchronous_op(hierarchical=False, average=True, scattergather=False, compression=None, group=None)

Append a centralized synchronous operation to a bucket. It will sum or average the tensors in the bucket for all workers.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters:
  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.

  • average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.

  • scattergather (bool) – If True, the communication between workers is done with scatter-gather instead of allreduce. This is required for using compression.

  • compression (Optional[str]) – If not None, the tensors will be compressed for communication. Currently "MinMaxUInt8" is supported.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.
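
Below is a minimal sketch of the low-level bucket workflow based on the signatures documented here; the parameter p, the module, and the training context are hypothetical, and these steps are normally performed inside an algorithm implementation rather than by hand:

>>> from bagua.torch_api.bucket import BaguaBucket
>>>
>>> # register the parameter's gradient as the effective tensor
>>> p = p.ensure_bagua_tensor(
...     name="p",
...     module_name=model.bagua_module_name,
...     getter_closure=lambda param: param.grad,
... )
>>> bucket = BaguaBucket([p], name="my_bucket", flatten=True)
>>> bucket.append_centralized_synchronous_op(average=True)  # allreduce-average the bucket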

append_decentralized_synchronous_op(peer_weight, hierarchical=True, peer_selection_mode='all', group=None)

Append a decentralized synchronous operation to a bucket. It will do gossip-style model averaging among workers.

This operation is not inplace, which means the bucket weights are first copied to peer_weight, and the result of decentralized averaging will be in peer_weight. To copy peer_weight back to self, call op.copy_back_peer_weight(self).

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters:
  • peer_weight (BaguaTensor) – A tensor used for averaging model with peers, should be of the same size with the bucket tensors total size. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor.

  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.

  • peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers’ weights are averaged in each communication step. "shift_one" means each worker selects a different peer to do weights average in each communication step.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

Returns:

The decentralized synchronous operation itself.

append_low_precision_decentralized_synchronous_op(weight, left_peer_weight, right_peer_weight, hierarchical=True, compression='MinMaxUInt8', group=None)

Append a low precision decentralized synchronous operation to a bucket. It will compress the difference of local models between two successive iterations and exchange them among workers.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters:
  • weight (BaguaTensor) – Model replica of current worker’s local model. It should be of the same size with the bucket tensors total size. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor.

  • left_peer_weight (BaguaTensor) – Model replica of current worker’s left peer. It should be of the same size with the bucket tensors total size. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor, then copy the initializing weights of current worker’s left peer to the tensor.

  • right_peer_weight (BaguaTensor) – Model replica of current worker’s right peer. It should be of the same size with the bucket tensors total size. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor, then copy the initializing weights of the current worker’s right peer to the tensor.

  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.

  • compression (str) – How tensors are compressed for communication. Currently "MinMaxUInt8" is supported.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

append_python_op(python_function, group=None)

Append a Python operation to a bucket. A Python operation is a Python function that takes the bucket’s name and returns None. It can do arbitrary things within the function body.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters:
  • python_function (Callable[[str], None]) – The Python operation function, which takes the bucket’s name as its input and returns None.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.
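
For example, a Python operation that simply logs the bucket name could look like this (a minimal sketch; bucket refers to an already constructed BaguaBucket):

>>> def log_bucket(bucket_name):
...     print(f"bucket {bucket_name} is ready")
>>> bucket.append_python_op(log_bucket)
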
bytes()

Returns the total number of bytes occupied by the bucket.

Return type:

int

check_flatten()
Returns:

True if effective tensors are contiguous in memory.

Return type:

bool

clear_ops()

Clear the previously appended operations.

Return type:

BaguaBucket

flattened_tensor()

Returns a tensor contiguous in memory which contains the same data as the bucket’s effective tensors, i.e. the tensors returned by calling bagua_getter_closure on the bucket’s tensors, plus the padding tensor (if it exists).

Return type:

torch.Tensor

bagua.torch_api.communication
Module Contents
class bagua.torch_api.communication.BaguaProcessGroup(ranks, stream, group_name)

Definition of Bagua process group.

get_global_communicator()

Returns the global communicator of current process group.

Return type:

bagua_core.BaguaSingleCommunicatorPy

get_inter_node_communicator()

Returns the inter-node communicator of current process group.

Return type:

bagua_core.BaguaSingleCommunicatorPy

get_intra_node_communicator()

Returns the intra-node communicator of current process group.

Return type:

bagua_core.BaguaSingleCommunicatorPy

class bagua.torch_api.communication.ReduceOp

Bases: enum.IntEnum

An enum-like class for available reduction operations: SUM, PRODUCT, MIN, MAX, BAND, BOR, BXOR and AVG.

Initialize self. See help(type(self)) for accurate signature.

AVG = 10
BAND = 8
BOR = 7
BXOR = 9
MAX = 3
MIN = 2
PRODUCT = 1
SUM = 0
bagua.torch_api.communication.allgather(send_tensor, recv_tensor, comm=None)

Gathers send tensors from all processes associated with the communicator into recv_tensor.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
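
Example (a minimal sketch; assumes the default process group is initialized and each process is bound to its CUDA device):

>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import allgather
>>>
>>> world_size = bagua.get_world_size()
>>> send_tensor = torch.full((2,), float(bagua.get_rank()), device="cuda")
>>> recv_tensor = torch.zeros(2 * world_size, device="cuda")
>>> allgather(send_tensor, recv_tensor)
>>> # recv_tensor now holds [0, 0, 1, 1, ..., world_size - 1, world_size - 1]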

bagua.torch_api.communication.allgather_inplace(tensor, comm=None)

The in-place version of allgather.

Parameters:
  • tensor (torch.Tensor) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.communication.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes associated with the communicator in such a way that all get the final result. After the call recv_tensor is going to be bitwise identical in all processes.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size with send_tensor.

  • op (ReduceOp) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

Examples:

>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api import allreduce
>>>
>>> rank = bagua.get_rank()
>>> device = torch.device("cuda", bagua.get_local_rank())
>>>
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 ranks.
>>> send_tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64, device=device)
>>> send_tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1

>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat, device=device)
>>> send_tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
bagua.torch_api.communication.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of allreduce.

Parameters:
  • tensor (torch.Tensor) –

  • op (ReduceOp) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.communication.alltoall(send_tensor, recv_tensor, comm=None)

Each process scatters send_tensor to all processes associated with the communicator and returns the gathered data in recv_tensor.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by comm.nranks.

  • recv_tensor (torch.Tensor) – Output of the collective, must have equal size with send_tensor.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.alltoall_inplace(tensor, comm=None)

The in-place version of alltoall.

Parameters:
  • tensor (torch.Tensor) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.communication.barrier(comm=None)

Synchronizes all processes. This collective blocks processes until all processes associated with the communicator enter this function.

Parameters:

comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.broadcast(tensor, src=0, comm=None)

Broadcasts the tensor to all processes associated with the communicator.

tensor must have the same number of elements in all processes participating in the collective.

Parameters:
  • tensor (torch.Tensor) – Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise.

  • src (int) – Source rank. Default: 0.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
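
Example (a minimal sketch; rank 0 broadcasts its tensor to all other ranks):

>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import broadcast
>>>
>>> if bagua.get_rank() == 0:
...     tensor = torch.arange(4, dtype=torch.float32, device="cuda")
... else:
...     tensor = torch.zeros(4, dtype=torch.float32, device="cuda")
>>> broadcast(tensor, src=0)
>>> # every rank now holds tensor([0., 1., 2., 3.])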

bagua.torch_api.communication.broadcast_object(obj, src=0, comm=None)

Serializes and broadcasts an object from root rank to all other processes. Typical usage is to broadcast the optimizer.state_dict(), for example:

>>> state_dict = broadcast_object(optimizer.state_dict(), 0)
>>> if get_rank() > 0:
>>>     optimizer.load_state_dict(state_dict)
Parameters:
  • obj (object) – An object capable of being serialized without losing any context.

  • src (int) – The rank of the process from which parameters will be broadcasted to all other processes.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

Returns:

The object that was broadcasted from the src.

Return type:

object

Note

This operation will move data to GPU before communication and back to CPU after communication, and it requires CPU-GPU synchronization.

bagua.torch_api.communication.from_torch_group(group, stream=None)

Convert a Pytorch process group to its equivalent Bagua process group.

Parameters:
  • group – A handle of the Pytorch process group.

  • stream (Optional[torch.cuda.Stream]) – A CUDA stream used to execute NCCL operations. If None, CUDA stream of the default group will be used. See new_group for more information.

Returns:

A handle of the Bagua process group.

Return type:

BaguaProcessGroup
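
Example (a minimal sketch converting an existing PyTorch process group so that its communicators can be used with Bagua collectives):

>>> import torch.distributed as dist
>>> from bagua.torch_api.communication import from_torch_group
>>>
>>> torch_group = dist.new_group(ranks=[0, 1])    # a hypothetical two-rank subgroup
>>> bagua_group = from_torch_group(torch_group)
>>> comm = bagua_group.get_global_communicator()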

bagua.torch_api.communication.gather(send_tensor, recv_tensor, dst, comm=None)

Gathers send tensors from all processes associated with the communicator to recv_tensor in a single process.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.gather_inplace(tensor, count, dst, comm=None)

The in-place version of gather.

Parameters:
  • tensor (torch.Tensor) – Input and output of the collective. On the dst rank, it must have a size of comm.nranks * count elements. On non-dst ranks, its size must be equal to count.

  • count (int) – The per-rank data count to gather.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.init_process_group(store=None)

Initializes the PyTorch built-in distributed process group; this also initializes the distributed package. It should be executed before any other Bagua API is called.

Parameters:

store (Optional[torch.distributed.Store]) – Key/value store accessible to all workers, used to exchange connection/address information. If None, a TCP-based store will be created. Default: None.

Examples:
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank()) # THIS LINE IS IMPORTANT. See the notes below.
>>> bagua.init_process_group()
>>>
>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model = model.with_bagua([optimizer], ...)

Note

Each process should be associated with a CUDA device using torch.cuda.set_device() before calling init_process_group. Otherwise you may encounter the fatal runtime error: Rust cannot catch foreign exceptions.

bagua.torch_api.communication.is_initialized()

Check whether the default process group has been initialized.

bagua.torch_api.communication.new_group(ranks=None, stream=None)

Creates a new process group.

This function requires that all processes in the default group (i.e. all processes that are part of the distributed job) enter this function, even if they are not going to be members of the group. Additionally, groups should be created in the same order in all processes.

Each process group will create three communicators on request: a global communicator, an inter-node communicator, and an intra-node communicator. Users can access them through group.get_global_communicator(), group.get_inter_node_communicator() and group.get_intra_node_communicator() respectively.

Parameters:
  • ranks (Optional[List[int]]) – List of ranks of group members. If None, will be set to all ranks. Default is None.

  • stream (Optional[torch.cuda.Stream]) – A CUDA stream used to execute NCCL operations. If None, CUDA stream of the default group will be used. See CUDA semantics for details.

Returns:

A handle of process group that can be given to collective calls.

Return type:

BaguaProcessGroup

Note

The global communicator is used for global communications involving all ranks in the process group. The inter-node communicator and the intra-node communicator are used for hierarchical communications in this process group.

Note

For a specific communicator comm, comm.rank() returns the rank of current process and comm.nranks() returns the size of the communicator.
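
Example (a minimal sketch; every process must call new_group, even those that are not members of the new group):

>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import new_group, allreduce
>>>
>>> group = new_group(ranks=[0, 1])               # sub-group of ranks 0 and 1
>>> if bagua.get_rank() in [0, 1]:
...     comm = group.get_global_communicator()
...     send = torch.ones(2, device="cuda")
...     recv = torch.zeros(2, device="cuda")
...     allreduce(send, recv, comm=comm)          # sums only over ranks 0 and 1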

bagua.torch_api.communication.recv(tensor, src, comm=None)

Receives a tensor synchronously.

Parameters:
  • tensor (torch.Tensor) – Tensor to fill with received data.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes.

Only the process with rank dst is going to receive the final result.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size with send_tensor.

  • dst (int) – Destination rank.

  • op (ReduceOp) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)

The in-place version of reduce.

Parameters:
  • tensor (torch.Tensor) –

  • dst (int) –

  • op (ReduceOp) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.communication.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces, then scatters send_tensor to all processes associated with the communicator.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • op (ReduceOp, optional) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
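
Example (a minimal sketch on world_size ranks; every rank contributes world_size chunks and receives the reduced chunk that corresponds to its rank):

>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import reduce_scatter
>>>
>>> world_size = bagua.get_world_size()
>>> send_tensor = torch.ones(2 * world_size, device="cuda")
>>> recv_tensor = torch.zeros(2, device="cuda")
>>> reduce_scatter(send_tensor, recv_tensor)
>>> # recv_tensor is now filled with the value world_size on every rank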

bagua.torch_api.communication.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of reduce_scatter.

Parameters:
  • tensor (torch.Tensor) – Input and output of the collective, the size must be divisible by comm.nranks.

  • op (ReduceOp, optional) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.scatter(send_tensor, recv_tensor, src, comm=None)

Scatters send tensor to all processes associated with the communicator.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.scatter_inplace(tensor, count, src, comm=None)

The in-place version of scatter.

Parameters:
  • tensor (torch.Tensor) – Input and output of the collective. On the src rank, it must have a size of comm.nranks * count elements. On non-src ranks, its size must be equal to count.

  • count (int) – The per-rank data count to scatter.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.communication.send(tensor, dst, comm=None)

Sends a tensor to dst synchronously.

Parameters:
  • tensor (torch.Tensor) – Tensor to send.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
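
Example (a minimal sketch of a synchronous point-to-point exchange between rank 0 and rank 1):

>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.communication import send, recv
>>>
>>> tensor = torch.zeros(4, device="cuda")
>>> if bagua.get_rank() == 0:
...     tensor += 1
...     send(tensor, dst=1)    # rank 0 sends
... elif bagua.get_rank() == 1:
...     recv(tensor, src=0)    # rank 1 receives tensor([1., 1., 1., 1.])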

bagua.torch_api.distributed
Module Contents
class bagua.torch_api.distributed.BaguaModule

This class patches torch.nn.Module with several methods to enable Bagua functionalities.

Variables:
  • bagua_optimizers (List[torch.optim.Optimizer]) – The optimizers passed in by with_bagua.

  • bagua_algorithm (bagua.torch_api.algorithms.AlgorithmImpl) – The algorithm implementation used by the module, reified by the algorithm passed in by with_bagua.

  • process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group used by the module.

  • bagua_module_name – The module’s name. Bagua uses the module name to distinguish different modules.

  • parameters_to_ignore (List[str]) – The parameter names in "{module_name}.{param_name}" format to ignore when calling self.bagua_build_params().

  • bagua_train_step_counter (int) – Number of iterations in training mode.

  • bagua_buckets (List[bagua.torch_api.bucket.BaguaBucket]) – All Bagua buckets in a list.

with_bagua(optimizers, algorithm, process_group=None, do_flatten=True)

with_bagua enables easy distributed data parallel training on a torch.nn.Module.

Parameters:
  • optimizers (List[torch.optim.Optimizer]) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers.

  • algorithm (bagua.torch_api.algorithms.Algorithm) – Distributed algorithm used to do the actual communication and update.

  • process_group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to be used for distributed data all-reduction. If None, the default process group, which is created by bagua.torch_api.init_process_group, will be used. (default: None)

  • do_flatten (bool) – Whether to flatten the Bagua buckets. The flatten operation will reset data pointer of bucket tensors so that they can use faster code paths. Default: True.

Returns:

The original module, with Bagua related environments initialized.

Return type:

BaguaModule

Note

If we want to ignore some layers for communication, we can first check these layers’ corresponding keys in the module’s state_dict (they are in "{module_name}.{param_name}" format), then assign the list of keys to your_module._bagua_params_and_buffers_to_ignore.

Examples:

>>> model = torch.nn.Sequential(
...      torch.nn.Linear(D_in, H),
...      torch.nn.ReLU(),
...      torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...      model.parameters(),
...      lr=0.01,
...      momentum=0.9
...    )
>>> model = model.with_bagua(
...      [optimizer],
...      GradientAllReduceAlgorithm()
...    )
bagua.torch_api.env
Module Contents
bagua.torch_api.env.find_free_network_port()

Finds a free port on localhost.

Return type:

int

bagua.torch_api.env.get_autotune_server_wait_time()
Return type:

int

bagua.torch_api.env.get_default_bucket_size()

Get default communication bucket byte size.

Returns:

The default bucket size.

Return type:

int

bagua.torch_api.env.get_local_rank()

Get the local rank of the current process.

Local rank is a unique identifier assigned to each process within a node. They are always consecutive integers ranging from 0 to local_size - 1.

Returns:

The local rank of the current process.

Return type:

int

bagua.torch_api.env.get_local_size()

Get the number of processes in the node.

Returns:

The local size of the node.

Return type:

int

bagua.torch_api.env.get_node_rank()

Get the rank among all nodes.

Returns:

The node rank of the node.

Return type:

int

bagua.torch_api.env.get_rank()

Get the rank of the default process group.

Rank is a unique identifier assigned to each process within the default process group. They are always consecutive integers ranging from 0 to world_size - 1.

Returns:

The rank of the default process group.

Return type:

int

bagua.torch_api.env.get_world_size()

Get the number of processes in the default process group.

Returns:

The world size of the default process group.

Return type:

int

bagua.torch_api.tensor
Module Contents
class bagua.torch_api.tensor.BaguaTensor

This class patches torch.Tensor with additional methods.

A Bagua tensor is required to use Bagua’s communication algorithms. Users can convert a PyTorch tensor to Bagua tensor by ensure_bagua_tensor.

Bagua tensor features a proxy structure, where the actual tensor used by the backend is accessed via a “Proxy Tensor”. The proxy tensor is registered in Bagua; whenever the Bagua backend needs a tensor (for example, to use it for communication), it calls the bagua_getter_closure on the proxy tensor to get the tensor that is actually worked on. We call this tensor the “Effective Tensor”. The bagua_setter_closure is also provided to replace the effective tensor during runtime. It is intended to be used to replace the effective tensor with a customized workflow.

Their relation can be seen in the following diagram:

https://user-images.githubusercontent.com/18649508/139179394-51d0c0f5-e233-4ada-8e5e-0e70a889540d.png

For example, in the gradient allreduce algorithm, the effective tensor that needs to be exchanged between machines is the gradient. In this case, we will register the model parameters as proxy tensor, and register bagua_getter_closure to be lambda proxy_tensor: proxy_tensor.grad. In this way, even if the gradient tensor is recreated or changed during runtime, Bagua can still identify the correct tensor and use it for communication, since the proxy tensor serves as the root for access and is never replaced.

bagua_backend_tensor()
Returns:

The raw Bagua backend tensor.

Return type:

bagua_core.BaguaTensorPy

bagua_ensure_grad()

Create a zero gradient tensor for the current parameter if it does not exist.

Returns:

The original tensor.

Return type:

torch.Tensor

bagua_getter_closure()

Returns the tensor that will be used in runtime.

Return type:

torch.Tensor

bagua_mark_communication_ready()

Mark a Bagua tensor ready for scheduled operations execution.

bagua_mark_communication_ready_without_synchronization()

Mark a Bagua tensor ready immediately, without CUDA event synchronization.

bagua_set_storage(storage, storage_offset=0)

Sets the underlying storage for the effective tensor returned by bagua_getter_closure with an existing torch.Storage.

Parameters:
  • storage (torch.Storage) – The storage to use.

  • storage_offset (int) – The offset in the storage.

bagua_setter_closure(tensor)

Sets the tensor that will be used in runtime to a new Pytorch tensor tensor.

Parameters:

tensor (torch.Tensor) – The new tensor to be set to.

ensure_bagua_tensor(name=None, module_name=None, getter_closure=None, setter_closure=None)

Convert a PyTorch tensor or parameter to Bagua tensor inplace and return it. A Bagua tensor is required to use Bagua’s communication algorithms.

This operation will register self as proxy tensor to the Bagua backend. getter_closure takes the proxy tensor as input and returns a Pytorch tensor. When using the Bagua tensor, the getter_closure will be called and returns the effective tensor which will be used for communication and other operations. For example, if one of a model’s parameter param is registered as proxy tensor, and getter_closure is lambda x: x.grad, during runtime its gradient will be used.

setter_closure takes the proxy tensor and another tensor as inputs and returns nothing. It is mainly used for changing the effective tensor used in runtime. For example when one of a model’s parameter param is registered as proxy tensor, and getter_closure is lambda x: x.grad, the setter_closure can be lambda param, new_grad_tensor: setattr(param, "grad", new_grad_tensor). When the setter_closure is called, the effective tensor used in later operations will be changed to new_grad_tensor.

Parameters:
  • name (Optional[str]) – The unique name of the tensor.

  • module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.

  • getter_closure (Optional[Callable[[torch.Tensor], torch.Tensor]]) – A function that accepts a Pytorch tensor as its input and returns a Pytorch tensor as its output. Could be None, which means an identity mapping lambda x: x is used. Default: None.

  • setter_closure (Optional[Callable[[torch.Tensor, torch.Tensor], None]]) – A function that accepts two Pytorch tensors as its inputs and returns nothing. Could be None, which is a no-op. Default: None.

Returns:

The original tensor with Bagua tensor attributes initialized.
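
Example (a minimal sketch of registering a model parameter as a proxy tensor whose gradient is the effective tensor; param and model are hypothetical):

>>> param = param.ensure_bagua_tensor(
...     name="my_param",
...     module_name=model.bagua_module_name,
...     getter_closure=lambda p: p.grad,
...     setter_closure=lambda p, new_grad: setattr(p, "grad", new_grad),
... )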

is_bagua_tensor()

Check whether this is a Bagua tensor.

Return type:

bool

to_bagua_tensor(name=None, module_name=None, getter_closure=None, setter_closure=None)

Create a new Bagua tensor from a PyTorch tensor or parameter and return it. The new Bagua tensor will share the same storage with the input PyTorch tensor. A Bagua tensor is required to use Bagua’s communication algorithms. See ensure_bagua_tensor for more information.

Caveat: Be aware that if the original tensor changes to use a different storage using for example torch.Tensor.set_(...), the new Bagua tensor will still use the old storage.

Parameters:
  • name (Optional[str]) – The unique name of the tensor.

  • module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.

  • getter_closure (Optional[Callable[[torch.Tensor], torch.Tensor]]) – A function that accepts a Pytorch tensor as its input and returns a Pytorch tensor as its output. See ensure_bagua_tensor.

  • setter_closure (Optional[Callable[[torch.Tensor, torch.Tensor], None]]) – A function that accepts two Pytorch tensors as its inputs and returns nothing. See ensure_bagua_tensor.

Returns:

The new Bagua tensor sharing the same storage with the original tensor.

Package Contents
class bagua.torch_api.BaguaModule

This class patches torch.nn.Module with several methods to enable Bagua functionalities.

Variables:
  • bagua_optimizers (List[torch.optim.Optimizer]) – The optimizers passed in by with_bagua.

  • bagua_algorithm (bagua.torch_api.algorithms.AlgorithmImpl) – The algorithm implementation used by the module, reified by the algorithm passed in by with_bagua.

  • process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group used by the module.

  • bagua_module_name – The module’s name. Bagua uses the module name to distinguish different modules.

  • parameters_to_ignore (List[str]) – The parameter names in "{module_name}.{param_name}" format to ignore when calling self.bagua_build_params().

  • bagua_train_step_counter (int) – Number of iterations in training mode.

  • bagua_buckets (List[bagua.torch_api.bucket.BaguaBucket]) – All Bagua buckets in a list.

with_bagua(optimizers, algorithm, process_group=None, do_flatten=True)

with_bagua enables easy distributed data parallel training on a torch.nn.Module.

Parameters:
  • optimizers (List[torch.optim.Optimizer]) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers.

  • algorithm (bagua.torch_api.algorithms.Algorithm) – Distributed algorithm used to do the actual communication and update.

  • process_group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to be used for distributed data all-reduction. If None, the default process group, which is created by bagua.torch_api.init_process_group, will be used. (default: None)

  • do_flatten (bool) – Whether to flatten the Bagua buckets. The flatten operation will reset data pointer of bucket tensors so that they can use faster code paths. Default: True.

Returns:

The original module, with Bagua related environments initialized.

Return type:

BaguaModule

Note

If we want to ignore some layers for communication, we can first check these layers’ corresponding keys in the module’s state_dict (they are in "{module_name}.{param_name}" format), then assign the list of keys to your_module._bagua_params_and_buffers_to_ignore.

Examples:

>>> model = torch.nn.Sequential(
...      torch.nn.Linear(D_in, H),
...      torch.nn.ReLU(),
...      torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...      model.parameters(),
...      lr=0.01,
...      momentum=0.9
...    )
>>> model = model.with_bagua(
...      [optimizer],
...      GradientAllReduceAlgorithm()
...    )
class bagua.torch_api.BaguaTensor

This class patches torch.Tensor with additional methods.

A Bagua tensor is required to use Bagua’s communication algorithms. Users can convert a PyTorch tensor to Bagua tensor by ensure_bagua_tensor.

Bagua tensor features a proxy structure, where the actual tensor used by the backend is accessed via a “Proxy Tensor”. The proxy tensor is registered in Bagua; whenever the Bagua backend needs a tensor (for example, to use it for communication), it calls the bagua_getter_closure on the proxy tensor to get the tensor that is actually worked on. We call this tensor the “Effective Tensor”. The bagua_setter_closure is also provided to replace the effective tensor during runtime. It is intended to be used to replace the effective tensor with a customized workflow.

Their relation can be seen in the following diagram:

https://user-images.githubusercontent.com/18649508/139179394-51d0c0f5-e233-4ada-8e5e-0e70a889540d.png

For example, in the gradient allreduce algorithm, the effective tensor that needs to be exchanged between machines is the gradient. In this case, we will register the model parameters as proxy tensor, and register bagua_getter_closure to be lambda proxy_tensor: proxy_tensor.grad. In this way, even if the gradient tensor is recreated or changed during runtime, Bagua can still identify the correct tensor and use it for communication, since the proxy tensor serves as the root for access and is never replaced.

bagua_backend_tensor()
Returns:

The raw Bagua backend tensor.

Return type:

bagua_core.BaguaTensorPy

bagua_ensure_grad()

Create a zero gradient tensor for the current parameter if it does not exist.

Returns:

The original tensor.

Return type:

torch.Tensor

bagua_getter_closure()

Returns the tensor that will be used in runtime.

Return type:

torch.Tensor

bagua_mark_communication_ready()

Mark a Bagua tensor ready for scheduled operations execution.

bagua_mark_communication_ready_without_synchronization()

Mark a Bagua tensor ready immediately, without CUDA event synchronization.

bagua_set_storage(storage, storage_offset=0)

Sets the underlying storage for the effective tensor returned by bagua_getter_closure with an existing torch.Storage.

Parameters:
  • storage (torch.Storage) – The storage to use.

  • storage_offset (int) – The offset in the storage.

bagua_setter_closure(tensor)

Sets the tensor that will be used in runtime to a new Pytorch tensor tensor.

Parameters:

tensor (torch.Tensor) – The new tensor to be set to.

ensure_bagua_tensor(name=None, module_name=None, getter_closure=None, setter_closure=None)

Convert a PyTorch tensor or parameter to Bagua tensor inplace and return it. A Bagua tensor is required to use Bagua’s communication algorithms.

This operation will register self as proxy tensor to the Bagua backend. getter_closure takes the proxy tensor as input and returns a Pytorch tensor. When using the Bagua tensor, the getter_closure will be called and returns the effective tensor which will be used for communication and other operations. For example, if one of a model’s parameter param is registered as proxy tensor, and getter_closure is lambda x: x.grad, during runtime its gradient will be used.

setter_closure takes the proxy tensor and another tensor as inputs and returns nothing. It is mainly used for changing the effective tensor used in runtime. For example when one of a model’s parameter param is registered as proxy tensor, and getter_closure is lambda x: x.grad, the setter_closure can be lambda param, new_grad_tensor: setattr(param, "grad", new_grad_tensor). When the setter_closure is called, the effective tensor used in later operations will be changed to new_grad_tensor.

Parameters:
  • name (Optional[str]) – The unique name of the tensor.

  • module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.

  • getter_closure (Optional[Callable[[torch.Tensor], torch.Tensor]]) – A function that accepts a Pytorch tensor as its input and returns a Pytorch tensor as its output. Could be None, which means an identity mapping lambda x: x is used. Default: None.

  • setter_closure (Optional[Callable[[torch.Tensor, torch.Tensor], None]]) – A function that accepts two Pytorch tensors as its inputs and returns nothing. Could be None, which is a no-op. Default: None.

Returns:

The original tensor with Bagua tensor attributes initialized.

is_bagua_tensor()

Check whether this is a Bagua tensor.

Return type:

bool

to_bagua_tensor(name=None, module_name=None, getter_closure=None, setter_closure=None)

Create a new Bagua tensor from a PyTorch tensor or parameter and return it. The new Bagua tensor will share the same storage with the input PyTorch tensor. A Bagua tensor is required to use Bagua’s communication algorithms. See ensure_bagua_tensor for more information.

Caveat: Be aware that if the original tensor changes to use a different storage using for example torch.Tensor.set_(...), the new Bagua tensor will still use the old storage.

Parameters:
  • name (Optional[str]) – The unique name of the tensor.

  • module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.

  • getter_closure (Optional[Callable[[torch.Tensor], torch.Tensor]]) – A function that accepts a Pytorch tensor as its input and returns a Pytorch tensor as its output. See ensure_bagua_tensor.

  • setter_closure (Optional[Callable[[torch.Tensor, torch.Tensor], None]]) – A function that accepts two Pytorch tensors as its inputs and returns nothing. See ensure_bagua_tensor.

Returns:

The new Bagua tensor sharing the same storage with the original tensor.

class bagua.torch_api.ReduceOp

Bases: enum.IntEnum

An enum-like class for available reduction operations: SUM, PRODUCT, MIN, MAX, BAND, BOR, BXOR and AVG.

Initialize self. See help(type(self)) for accurate signature.

AVG = 10
BAND = 8
BOR = 7
BXOR = 9
MAX = 3
MIN = 2
PRODUCT = 1
SUM = 0
bagua.torch_api.allgather(send_tensor, recv_tensor, comm=None)

Gathers send tensors from all processes associated with the communicator into recv_tensor.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.allgather_inplace(tensor, comm=None)

The in-place version of allgather.

Parameters:
  • tensor (torch.Tensor) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes associated with the communicator in such a way that all get the final result. After the call recv_tensor is going to be bitwise identical in all processes.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have the same size with send_tensor.

  • op (ReduceOp) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

Examples:

>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api import allreduce
>>>
>>> rank = bagua.get_rank()
>>> device = torch.device("cuda", bagua.get_local_rank())
>>>
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 ranks.
>>> send_tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64, device=device)
>>> send_tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1

>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat, device=device)
>>> send_tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
bagua.torch_api.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of allreduce.

Parameters:
  • tensor (torch.Tensor) –

  • op (ReduceOp) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.alltoall(send_tensor, recv_tensor, comm=None)

Each process scatters send_tensor to all processes associated with the communicator and returns the gathered data in recv_tensor.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by comm.nranks.

  • recv_tensor (torch.Tensor) – Output of the collective, must have equal size with send_tensor.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.alltoall_inplace(tensor, comm=None)

The in-place version of alltoall.

Parameters:
  • tensor (torch.Tensor) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.alltoall_v(send_tensor, send_counts, send_displs, recv_tensor, recv_counts, recv_displs, comm=None)

Each process scatters send_tensor to all processes associated with the communicator and returns the gathered data in recv_tensor. Each process may send a different amount of data and provide displacements for the input and output data.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by comm.nranks.

  • send_counts (int) – integer array equal to the group size specifying the number of elements to send to each processor.

  • send_displs (int) – integer array (of length group size). Entry j specifies the displacement (relative to send_tensor) from which to take the outgoing data destined for process j.

  • recv_tensor (torch.Tensor) – Output of the collective, must have equal size with send_tensor.

  • recv_counts (int) – integer array equal to the group size specifying the maximum number of elements that can be received from each processor.

  • recv_displs (int) – integer array (of length group size). Entry i specifies the displacement (relative to recv_tensor) at which to place the incoming data from process i.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.alltoall_v_inplace(tensor, counts, displs, comm=None)

The in-place version of alltoall_v.

Parameters:
  • tensor (torch.Tensor) –

  • counts (int) –

  • displs (int) –

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –

bagua.torch_api.broadcast(tensor, src=0, comm=None)

Broadcasts the tensor to all processes associated with the communicator.

tensor must have the same number of elements in all processes participating in the collective.

Parameters:
  • tensor (torch.Tensor) – Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise.

  • src (int) – Source rank. Default: 0.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.gather(send_tensor, recv_tensor, dst, comm=None)

Gathers send tensors from all processes associated with the communicator to recv_tensor in a single process.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.gather_inplace(tensor, count, dst, comm=None)

The in-place version of gather.

Parameters:
  • tensor (torch.Tensor) – Input and output of the collective. On the dst rank, it must have a size of comm.nranks * count elements. On non-dst ranks, its size must be equal to count.

  • count (int) – The per-rank data count to gather.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.get_backend(model_name)
Parameters:

model_name (str) –

bagua.torch_api.get_local_rank()

Get the local rank of the current process.

Local rank is a unique identifier assigned to each process within a node. They are always consecutive integers ranging from 0 to local_size - 1.

Returns:

The local rank of the current process.

Return type:

int

bagua.torch_api.get_local_size()

Get the number of processes in the node.

Returns:

The local size of the node.

Return type:

int

bagua.torch_api.get_rank()

Get the rank of the current process in the default process group.

Rank is a unique identifier assigned to each process within the default process group. Ranks are always consecutive integers ranging from 0 to world_size - 1.

Returns:

The rank of the current process in the default process group.

Return type:

int

bagua.torch_api.get_world_size()

Get the number of processes in the default process group.

Returns:

The world size of the default process group.

Return type:

int
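
These query helpers are typically used together to pin each process to a GPU and to report its position in the job. A minimal sketch, assuming the processes were started by a distributed launcher that sets the usual rank and size environment variables:

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())   # one GPU per process within the node
>>> print(f"global rank {bagua.get_rank()} of {bagua.get_world_size()}, "
...       f"local rank {bagua.get_local_rank()} of {bagua.get_local_size()}")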

bagua.torch_api.init_process_group(store=None)

Initializes the PyTorch builtin distributed process group, which also initializes Bagua's distributed package. This should be executed before any other Bagua API is used.

Parameters:

store (Optional[torch.distributed.Store]) – Key/value store accessible to all workers, used to exchange connection/address information. If None, a TCP-based store will be created. Default: None.

Examples:
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank()) # THIS LINE IS IMPORTANT. See the notes below.
>>> bagua.init_process_group()
>>>
>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model = model.with_bagua([optimizer], ...)

Note

Each process should be associated with a CUDA device using torch.cuda.set_device() before calling init_process_group. Otherwise you may encounter the fatal runtime error: Rust cannot catch foreign exceptions.

bagua.torch_api.recv(tensor, src, comm=None)

Receives a tensor synchronously.

Parameters:
  • tensor (torch.Tensor) – Tensor to fill with received data.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)

Reduces the tensor data across all processes.

Only the process with rank dst receives the final result.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective.

  • recv_tensor (torch.Tensor) – Output of the collective; must have the same size as send_tensor.

  • dst (int) – Destination rank.

  • op (ReduceOp) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
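
Example: a minimal sketch, assuming an initialized process group.

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> send_tensor = torch.ones(4).cuda()
>>> recv_tensor = torch.zeros(4).cuda()
>>> bagua.reduce(send_tensor, recv_tensor, dst=0)   # op defaults to ReduceOp.SUM
>>> # on rank 0, every element of recv_tensor now equals the world size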

bagua.torch_api.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)

The in-place version of reduce.

Parameters:
  • tensor (torch.Tensor) – Input and output of the collective.

  • dst (int) – Destination rank.

  • op (ReduceOp) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)

Reduces, then scatters send_tensor to all processes associated with the communicator.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • op (ReduceOp, optional) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
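
Example: a minimal sketch, assuming an initialized process group; each rank contributes a chunk per rank and receives the element-wise sum of the chunk corresponding to its own rank.

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> nranks = bagua.get_world_size()
>>> send_tensor = torch.ones(2 * nranks).cuda()   # comm.nranks * recv_tensor.size() elements
>>> recv_tensor = torch.zeros(2).cuda()
>>> bagua.reduce_scatter(send_tensor, recv_tensor)
>>> # every element of recv_tensor now equals the world size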

bagua.torch_api.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)

The in-place version of reduce_scatter.

Parameters:
  • tensor (torch.Tensor) – Input and output of the collective, the size must be divisible by comm.nranks.

  • op (ReduceOp, optional) – One of the values from ReduceOp enum. Specifies an operation used for element-wise reductions.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
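
Example: a minimal sketch, assuming an initialized process group. Which slice of tensor holds the reduced result for this rank after the call is an assumption of this sketch.

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> nranks = bagua.get_world_size()
>>> tensor = torch.ones(2 * nranks).cuda()    # size divisible by comm.nranks
>>> bagua.reduce_scatter_inplace(tensor)
>>> # this rank's chunk of tensor now holds the summed values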

bagua.torch_api.scatter(send_tensor, recv_tensor, src, comm=None)

Scatters send_tensor to all processes associated with the communicator.

Parameters:
  • send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.

  • recv_tensor (torch.Tensor) – Output of the collective.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
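
Example: a minimal sketch, assuming an initialized process group; send_tensor only needs meaningful data on the src rank.

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> nranks = bagua.get_world_size()
>>> send_tensor = torch.arange(2 * nranks, dtype=torch.float32).cuda()  # comm.nranks * recv_tensor.size() elements
>>> recv_tensor = torch.zeros(2).cuda()
>>> bagua.scatter(send_tensor, recv_tensor, src=0)
>>> # rank i now holds elements [2*i, 2*i + 1] of rank 0's send_tensor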

bagua.torch_api.scatter_inplace(tensor, count, src, comm=None)

The in-place version of scatter.

Parameters:
  • tensor (torch.Tensor) – Input and output of the collective. On the src rank, it must have a size of comm.nranks * count elements; on non-src ranks, its size must be equal to count.

  • count (int) – The per-rank data count to scatter.

  • src (int) – Source rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.

bagua.torch_api.send(tensor, dst, comm=None)

Sends a tensor to dst synchronously.

Parameters:
  • tensor (torch.Tensor) – Tensor to send.

  • dst (int) – Destination rank.

  • comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
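
send and recv form a matched point-to-point pair: each send must be met by a recv on the destination rank. A minimal sketch, assuming an initialized process group with exactly two ranks:

>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> tensor = torch.zeros(4).cuda()
>>> if bagua.get_rank() == 0:
...     tensor += 1
...     bagua.send(tensor, dst=1)     # rank 0 sends its data to rank 1
... else:
...     bagua.recv(tensor, src=0)     # rank 1 blocks until the data arrives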

bagua.torch_api.version