Bagua¶
This website contains the Bagua API documentation. See the tutorials if you need step-by-step instructions on how to use Bagua.
bagua¶
Bagua is a communication library developed by Kuaishou Technology and DS3 Lab for deep learning.
See tutorials for Bagua’s rationale and benchmark.
Subpackages¶
bagua.torch_api¶
The Bagua communication library PyTorch interface.
Subpackages¶
bagua.torch_api.algorithms¶
- class bagua.torch_api.algorithms.async_model_average.AsyncModelAverageAlgorithm(peer_selection_mode='all', sync_interval_ms=500, warmup_steps=0)¶
Bases:
bagua.torch_api.algorithms.Algorithm
Create an instance of the AsyncModelAverage algorithm.
The asynchronous implementation is experimental and imposes some restrictions. With such an asynchronous algorithm, the number of iterations on each worker may differ. Therefore the current implementation assumes that the dataset is an endless stream, and all workers continuously synchronize with each other.
Users should call abort to manually stop the algorithm's continuous synchronization process. For example, for a model wrapped with .with_bagua(…), you can abort with model.bagua_algorithm.abort(model), and resume with model.bagua_algorithm.resume(model).
- Parameters
peer_selection_mode (str) – How workers communicate with each other. Currently "all" is supported. "all" means all workers' weights are synchronized during each communication.
sync_interval_ms (int) – Number of milliseconds between model synchronizations.
warmup_steps (int) – Number of steps to warm up by doing gradient allreduce before doing asynchronous model averaging. Use 0 to disable.
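- Example::
A minimal usage sketch, assuming the Bagua process group has already been initialized (e.g. via bagua.torch_api.init_process_group()) and that model, optimizer and a training loop exist:
>>> from bagua.torch_api.algorithms import async_model_average
>>> algorithm = async_model_average.AsyncModelAverageAlgorithm(sync_interval_ms=500, warmup_steps=100)
>>> model = model.with_bagua([optimizer], algorithm)
>>> # ... training loop over an endless data stream ...
>>> model.bagua_algorithm.abort(model)  # stop background synchronization after training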
- class bagua.torch_api.algorithms.async_model_average.AsyncModelAverageAlgorithmImpl(process_group, peer_selection_mode='all', sync_interval_ms=500, warmup_steps=0)¶
Bases:
bagua.torch_api.algorithms.AlgorithmImpl
Implementation of the AsyncModelAverage algorithm.
The asynchronous implementation is experimental and imposes some restrictions. With such an asynchronous algorithm, the number of iterations on each worker may differ. Therefore the current implementation assumes that the dataset is an endless stream, and all workers continuously synchronize with each other.
Users should call abort to manually stop the algorithm's continuous synchronization process. For example, for a model wrapped with .with_bagua(…), you can abort with model.bagua_algorithm.abort(model), and resume with model.bagua_algorithm.resume(model).
- Parameters
process_group (BaguaProcessGroup) – The process group to work on.
peer_selection_mode (str) – How workers communicate with each other. Currently "all" is supported. "all" means all workers' weights are synchronized during each communication.
sync_interval_ms (int) – Number of milliseconds between model synchronizations.
warmup_steps (int) – Number of steps to warm up by doing gradient allreduce before doing asynchronous model averaging. Use 0 to disable.
- abort(self, bagua_module)¶
Stop background asynchronous communications. Should be called after training.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- resume(self, bagua_module)¶
Resume aborted background asynchronous communications (see abort). Should be called before training.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- class bagua.torch_api.algorithms.base.Algorithm¶
This is the base class that all Bagua algorithms inherit.
- reify(self, process_group)¶
Create an algorithm instance.
- Parameters
process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group to work on.
- class bagua.torch_api.algorithms.base.AlgorithmImpl(process_group)¶
This is the base class that all Bagua algorithm implementations inherit.
It provides methods that can be overridden to implement different kinds of distributed algorithms.
- Parameters
process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group to work on.
- init_backward_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed on every parameter's gradient computation completion.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes the name of a parameter (as in torch.nn.Module.named_parameters) and the parameter itself.
- init_forward_pre_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed before the forward process.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes the model's input.
- init_operations(self, bagua_module, bucket)¶
Given a BaguaModule and a BaguaBucket, register operations to be executed on the bucket.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
bucket (bagua.torch_api.bucket.BaguaBucket) – A single bucket to register operations on.
- init_post_backward_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed when the backward pass is done.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes no argument.
- init_post_optimizer_step_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed when optimizer.step() is done.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that gets called after an optimizer's step() method is called. The function takes the optimizer as its argument.
- init_tensors(self, bagua_module)¶
Given a BaguaModule, return Bagua tensors to be used in Bagua for later operations.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A list of Bagua tensors for communication.
- Return type
- need_reset(self)¶
- Returns
True if all initialization methods of the current algorithm should be called again. This is useful for algorithms that have multiple stages, where each stage needs different initializations.
- Return type
bool
- tensors_to_buckets(self, tensors, do_flatten)¶
Given the bucketing suggestion from Bagua, return the actual Bagua buckets. The default implementation follows the suggestion to do the bucketing.
- Parameters
tensors (List[List[bagua.torch_api.tensor.BaguaTensor]]) – Bagua tensors grouped in different lists, representing Bagua's suggestion on how to bucket the tensors.
do_flatten (bool) – Whether to flatten the Bagua buckets.
- Returns
A list of Bagua buckets.
- Return type
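- Example::
The sketch below is illustrative only and not part of the Bagua API: the class names are hypothetical, and a production gradient algorithm would typically also override init_tensors to register the gradient tensors it communicates (see GradientAllReduceAlgorithm). It shows how a custom algorithm can register a bucket operation in init_operations and expose itself through reify:
from bagua.torch_api.algorithms import Algorithm, AlgorithmImpl

class CustomAllReduceAlgorithmImpl(AlgorithmImpl):
    # Hypothetical implementation: append a centralized synchronous
    # (allreduce-style) operation on every bucket, keep the default hooks.
    def __init__(self, process_group, average=True):
        super().__init__(process_group)
        self.average = average

    def init_operations(self, bagua_module, bucket):
        bucket.clear_ops()  # drop any previously appended operations
        bucket.append_centralized_synchronous_op(
            hierarchical=False,
            average=self.average,
        )

class CustomAllReduceAlgorithm(Algorithm):
    # Hypothetical user-facing algorithm that creates the implementation.
    def __init__(self, average=True):
        self.average = average

    def reify(self, process_group):
        return CustomAllReduceAlgorithmImpl(process_group, average=self.average)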
- class bagua.torch_api.algorithms.bytegrad.ByteGradAlgorithm(hierarchical=True, average=True)¶
Bases:
bagua.torch_api.algorithms.Algorithm
Create an instance of the ByteGrad algorithm.
- Parameters
hierarchical (bool) – Enable hierarchical communication.
average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
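- Example::
A minimal usage sketch, assuming the Bagua process group is already initialized and that model and optimizer exist:
>>> from bagua.torch_api.algorithms import bytegrad
>>> algorithm = bytegrad.ByteGradAlgorithm()
>>> model = model.with_bagua([optimizer], algorithm)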
- class bagua.torch_api.algorithms.bytegrad.ByteGradAlgorithmImpl(process_group, hierarchical=True, average=True)¶
Bases:
bagua.torch_api.algorithms.AlgorithmImpl
Implementation of the ByteGrad algorithm.
- Parameters
process_group (BaguaProcessGroup) – The process group to work on.
hierarchical (bool) – Enable hierarchical communication.
average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
- class bagua.torch_api.algorithms.decentralized.DecentralizedAlgorithm(hierarchical=True, peer_selection_mode='all', communication_interval=1)¶
Bases:
bagua.torch_api.algorithms.Algorithm
Create an instance of the Decentralized SGD algorithm.
- Parameters
hierarchical (bool) – Enable hierarchical communication.
peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers' weights are averaged in each communication step. "shift_one" means each worker selects a different peer to average weights with in each communication step.
communication_interval (int) – Number of iterations between two communication steps.
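- Example::
A minimal usage sketch, assuming the Bagua process group is already initialized and that model and optimizer exist:
>>> from bagua.torch_api.algorithms import decentralized
>>> algorithm = decentralized.DecentralizedAlgorithm(hierarchical=True, peer_selection_mode="all", communication_interval=1)
>>> model = model.with_bagua([optimizer], algorithm)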
- class bagua.torch_api.algorithms.decentralized.DecentralizedAlgorithmImpl(process_group, hierarchical=True, peer_selection_mode='all', communication_interval=1)¶
Bases:
bagua.torch_api.algorithms.AlgorithmImpl
Implementation of the Decentralized SGD algorithm.
- Parameters
process_group (BaguaProcessGroup) – The process group to work on.
hierarchical (bool) – Enable hierarchical communication.
peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers' weights are averaged in each communication step. "shift_one" means each worker selects a different peer to average weights with in each communication step.
communication_interval (int) – Number of iterations between two communication steps.
- class bagua.torch_api.algorithms.decentralized.LowPrecisionDecentralizedAlgorithm(hierarchical=True, communication_interval=1)¶
Bases:
bagua.torch_api.algorithms.Algorithm
Create an instance of the Low Precision Decentralized SGD algorithm.
- Parameters
hierarchical (bool) – Enable hierarchical communication.
communication_interval (int) – Number of iterations between two communication steps.
- class bagua.torch_api.algorithms.decentralized.LowPrecisionDecentralizedAlgorithmImpl(process_group, hierarchical=True, communication_interval=1)¶
Bases:
bagua.torch_api.algorithms.AlgorithmImpl
Implementation of the Low Precision Decentralized SGD algorithm.
- Parameters
process_group (BaguaProcessGroup) – The process group to work on.
hierarchical (bool) – Enable hierarchical communication.
communication_interval (int) – Number of iterations between two communication steps.
- class bagua.torch_api.algorithms.gradient_allreduce.GradientAllReduceAlgorithm(hierarchical=False, average=True)¶
Bases:
bagua.torch_api.algorithms.Algorithm
Create an instance of the GradientAllReduce algorithm.
- Parameters
hierarchical (bool) – Enable hierarchical communication.
average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
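- Example::
A minimal usage sketch, assuming the Bagua process group is already initialized and that model and optimizer exist:
>>> from bagua.torch_api.algorithms import gradient_allreduce
>>> algorithm = gradient_allreduce.GradientAllReduceAlgorithm()
>>> model = model.with_bagua([optimizer], algorithm)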
- class bagua.torch_api.algorithms.gradient_allreduce.GradientAllReduceAlgorithmImpl(process_group, hierarchical=False, average=True)¶
Bases:
bagua.torch_api.algorithms.AlgorithmImpl
Implementation of the GradientAllReduce algorithm.
- Parameters
process_group (BaguaProcessGroup) – The process group to work on.
hierarchical (bool) – Enable hierarchical communication.
average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
- class bagua.torch_api.algorithms.q_adam.QAdamAlgorithm(q_adam_optimizer, hierarchical=True)¶
Bases:
bagua.torch_api.algorithms.Algorithm
Create an instance of the QAdam algorithm.
- Parameters
q_adam_optimizer (QAdamOptimizer) – A QAdamOptimizer initialized with model parameters.
hierarchical (bool) – Enable hierarchical communication.
- class bagua.torch_api.algorithms.q_adam.QAdamAlgorithmImpl(process_group, q_adam_optimizer, hierarchical=True)¶
Bases:
bagua.torch_api.algorithms.AlgorithmImpl
Implementation of the QAdam algorithm.
- Parameters
process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group to work on.
q_adam_optimizer (QAdamOptimizer) – A QAdamOptimizer initialized with model parameters.
hierarchical (bool) – Enable hierarchical communication.
- class bagua.torch_api.algorithms.q_adam.QAdamOptimizer(params, lr=0.001, warmup_steps=100, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.0)¶
Bases:
torch.optim.optimizer.Optimizer
Create a dedicated optimizer used for QAdam algorithm.
- Parameters
params (iterable) – Iterable of parameters to optimize or dicts defining parameter groups.
lr (float) – Learning rate.
warmup_steps (int) – Number of steps to warm up by doing gradient allreduce before doing asynchronous model averaging. Use 0 to disable.
betas (Tuple[float, float]) – Coefficients used for computing running averages of gradient and its square.
eps (float) – Term added to the denominator to improve numerical stability.
weight_decay (float) – Weight decay (L2 penalty).
- step(self, closure=None)¶
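- Example::
A minimal usage sketch, assuming the Bagua process group is already initialized and that model exists. QAdam requires its dedicated optimizer to be paired with QAdamAlgorithm:
>>> from bagua.torch_api.algorithms import q_adam
>>> optimizer = q_adam.QAdamOptimizer(model.parameters(), lr=1e-3, warmup_steps=100)
>>> algorithm = q_adam.QAdamAlgorithm(optimizer)
>>> model = model.with_bagua([optimizer], algorithm)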
- class bagua.torch_api.algorithms.Algorithm¶
This is the base class that all Bagua algorithms inherit.
- reify(self, process_group)¶
Create an algorithm instance.
- Parameters
process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group to work on.
- class bagua.torch_api.algorithms.AlgorithmImpl(process_group)¶
This is the base class that all Bagua algorithm implementations inherit.
It provides methods that can be overridden to implement different kinds of distributed algorithms.
- Parameters
process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group to work on.
- init_backward_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed on every parameter's gradient computation completion.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes the name of a parameter (as in torch.nn.Module.named_parameters) and the parameter itself.
- init_forward_pre_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed before the forward process.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes the model's input.
- init_operations(self, bagua_module, bucket)¶
Given a BaguaModule and a BaguaBucket, register operations to be executed on the bucket.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
bucket (bagua.torch_api.bucket.BaguaBucket) – A single bucket to register operations on.
- init_post_backward_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed when the backward pass is done.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes no argument.
- init_post_optimizer_step_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed when optimizer.step() is done.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that gets called after an optimizer's step() method is called. The function takes the optimizer as its argument.
- init_tensors(self, bagua_module)¶
Given a BaguaModule, return Bagua tensors to be used in Bagua for later operations.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A list of Bagua tensors for communication.
- Return type
- need_reset(self)¶
- Returns
True if all initialization methods of the current algorithm should be called again. This is useful for algorithms that have multiple stages, where each stage needs different initializations.
- Return type
bool
- tensors_to_buckets(self, tensors, do_flatten)¶
Given the bucketing suggestion from Bagua, return the actual Bagua buckets. The default implementation follows the suggestion to do the bucketing.
- Parameters
tensors (List[List[bagua.torch_api.tensor.BaguaTensor]]) – Bagua tensors grouped in different lists, representing Bagua's suggestion on how to bucket the tensors.
do_flatten (bool) – Whether to flatten the Bagua buckets.
- Returns
A list of Bagua buckets.
- Return type
bagua.torch_api.checkpoint¶
- bagua.torch_api.checkpoint.checkpointing.load_checkpoint(checkpoints_path, model, optimizer=None, lr_scheduler=None, strict=True)¶
Load a model checkpoint and return the iteration.
- Parameters
checkpoints_path (str) – Path of checkpoints.
model (BaguaModule) – The model to load on.
optimizer (torch.optim.Optimizer, optional) – The optimizer to load on. Default: None.
lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to load on. Default: None.
strict (bool, optional) – Whether to strictly enforce that the keys in the state_dict of the checkpoint match the keys returned by this module's state_dict() function. Default: True.
- bagua.torch_api.checkpoint.checkpointing.save_checkpoint(iteration, checkpoints_path, model, optimizer=None, lr_scheduler=None)¶
Save model checkpoint.
- Parameters
iteration (int) – Training Iteration.
checkpoints_path (str) – Path of checkpoints.
model (BaguaModule) – The model to save.
optimizer (torch.optim.Optimizer, optional) – The optimizer to save. Default: None.
lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to save. Default: None.
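- Example::
A minimal save/resume sketch, assuming model is a BaguaModule and the checkpoint directory exists:
>>> from bagua.torch_api.checkpoint import save_checkpoint, load_checkpoint
>>> save_checkpoint(iteration, "/path/to/checkpoints", model, optimizer, lr_scheduler)
>>> start_iteration = load_checkpoint("/path/to/checkpoints", model, optimizer, lr_scheduler)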
- bagua.torch_api.checkpoint.load_checkpoint(checkpoints_path, model, optimizer=None, lr_scheduler=None, strict=True)¶
Load a model checkpoint and return the iteration.
- Parameters
checkpoints_path (str) – Path of checkpoints.
model (BaguaModule) – The model to load on.
optimizer (torch.optim.Optimizer, optional) – The optimizer to load on. Default: None.
lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to load on. Default: None.
strict (bool, optional) – Whether to strictly enforce that the keys in the state_dict of the checkpoint match the keys returned by this module's state_dict() function. Default: True.
- bagua.torch_api.checkpoint.save_checkpoint(iteration, checkpoints_path, model, optimizer=None, lr_scheduler=None)¶
Save model checkpoint.
- Parameters
iteration (int) – Training Iteration.
checkpoints_path (str) – Path of checkpoints.
model (BaguaModule) – The model to save.
optimizer (torch.optim.Optimizer, optional) – The optimizer to save. Default: None.
lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional) – The LR scheduler to save. Default: None.
bagua.torch_api.contrib¶
- bagua.torch_api.contrib.fuse.optimizer.fuse_optimizer(optimizer, do_flatten=True, check_flatten=True)¶
Convert any optimizer into a fused optimizer.
A fused optimizer can fuse multiple parameter updates into one or a few updates. To achieve this, users need to:
1) flatten multiple parameters in the same group into a fused parameter by setting do_flatten=True, which is also the default behavior of a fused optimizer;
2) perform a fused parameter update by calling fuse_step.
This fused optimizer is implemented for general use. It can be used in conjunction with a BaguaModule as well as a torch.nn.parallel.DistributedDataParallel wrapped module, or in other cases (not listed here).
- Parameters
optimizer (torch.optim.Optimizer) – Any PyTorch optimizer.
do_flatten (bool) – Whether to flatten the parameters. The flatten operation will reset data pointers of parameter tensors so that they can be fused together. Default: True.
check_flatten (bool) – When set to True, enables the fused optimizer to automatically check whether parameter tensors remain contiguous as flattened. Can only work with do_flatten=True. Default: True.
- Returns
A Fused optimizer.
- Example::
>>> optimizer = torch.optim.Adadelta(model.parameters(), ...)
>>> optimizer = bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True)
>>>
>>> optimizer.fuse_step()
When used in conjunction with a BaguaModule, set do_flatten=False in with_bagua explicitly:
>>> optimizer = bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True)
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm(), do_flatten=False)
>>>
>>> optimizer.fuse_step()
Note
Both this function and the with_bagua method reset data pointers of module parameters by default. In order to perform a more effective fused parameter update, users need to disable bucket flattening in with_bagua by setting its do_flatten to False.
Note
A fused optimizer does not change the original behaviors of optimizer, but enables it to perform a fused parameter update through fuse_step. Users can still perform a normal parameter update through step.
- bagua.torch_api.contrib.fuse.optimizer.fuse_step(optimizer, closure=None)¶
Perform a fused parameter update.
This operation will fuse multiple contiguous parameters into a fused parameter by creating a tensor view sharing the same underlying storage with them, and then perform the parameter update on the fused parameters. If none of the parameter tensors are contiguous, this operation is equivalent to step.
- Parameters
optimizer (torch.optim.Optimizer) – A fused optimizer.
closure (Callable) – A closure that reevaluates the model and returns the loss. Optional for most optimizers.
Note
This function will not modify the storage of parameter tensors.
- bagua.torch_api.contrib.fuse.optimizer.is_fused_optimizer(optimizer)¶
Check whether optimizer is a fused optimizer.
- Parameters
optimizer (torch.optim.Optimizer) –
- class bagua.torch_api.contrib.utils.redis_store.RedisStore(hosts=None, cluster_mode=True, capacity_per_node=107374182400)¶
Bases:
bagua.torch_api.contrib.utils.store.ClusterStore
A Redis-based distributed key-value store implementation, with set and get APIs exposed.
- Parameters
hosts (List[Dict[str, str]]) – A list of Redis servers, defined by a list of dicts containing Redis host and port information, like [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]. A new Redis instance will be spawned on each node if hosts=None.
cluster_mode (bool) – If True, data is sharded across all Redis instances. Otherwise, if there are \(m\) Redis instances, the workers on the \(n\)-th node will use the \(n % m\)-th Redis instance.
capacity_per_node (int) – Maximum memory limit in bytes when spawning new Redis instances. Old values will be evicted when the limit is reached. Default is 100GB.
Note
All Bagua jobs within the same node will share the same local Redis instance if hosts=None. The capacity_per_node setting only affects newly spawned Redis instances, and has no effect on existing ones.
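- Example::
A minimal usage sketch, assuming a Redis server is reachable at the given address (note that values may come back as bytes):
>>> from bagua.torch_api.contrib.utils.redis_store import RedisStore
>>> store = RedisStore(hosts=[{"host": "192.168.1.0", "port": "7000"}], cluster_mode=False)
>>> store.set("foo", "bar")
>>> store.mset({"a": "1", "b": "2"})
>>> store.get("foo")
>>> store.num_keys()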
- class bagua.torch_api.contrib.utils.store.ClusterStore(stores)¶
Bases:
Store
Base class for distributed key-value stores.
In cluster store, entries will be sharded equally among multiple store instances based on their keys.
- Parameters
stores (List[Store]) – A list of stores to shard entries on.
- class bagua.torch_api.contrib.utils.store.Store¶
Base class for key-value store implementations. Entries are added to the store with set or mset, and retrieved with get or mget.
- clear(self)¶
Delete all keys in the current store.
- get(self, key)¶
Returns the value associated with key, or None if the key doesn't exist.
- Parameters
key (str) –
- Return type
Optional[Union[str, bytes]]
- mget(self, keys)¶
Retrieve each key's corresponding value and return them in a list with the same order as keys.
- Parameters
keys (List[str]) –
- Return type
List[Optional[Union[str, bytes]]]
- mset(self, dictionary)¶
Set multiple entries at once with a dictionary. Each key-value pair in the dictionary will be set.
- Parameters
dictionary (Dict[str, Union[str, bytes]]) –
- num_keys(self)¶
Returns the number of keys in the current store.
- Return type
int
- set(self, key, value)¶
Set a key-value pair.
- Parameters
key (str) –
value (Union[str, bytes]) –
- shutdown(self)¶
Shutdown the managed store instances. Unmanaged instances will not be killed.
- status(self)¶
Returns True if the current store is alive.
- Return type
bool
- class bagua.torch_api.contrib.cache_loader.CacheLoader(backend='redis', dataset_name='', writer_buffer_size=1, **kwargs)¶
Cache loader caches values calculated by an expensive function by their keys via get, so that the values can be retrieved faster the next time.
Internally, values are indexed by "{dataset_name}_{key}" and saved in a distributed key-value store, where dataset_name is specified on initialization and key is the argument to get.
By default, cache loader uses RedisStore as its backend distributed key-value store implementation. It supports using a list of existing Redis servers or spawning new Redis servers. Parameters for RedisStore can be provided here in **kwargs.
- Parameters
backend (str) – Backend distributed key-value store implementation. Can be "redis".
dataset_name (str) – Name of the dataset. Default "".
writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.
- Example::
To use a list of existing redis servers for the “redis” backend:
>>> from bagua.torch_api.contrib import CacheLoader
>>>
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> loader = CacheLoader(backend="redis", hosts=hosts, cluster_mode=True, dataset_name="test")
>>>
>>> loader.get(index, lambda x: items[x])
To spawn new redis servers for the “redis” backend:
>>> loader = CacheLoader(backend="redis", hosts=None, cluster_mode=True, capacity_per_node=100000000)
Note
Cache loaders with the same dataset_name will reuse and overwrite each other's cache. Use a different dataset_name if this is not desired.
- get(self, key, load_fn)¶
Returns the value associated with key in the cache, using load_fn to create the entry if the key does not exist in the cache. load_fn is a function taking key as its argument and returning the corresponding value to be cached.
- Parameters
key (str) –
load_fn (Callable[[str], None]) –
- num_keys(self)¶
Returns the number of keys in the cache.
- class bagua.torch_api.contrib.cached_dataset.CachedDataset(dataset, backend='redis', dataset_name='', writer_buffer_size=20, **kwargs)¶
Bases:
torch.utils.data.dataset.Dataset
Cached dataset wraps a PyTorch dataset to cache its samples in memory, so that accessing these samples after the first time can be much faster. This is useful when samples need tedious preprocessing to produce, or reading the dataset itself is slow, which could slow down the whole training process.
Internally, the samples are indexed by a string key "{dataset_name}_{index}" and saved in a distributed key-value store, where dataset_name is specified when initializing the cached dataset, and index is the index of a specific sample (the argument of the __getitem__ method in a PyTorch dataset).
- Parameters
dataset (torch.utils.data.dataset.Dataset) – PyTorch dataset to be wrapped.
backend (str) – Backend distributed key-value store implementation. Can be "redis".
dataset_name (str) – Name of the dataset. Default "".
writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.
Example:
>>> from bagua.torch_api.contrib import CachedDataset
>>> cached_dataset = CachedDataset(dataset, backend="redis", dataset_name="ds")
>>> dataloader = torch.utils.data.DataLoader(cached_dataset)
Note
Cached dataset is a special case of cache loader. The parameters backend and writer_buffer_size used to initialize a cached dataset have the same meanings as those used to initialize a cache loader. You can provide the arguments for the cache loader here in **kwargs. See also CacheLoader.
- cache_loader¶
The backend cache loader instance.
- class bagua.torch_api.contrib.load_balancing_data_loader.LoadBalancingDistributedBatchSampler(sampler, batch_fn, drop_last=False)¶
Bases:
torch.utils.data.sampler.Sampler
Wraps another load balance sampler to yield variable sized mini-batches.
- Parameters
sampler (LoadBalancingDistributedSampler) – Load balance sampler.
batch_fn (Callable) – Callable to yield mini-batch indices.
drop_last (bool) – If True, the sampler will drop the last few batches that exceed the least number of batches among replicas; otherwise, the number of batches on each replica will be padded to be the same.
batch_fn will have the signature:
def batch_fn(indices: List[int]) -> List[List[int]]
- Example::
>>> from bagua.torch_api.contrib import LoadBalancingDistributedSampler, \
...     LoadBalancingDistributedBatchSampler
>>>
>>> sampler = LoadBalancingDistributedSampler(dataset, complexity_fn=complexity_fn)
>>> batch_sampler = LoadBalancingDistributedBatchSampler(sampler, batch_fn=batch_fn)
>>> loader = torch.utils.data.DataLoader(dataset, batch_sampler=batch_sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     batch_sampler.set_epoch(epoch)
...     train(loader)
- set_epoch(self, epoch)¶
Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.
- Parameters
epoch (int) – Epoch number.
- Return type
None
- class bagua.torch_api.contrib.load_balancing_data_loader.LoadBalancingDistributedSampler(dataset, complexity_fn, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False, random_level=0)¶
Bases:
torch.utils.data.sampler.Sampler
Sampler that restricts data loading to a subset of the dataset.
This sampler uses a complexity_fn to calculate each sample's computational complexity, so that each batch gets similar computational complexity. This is useful in scenarios like speech and NLP, where each batch has variable length and distributed training suffers from the straggler problem.
The usage is similar to torch.utils.data.DistributedSampler, where each process loads a subset of the original dataset that is exclusive to it.
Note
Dataset is assumed to be of constant size.
- Parameters
dataset (torch.utils.data.dataset.Dataset) – Dataset used for sampling.
complexity_fn (Callable) – A function whose input is a sample and output is an integer as a measure of the computational complexity of the sample.
num_replicas (int, optional) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.
rank (int, optional) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.
shuffle (bool, optional) – If True (default), the sampler will shuffle the indices.
seed (int, optional) – Random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group. Default: 0.
drop_last (bool, optional) – If True, the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas. Default: False.
random_level (float, optional) – A float between 0 and 1 that controls the extent of load balancing. 0 means the best load balance, while 1 means the opposite.
Warning
In distributed mode, calling the set_epoch method at the beginning of each epoch before creating the DataLoader iterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.
- Example::
Define your complexity_fn, which accepts a dataset sample as its input and produces an integer as the sample's computational complexity:
>>> dataset = torch.utils.data.TensorDataset(torch.randn(n, 2), torch.randperm(n))
>>> complexity_fn = lambda x: x[1]
Below is the usage of LoadBalancingDistributedSampler and DataLoader:
>>> sampler = bagua.torch_api.contrib.LoadBalancingDistributedSampler(
...     dataset,
...     complexity_fn=complexity_fn) if is_distributed else None
>>> loader = torch.utils.data.DataLoader(dataset,
...     shuffle=(sampler is None),
...     sampler=sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     if is_distributed:
...         sampler.set_epoch(epoch)
...     train(loader)
- set_epoch(self, epoch)¶
Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.
- Parameters
epoch (int) – Epoch number.
- Return type
None
- bagua.torch_api.contrib.sync_batchnorm.unused¶
- class bagua.torch_api.contrib.sync_batchnorm.SyncBatchNorm(num_features, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)¶
Bases:
torch.nn.modules.batchnorm._BatchNorm
Applies synchronous BatchNorm for distributed module with N-dimensional BatchNorm layer(s). See BatchNorm for more details.
- Parameters
num_features – Number of channels \(C\) from the shape \((N, C, ...)\).
eps – A value added to the denominator for numerical stability. Default: 1e-5.
momentum – The value used for the running_mean and running_var computation. Can be set to None for cumulative moving average (i.e. simple average). Default: 0.1.
affine – A boolean value that when set to True, this module has learnable affine parameters. Default: True.
track_running_stats – A boolean value that when set to True, this module tracks the running mean and variance, and when set to False, this module does not track such statistics and always uses batch statistics in both training and eval modes. Default: True.
Note
Only GPU input tensors are supported in the training mode.
- classmethod convert_sync_batchnorm(cls, module)¶
Helper function to convert all BatchNorm*D layers in the model to torch.nn.SyncBatchNorm layers.
- Parameters
module (nn.Module) – Module containing one or more BatchNorm*D layers.
- Returns
The original module with the converted torch.nn.SyncBatchNorm layers. If the original module is a BatchNorm*D layer, a new torch.nn.SyncBatchNorm layer object will be returned instead.
Note
This function must be called before the with_bagua method.
- Example::
>>> # Network with nn.BatchNorm layer
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> sync_bn_model = bagua.torch_api.contrib.sync_batchnorm.SyncBatchNorm.convert_sync_batchnorm(model)
>>> bagua_model = sync_bn_model.with_bagua([optimizer], GradientAllReduceAlgorithm())
- forward(self, input)¶
- class bagua.torch_api.contrib.CacheLoader(backend='redis', dataset_name='', writer_buffer_size=1, **kwargs)¶
Cache loader caches values calculated by an expensive function by their keys via get, so that the values can be retrieved faster the next time.
Internally, values are indexed by "{dataset_name}_{key}" and saved in a distributed key-value store, where dataset_name is specified on initialization and key is the argument to get.
By default, cache loader uses RedisStore as its backend distributed key-value store implementation. It supports using a list of existing Redis servers or spawning new Redis servers. Parameters for RedisStore can be provided here in **kwargs.
- Parameters
backend (str) – Backend distributed key-value store implementation. Can be "redis".
dataset_name (str) – Name of the dataset. Default "".
writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.
- Example::
To use a list of existing redis servers for the “redis” backend:
>>> from bagua.torch_api.contrib import CacheLoader
>>>
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> loader = CacheLoader(backend="redis", hosts=hosts, cluster_mode=True, dataset_name="test")
>>>
>>> loader.get(index, lambda x: items[x])
To spawn new redis servers for the “redis” backend:
>>> loader = CacheLoader(backend="redis", hosts=None, cluster_mode=True, capacity_per_node=100000000)
Note
Cache loaders with the same dataset_name will reuse and overwrite each other's cache. Use a different dataset_name if this is not desired.
- get(self, key, load_fn)¶
Returns the value associated with key in the cache, using load_fn to create the entry if the key does not exist in the cache. load_fn is a function taking key as its argument and returning the corresponding value to be cached.
- Parameters
key (str) –
load_fn (Callable[[str], None]) –
- num_keys(self)¶
Returns the number of keys in the cache.
- class bagua.torch_api.contrib.CachedDataset(dataset, backend='redis', dataset_name='', writer_buffer_size=20, **kwargs)¶
Bases:
torch.utils.data.dataset.Dataset
Cached dataset wraps a PyTorch dataset to cache its samples in memory, so that accessing these samples after the first time can be much faster. This is useful when samples need tedious preprocessing to produce, or reading the dataset itself is slow, which could slow down the whole training process.
Internally, the samples are indexed by a string key "{dataset_name}_{index}" and saved in a distributed key-value store, where dataset_name is specified when initializing the cached dataset, and index is the index of a specific sample (the argument of the __getitem__ method in a PyTorch dataset).
- Parameters
dataset (torch.utils.data.dataset.Dataset) – PyTorch dataset to be wrapped.
backend (str) – Backend distributed key-value store implementation. Can be "redis".
dataset_name (str) – Name of the dataset. Default "".
writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.
Example:
>>> from bagua.torch_api.contrib import CachedDataset
>>> cached_dataset = CachedDataset(dataset, backend="redis", dataset_name="ds")
>>> dataloader = torch.utils.data.DataLoader(cached_dataset)
Note
Cached dataset is a special case of cache loader. The parameters backend and writer_buffer_size used to initialize a cached dataset have the same meanings as those used to initialize a cache loader. You can provide the arguments for the cache loader here in **kwargs. See also CacheLoader.
- cache_loader¶
The backend cache loader instance.
- class bagua.torch_api.contrib.LoadBalancingDistributedBatchSampler(sampler, batch_fn, drop_last=False)¶
Bases:
torch.utils.data.sampler.Sampler
Wraps another load balance sampler to yield variable sized mini-batches.
- Parameters
sampler (LoadBalancingDistributedSampler) – Load balance sampler.
batch_fn (Callable) – Callable to yield mini-batch indices.
drop_last (bool) – If True, the sampler will drop the last few batches that exceed the least number of batches among replicas; otherwise, the number of batches on each replica will be padded to be the same.
batch_fn will have the signature:
def batch_fn(indices: List[int]) -> List[List[int]]
- Example::
>>> from bagua.torch_api.contrib import LoadBalancingDistributedSampler, \
...     LoadBalancingDistributedBatchSampler
>>>
>>> sampler = LoadBalancingDistributedSampler(dataset, complexity_fn=complexity_fn)
>>> batch_sampler = LoadBalancingDistributedBatchSampler(sampler, batch_fn=batch_fn)
>>> loader = torch.utils.data.DataLoader(dataset, batch_sampler=batch_sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     batch_sampler.set_epoch(epoch)
...     train(loader)
- set_epoch(self, epoch)¶
Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.
- Parameters
epoch (int) – Epoch number.
- Return type
None
- class bagua.torch_api.contrib.LoadBalancingDistributedSampler(dataset, complexity_fn, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False, random_level=0)¶
Bases:
torch.utils.data.sampler.Sampler
Sampler that restricts data loading to a subset of the dataset.
This sampler uses a complexity_fn to calculate each sample's computational complexity, so that each batch gets similar computational complexity. This is useful in scenarios like speech and NLP, where each batch has variable length and distributed training suffers from the straggler problem.
The usage is similar to torch.utils.data.DistributedSampler, where each process loads a subset of the original dataset that is exclusive to it.
Note
Dataset is assumed to be of constant size.
- Parameters
dataset (torch.utils.data.dataset.Dataset) – Dataset used for sampling.
complexity_fn (Callable) – A function whose input is a sample and output is an integer as a measure of the computational complexity of the sample.
num_replicas (int, optional) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.
rank (int, optional) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.
shuffle (bool, optional) – If True (default), the sampler will shuffle the indices.
seed (int, optional) – Random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group. Default: 0.
drop_last (bool, optional) – If True, the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas. Default: False.
random_level (float, optional) – A float between 0 and 1 that controls the extent of load balancing. 0 means the best load balance, while 1 means the opposite.
Warning
In distributed mode, calling the set_epoch method at the beginning of each epoch before creating the DataLoader iterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.
- Example::
Define your complexity_fn, which accepts a dataset sample as its input and produces an integer as the sample's computational complexity:
>>> dataset = torch.utils.data.TensorDataset(torch.randn(n, 2), torch.randperm(n))
>>> complexity_fn = lambda x: x[1]
Below is the usage of LoadBalancingDistributedSampler and DataLoader:
>>> sampler = bagua.torch_api.contrib.LoadBalancingDistributedSampler(
...     dataset,
...     complexity_fn=complexity_fn) if is_distributed else None
>>> loader = torch.utils.data.DataLoader(dataset,
...     shuffle=(sampler is None),
...     sampler=sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     if is_distributed:
...         sampler.set_epoch(epoch)
...     train(loader)
- set_epoch(self, epoch)¶
Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.
- Parameters
epoch (int) – Epoch number.
- Return type
None
- bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True, check_flatten=True)¶
Convert any optimizer into a fused optimizer.
A fused optimizer can fuse multiple parameter updates into one or a few updates. To achieve this, users need to:
1) flatten multiple parameters in the same group into a fused parameter by setting do_flatten=True, which is also the default behavior of a fused optimizer;
2) perform a fused parameter update by calling fuse_step.
This fused optimizer is implemented for general use. It can be used in conjunction with a BaguaModule as well as a torch.nn.parallel.DistributedDataParallel wrapped module, or in other cases (not listed here).
- Parameters
optimizer (torch.optim.Optimizer) – Any PyTorch optimizer.
do_flatten (bool) – Whether to flatten the parameters. The flatten operation will reset data pointers of parameter tensors so that they can be fused together. Default: True.
check_flatten (bool) – When set to True, enables the fused optimizer to automatically check whether parameter tensors remain contiguous as flattened. Can only work with do_flatten=True. Default: True.
- Returns
A Fused optimizer.
- Example::
>>> optimizer = torch.optim.Adadelta(model.parameters(), ...)
>>> optimizer = bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True)
>>>
>>> optimizer.fuse_step()
When used in conjunction with a BaguaModule, set do_flatten=False in with_bagua explicitly:
>>> optimizer = bagua.torch_api.contrib.fuse_optimizer(optimizer, do_flatten=True)
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm(), do_flatten=False)
>>>
>>> optimizer.fuse_step()
Note
Both this function and the with_bagua method reset data pointers of module parameters by default. In order to perform a more effective fused parameter update, users need to disable bucket flattening in with_bagua by setting its do_flatten to False.
Note
A fused optimizer does not change the original behaviors of optimizer, but enables it to perform a fused parameter update through fuse_step. Users can still perform a normal parameter update through step.
bagua.torch_api.model_parallel¶
- class bagua.torch_api.model_parallel.moe.layer.MoE(hidden_size, expert, num_local_experts=1, k=1, output_dropout_prob=0.0, capacity_factor=1.0, eval_capacity_factor=1.0, min_capacity=4, noisy_gate_policy=None)¶
Bases:
torch.nn.Module
Initialize an MoE layer.
- Parameters
hidden_size (int) – the hidden dimension of the model, importantly this is also the input and output dimension.
expert (torch.nn.Module) – the torch module that defines the expert (e.g., MLP, torch.linear).
num_local_experts (int, optional) – default=1, number of local experts per gpu.
k (int, optional) – default=1, top-k gating value, only supports k=1 or k=2.
output_dropout_prob (float, optional) – default=0.0, output dropout probability.
capacity_factor (float, optional) – default=1.0, the capacity of the expert at training time.
eval_capacity_factor (float, optional) – default=1.0, the capacity of the expert at eval time.
min_capacity (int, optional) – default=4, the minimum capacity per expert regardless of the capacity_factor.
noisy_gate_policy (str, optional) – default=None, noisy gate policy, valid options are ‘Jitter’, ‘RSample’ or ‘None’.
- forward(self, hidden_states, used_token=None)¶
MoE forward
- Parameters
hidden_states (Tensor) – input to the layer
used_token (Tensor, optional) – default: None, mask only used tokens
- Returns
A tuple including output, gate loss, and expert count.
output (Tensor): output of the model
l_aux (Tensor): gate loss value
exp_counts (int): expert count
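- Example::
A construction sketch; the expert module, the hidden sizes, and the input tensor hidden_states are illustrative assumptions, and distributed initialization is assumed to have been done already:
>>> import torch
>>> from bagua.torch_api.model_parallel.moe import MoE
>>> expert = torch.nn.Linear(1024, 1024)
>>> moe_layer = MoE(hidden_size=1024, expert=expert, num_local_experts=2, k=1)
>>> output, l_aux, exp_counts = moe_layer(hidden_states)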
- bagua.torch_api.model_parallel.moe.sharded_moe.Base¶
- bagua.torch_api.model_parallel.moe.sharded_moe.exp_selection_uniform_map :Dict[torch.device, Callable]¶
- bagua.torch_api.model_parallel.moe.sharded_moe.gumbel_map :Dict[torch.device, Callable]¶
- bagua.torch_api.model_parallel.moe.sharded_moe.uniform_map :Dict[torch.device, Callable]¶
- class bagua.torch_api.model_parallel.moe.sharded_moe.MOELayer(gate, experts, num_local_experts, group=None)¶
Bases:
Base
MOELayer module which implements MixtureOfExperts as described in GShard.
gate = TopKGate(model_dim, num_experts)
moe = MOELayer(gate, expert)
output = moe(input)
l_aux = moe.l_aux
- Parameters
gate (torch.nn.Module) – gate network
expert (torch.nn.Module) – expert network
experts (torch.nn.Module) –
num_local_experts (int) –
group (Optional[Any]) –
- forward(self, *input, **kwargs)¶
- Parameters
input (torch.Tensor) –
kwargs (Any) –
- Return type
torch.Tensor
- class bagua.torch_api.model_parallel.moe.sharded_moe.TopKGate(model_dim, num_experts, k=1, capacity_factor=1.0, eval_capacity_factor=1.0, min_capacity=4, noisy_gate_policy=None)¶
Bases:
torch.nn.Module
Gate module which implements Top2Gating as described in GShard.
gate = TopKGate(model_dim, num_experts)
l_aux, combine_weights, dispatch_mask = gate(input)
- Parameters
model_dim (int) – size of model embedding dimension
num_experts (int) – number of experts in the model
k (int) –
capacity_factor (float) –
eval_capacity_factor (float) –
min_capacity (int) –
noisy_gate_policy (Optional[str]) –
- wg :torch.nn.Linear¶
- forward(self, input, used_token=None)¶
- Parameters
input (torch.Tensor) –
used_token (torch.Tensor) –
- Return type
Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]
- bagua.torch_api.model_parallel.moe.sharded_moe.gumbel_rsample(shape, device)¶
- Parameters
shape (Tuple) –
device (torch.device) –
- Return type
torch.Tensor
- bagua.torch_api.model_parallel.moe.sharded_moe.multiplicative_jitter(x, device, epsilon=0.01)¶
Modified from the Switch Transformer paper and mesh transformers. Multiply values by a random number between 1-epsilon and 1+epsilon. Makes models more resilient to rounding errors introduced by bfloat16. This seems particularly important for logits.
- Returns
A jittered x.
- Parameters
x (torch.Tensor) – The tensor to jitter.
device (torch.device) –
epsilon (float) – A floating point value.
- bagua.torch_api.model_parallel.moe.sharded_moe.top1gating(logits, capacity_factor, min_capacity, used_token=None, noisy_gate_policy=None)¶
Implements Top1Gating on logits.
- Parameters
logits (torch.Tensor) –
capacity_factor (float) –
min_capacity (int) –
used_token (torch.Tensor) –
noisy_gate_policy (Optional[str]) –
- Return type
Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]
- bagua.torch_api.model_parallel.moe.sharded_moe.top2gating(logits, capacity_factor)¶
Implements Top2Gating on logits.
- Parameters
logits (torch.Tensor) –
capacity_factor (float) –
- Return type
Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]
- bagua.torch_api.model_parallel.moe.utils.is_moe_param(param)¶
- Parameters
param (torch.Tensor) –
- Return type
bool
- class bagua.torch_api.model_parallel.moe.MoE(hidden_size, expert, num_local_experts=1, k=1, output_dropout_prob=0.0, capacity_factor=1.0, eval_capacity_factor=1.0, min_capacity=4, noisy_gate_policy=None)¶
Bases:
torch.nn.Module
Initialize an MoE layer.
- Parameters
hidden_size (int) – the hidden dimension of the model, importantly this is also the input and output dimension.
expert (torch.nn.Module) – the torch module that defines the expert (e.g., MLP, torch.linear).
num_local_experts (int, optional) – default=1, number of local experts per gpu.
k (int, optional) – default=1, top-k gating value, only supports k=1 or k=2.
output_dropout_prob (float, optional) – default=0.0, output dropout probability.
capacity_factor (float, optional) – default=1.0, the capacity of the expert at training time.
eval_capacity_factor (float, optional) – default=1.0, the capacity of the expert at eval time.
min_capacity (int, optional) – default=4, the minimum capacity per expert regardless of the capacity_factor.
noisy_gate_policy (str, optional) – default=None, noisy gate policy, valid options are ‘Jitter’, ‘RSample’ or ‘None’.
- forward(self, hidden_states, used_token=None)¶
MoE forward
- Parameters
hidden_states (Tensor) – input to the layer
used_token (Tensor, optional) – default: None, mask only used tokens
- Returns
A tuple including output, gate loss, and expert count.
output (Tensor): output of the model
l_aux (Tensor): gate loss value
exp_counts (int): expert count
- bagua.torch_api.model_parallel.moe.is_moe_param(param)¶
- Parameters
param (torch.Tensor) –
- Return type
bool
Submodules¶
bagua.torch_api.bucket¶
- class bagua.torch_api.bucket.BaguaBucket(tensors, name, flatten, alignment=1)¶
Create a Bagua bucket with a list of Bagua tensors.
- Parameters
tensors (List[bagua.torch_api.tensor.BaguaTensor]) – A list of Bagua tensors to be put in the bucket.
name (str) – The unique name of the bucket.
flatten (bool) – If True, flatten the input tensors so that they are contiguous in memory.
alignment (int) – If alignment > 1, Bagua will add a padding tensor to the bucket so that the total number of elements in the bucket is divisible by the given alignment.
- name¶
The bucket’s name.
- tensors¶
The Bagua tensors contained in the bucket.
- append_asynchronous_model_average_op(self, peer_selection_mode, group=None)¶
Append an asynchronous model average operation to a bucket. This operation will enable continuous model averaging between workers while training a model.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
This operation is intended to run in parallel with the computation process. It returns a reference to the op. The op features a lock for exclusive access to the model. Call op.lock_weight() to acquire the lock and op.unlock_weight() to release it.
- Parameters
peer_selection_mode (str) – How workers communicate with each other. Currently "all" is supported. "all" means all workers' weights are averaged during each communication.
group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.
- Returns
The asynchronous model average operation itself.
- append_centralized_synchronous_op(self, hierarchical=False, average=True, scattergather=False, compression=None, group=None)¶
Append a centralized synchronous operation to a bucket. It will sum or average the tensors in the bucket for all workers.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.
average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
scattergather (bool) – If True, the communication between workers is done with scatter-gather instead of allreduce. This is required for using compression.
compression (Optional[str]) – If not None, the tensors will be compressed for communication. Currently "MinMaxUInt8" is supported.
group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.
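- Example::
For illustration only, a compressed centralized operation could be appended to an existing bucket like this (bucket is assumed to be a BaguaBucket created elsewhere; compression requires the scatter-gather communication path):
>>> bucket.clear_ops()
>>> bucket.append_centralized_synchronous_op(
...     hierarchical=True,
...     average=True,
...     scattergather=True,
...     compression="MinMaxUInt8",
... )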
- append_decentralized_synchronous_op(self, peer_weight, hierarchical=True, peer_selection_mode='all', group=None)¶
Append a decentralized synchronous operation to a bucket. It will do gossipy style model averaging among workers.
This operation is not in-place, which means the bucket weights are first copied to peer_weight, and the result of decentralized averaging will be in peer_weight. To copy peer_weight back to self, call op.copy_back_peer_weight(self).
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
peer_weight (BaguaTensor) – A tensor used for averaging the model with peers; it should be of the same total size as the bucket tensors. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor.
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.
peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers' weights are averaged in each communication step. "shift_one" means each worker selects a different peer to average weights with in each communication step.
group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.
- Returns
The decentralized synchronous operation itself.
- append_low_precision_decentralized_synchronous_op(self, weight, left_peer_weight, right_peer_weight, hierarchical=True, compression='MinMaxUInt8', group=None)¶
Append a low precision decentralized synchronous operation to a bucket. It will compress the difference of local models between two successive iterations and exchange them among workers.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
weight (BaguaTensor) – Model replica of the current worker's local model. It should be of the same total size as the bucket tensors. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor.
left_peer_weight (BaguaTensor) – Model replica of the current worker's left peer. It should be of the same total size as the bucket tensors. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor, then copy the initializing weights of the current worker's left peer to the tensor.
right_peer_weight (BaguaTensor) – Model replica of the current worker's right peer. It should be of the same total size as the bucket tensors. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor, then copy the initializing weights of the current worker's right peer to the tensor.
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.
compression (str) – How tensors are compressed for communication. Currently "MinMaxUInt8" is supported.
group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.
- append_python_op(self, python_function, group=None)¶
Append a Python operation to a bucket. A Python operation is a Python function that takes the bucket’s name and returns
None
. It can do arbitrary things within the function body.The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
python_function (Callable[[str], None]) – The Python operation function.
group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If
None
, the default process group will be used.
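As an illustration, a hedged sketch of a Python operation that simply logs when the bucket's tensors become ready (the function name is illustrative, and bucket is assumed to be an existing BaguaBucket):
>>> import logging
>>>
>>> def log_ready(bucket_name: str) -> None:
...     # Called by the Bagua backend with the bucket's name.
...     logging.info("bucket %s is ready for communication", bucket_name)
...
>>> bucket.append_python_op(log_ready)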
- bytes(self)¶
Returns the total number of bytes occupied by the bucket.
- Return type
int
- check_flatten(self)¶
- Returns
True if effective tensors are contiguous in memory.
- Return type
bool
- clear_ops(self)¶
Clear the previously appended operations.
- Return type
- flattened_tensor(self)¶
Returns a tensor contiguous in memory which contains the same data as the effective tensors, i.e. the tensors returned by calling
bagua_getter_closure
on self
tensors and the padding tensor (if it exists).- Return type
torch.Tensor
bagua.torch_api.communication¶
- class bagua.torch_api.communication.BaguaProcessGroup(ranks, stream, group_name)¶
Definition of Bagua process group.
- get_global_communicator(self)¶
Returns the global communicator of current process group.
- Return type
bagua_core.BaguaSingleCommunicatorPy
- get_inter_node_communicator(self)¶
Returns the inter-node communicator of current process group.
- Return type
bagua_core.BaguaSingleCommunicatorPy
- get_intra_node_communicator(self)¶
Returns the intra-node communicator of current process group.
- Return type
bagua_core.BaguaSingleCommunicatorPy
- class bagua.torch_api.communication.ReduceOp¶
Bases:
enum.IntEnum
An enum-like class for available reduction operations:
SUM
,PRODUCT
,MIN
,MAX
,BAND
,BOR
,BXOR
andAVG
.Initialize self. See help(type(self)) for accurate signature.
- AVG = 10¶
- BAND = 8¶
- BOR = 7¶
- BXOR = 9¶
- MAX = 3¶
- MIN = 2¶
- PRODUCT = 1¶
- SUM = 0¶
- bagua.torch_api.communication.allgather(send_tensor, recv_tensor, comm=None)¶
Gathers send tensors from all processes associated with the communicator into
recv_tensor
.- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of
comm.nranks * send_tensor.size()
elements.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.allgather_inplace(tensor, comm=None)¶
The in-place version of
allgather
.- Parameters
tensor (torch.Tensor) –
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –
- bagua.torch_api.communication.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes associated with the communicator in such a way that all get the final result. After the call
recv_tensor
is going to be bitwise identical in all processes.- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must be of the same size as
send_tensor
.op (ReduceOp) – One of the values from
ReduceOp
enum. Specifies an operation used for element-wise reductions.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
Examples:
>>> from bagua.torch_api import allreduce
>>>
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat)
>>> send_tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
- bagua.torch_api.communication.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of
allreduce
.- Parameters
tensor (torch.Tensor) –
op (ReduceOp) –
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –
- bagua.torch_api.communication.alltoall(send_tensor, recv_tensor, comm=None)¶
Each process scatters
send_tensor
to all processes associated with the communicator and returns the gathered data in recv_tensor
.- Parameters
send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by
comm.nranks
.recv_tensor (torch.Tensor) – Output of the collective, must be of the same size as
send_tensor
.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.alltoall_inplace(tensor, comm=None)¶
The in-place version of
alltoall
.- Parameters
tensor (torch.Tensor) –
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –
- bagua.torch_api.communication.barrier(comm=None)¶
Synchronizes all processes. This collective blocks processes until all processes associated with the communicator enter this function.
- Parameters
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.broadcast(tensor, src=0, comm=None)¶
Broadcasts the tensor to all processes associated with the communicator.
tensor
must have the same number of elements in all processes participating in the collective.- Parameters
tensor (torch.Tensor) – Data to be sent if
src
is the rank of current process, and tensor to be used to save received data otherwise.src (int) – Source rank. Default: 0.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
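A hedged sketch: rank 0 holds the data and every other rank receives it in place.
>>> import torch
>>> from bagua.torch_api import broadcast, get_rank
>>>
>>> tensor = torch.arange(4, dtype=torch.float32, device="cuda")
>>> if get_rank() != 0:
...     tensor.zero_()
...
>>> broadcast(tensor, src=0)
>>> # Every rank now holds tensor([0., 1., 2., 3.]).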
- bagua.torch_api.communication.from_torch_group(group, stream=None)¶
Convert a Pytorch process group to its equivalent Bagua process group.
- Parameters
group – A handle of the Pytorch process group.
stream (Optional[torch.cuda.Stream]) – A CUDA stream used to execute NCCL operations. If
None
, CUDA stream of the default group will be used. Seenew_group
for more information.
- Returns
A handle of the Bagua process group.
- Return type
BaguaProcessGroup
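A hedged sketch of converting an existing torch.distributed group (created elsewhere in the usual way) into its Bagua counterpart:
>>> import torch.distributed as dist
>>> from bagua.torch_api.communication import from_torch_group
>>>
>>> torch_group = dist.new_group(ranks=[0, 1])
>>> bagua_group = from_torch_group(torch_group)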
- bagua.torch_api.communication.gather(send_tensor, recv_tensor, dst, comm=None)¶
Gathers send tensors from all processes associated with the communicator to
recv_tensor
in a single process.- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of
comm.nranks * send_tensor.size()
elements.dst (int) – Destination rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.gather_inplace(tensor, count, dst, comm=None)¶
The in-place version of
gather
.- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the
dst
rank, it must have a size of comm.nranks * count
elements. On non-dst ranks, its size must be equal to count
.count (int) – The per-rank data count to gather.
dst (int) – Destination rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.init_process_group(store=None)¶
Initializes the PyTorch builtin distributed process group. This will also initialize the distributed package, and should be executed before any other Bagua API is used.
- Parameters
store (Optional[torch.distributed.Store]) – Key/value store accessible to all workers, used to exchange connection/address information. If
None
, a TCP-based store will be created. Default:None
.
- Examples:
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())  # THIS LINE IS IMPORTANT. See the notes below.
>>> bagua.init_process_group()
>>>
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = model.with_bagua([optimizer], ...)
Note
Each process should be associated with a CUDA device using torch.cuda.set_device() before calling
init_process_group
. Otherwise you may encounter the fatal runtime error: Rust cannot catch foreign exceptions.
- bagua.torch_api.communication.is_initialized()¶
Checks whether the default process group has been initialized.
- bagua.torch_api.communication.new_group(ranks=None, stream=None)¶
Creates a new process group.
This function requires that all processes in the default group (i.e. all processes that are part of the distributed job) enter this function, even if they are not going to be members of the group. Additionally, groups should be created in the same order in all processes.
Each process group will create three communicators on request: a global communicator, an inter-node communicator, and an intra-node communicator. Users can access them through
group.get_global_communicator()
,group.get_inter_node_communicator()
andgroup.get_intra_node_communicator()
respectively.- Parameters
ranks (Optional[List[int]]) – List of ranks of group members. If
None
, will be set to all ranks. Default isNone
.stream (Optional[torch.cuda.Stream]) – A CUDA stream used to execute NCCL operations. If
None
, CUDA stream of the default group will be used. See CUDA semantics for details.
- Returns
A handle of process group that can be given to collective calls.
- Return type
BaguaProcessGroup
Note
The global communicator is used for global communications involving all ranks in the process group. The inter-node communicator and the intra-node communicator are used for hierarchical communications in this process group.
Note
For a specific communicator
comm
,comm.rank()
returns the rank of current process andcomm.nranks()
returns the size of the communicator.
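A hedged sketch tying this together: every process enters new_group, and member ranks then query the communicators described above.
>>> from bagua.torch_api import get_rank
>>> from bagua.torch_api.communication import new_group
>>>
>>> group = new_group(ranks=[0, 1])  # all processes must call this
>>> if get_rank() in (0, 1):
...     comm = group.get_global_communicator()
...     print(comm.rank(), comm.nranks())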
- bagua.torch_api.communication.recv(tensor, src, comm=None)¶
Receives a tensor synchronously.
- Parameters
tensor (torch.Tensor) – Tensor to fill with received data.
src (int) – Source rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes.
Only the process with rank
dst
is going to receive the final result.- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must be of the same size as
send_tensor
.dst (int) – Destination rank.
op (ReduceOp) – One of the values from
ReduceOp
enum. Specifies an operation used for element-wise reductions.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)¶
The in-place version of
reduce
.- Parameters
tensor (torch.Tensor) –
dst (int) –
op (ReduceOp) –
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –
- bagua.torch_api.communication.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces, then scatters
send_tensor
to all processes associated with the communicator.- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of
comm.nranks * recv_tensor.size()
elements.recv_tensor (torch.Tensor) – Output of the collective.
op (ReduceOp, optional) – One of the values from
ReduceOp
enum. Specifies an operation used for element-wise reductions.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of
reduce_scatter
.- Parameters
tensor (torch.Tensor) – Input and output of the collective, the size must be divisible by
comm.nranks
.op (ReduceOp, optional) – One of the values from
ReduceOp
enum. Specifies an operation used for element-wise reductions.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.scatter(send_tensor, recv_tensor, src, comm=None)¶
Scatters send tensor to all processes associated with the communicator.
- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of
comm.nranks * recv_tensor.size()
elements.recv_tensor (torch.Tensor) – Output of the collective.
src (int) – Source rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.scatter_inplace(tensor, count, src, comm=None)¶
The in-place version of
scatter
.- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the
src
rank, it must have a size of comm.nranks * count
elements. On non-src ranks, its size must be equal to count
.count (int) – The per-rank data count to scatter.
src (int) – Source rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.communication.send(tensor, dst, comm=None)¶
Sends a tensor to
dst
synchronously.- Parameters
tensor (torch.Tensor) – Tensor to send.
dst (int) – Destination rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
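A hedged sketch of the synchronous point-to-point pair (send here, recv above): rank 0 sends a tensor to rank 1.
>>> import torch
>>> from bagua.torch_api import get_rank, recv, send
>>>
>>> tensor = torch.zeros(4, device="cuda")
>>> if get_rank() == 0:
...     tensor.fill_(42.0)
...     send(tensor, dst=1)
... elif get_rank() == 1:
...     recv(tensor, src=0)  # tensor is now filled with 42.0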
bagua.torch_api.distributed¶
- class bagua.torch_api.distributed.BaguaModule¶
This class patches torch.nn.Module with several methods to enable Bagua functionalities.
- Variables
bagua_optimizers (List[torch.optim.Optimizer]) – The optimizers passed in by
with_bagua
.bagua_algorithm (bagua.torch_api.algorithms.AlgorithmImpl) – The algorithm implementation used by the module, reified by the algorithm passed in by
with_bagua
.process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group used by the module.
bagua_module_name (str) – The module's name. Bagua uses the module name to distinguish different modules.
parameters_to_ignore (List[str]) – The parameter names in
"{module_name}.{param_name}"
format to ignore when callingself.bagua_build_params()
.bagua_train_step_counter (int) – Number of iterations in training mode.
bagua_buckets (List[bagua.torch_api.bucket.BaguaBucket]) – All Bagua buckets in a list.
- bagua_build_params(self)¶
Build a list of
(parameter_name, parameter)
tuples for all parameters that require gradients and are not in the
attribute.- Return type
List[Tuple[str, torch.nn.Parameter]]
- with_bagua(self, optimizers, algorithm, process_group=None, do_flatten=True)¶
with_bagua
enables easy distributed data parallel training on a torch.nn.Module.- Parameters
optimizers (List[torch.optim.Optimizer]) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers.
algorithm (bagua.torch_api.algorithms.Algorithm) – Distributed algorithm used to do the actual communication and update.
process_group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to be used for distributed data all-reduction. If
None
, the default process group, which is created bybagua.torch_api.init_process_group
, will be used. (default:None
do_flatten (bool) – Whether to flatten the Bagua buckets. The flatten operation will reset the data pointers of bucket tensors so that they can use faster code paths. Default:
True
.
- Returns
The original module, with Bagua related environments initialized.
- Return type
BaguaModule
Note
If we want to ignore some layers for communication, we can first check these layers' corresponding keys in the module's
state_dict
(they are in"{module_name}.{param_name}"
format), then assign the list of keys toyour_module._bagua_params_and_buffers_to_ignore
.Examples:
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = model.with_bagua(
...     [optimizer],
...     GradientAllReduce()
... )
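Following the note above, a hedged sketch of excluding specific layers before calling with_bagua (the key names shown depend on the actual module structure and are purely illustrative):
>>> # Keys follow the "{module_name}.{param_name}" format of model.state_dict().
>>> model._bagua_params_and_buffers_to_ignore = ["0.weight", "0.bias"]
>>> model = model.with_bagua([optimizer], GradientAllReduce())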
bagua.torch_api.env¶
- bagua.torch_api.env.get_default_bucket_size()¶
Get default communication bucket byte size.
- Returns
The default bucket size.
- Return type
int
- bagua.torch_api.env.get_local_rank()¶
Get the local rank of the current process.
Local rank is a unique identifier assigned to each process within a node. They are always consecutive integers ranging from 0 to
local_size
.- Returns
The local rank of the current process.
- Return type
int
- bagua.torch_api.env.get_local_size()¶
Get the number of processes in the node.
- Returns
The local size of the node.
- Return type
int
- bagua.torch_api.env.get_rank()¶
Get the rank of the current process in the default process group.
Rank is a unique identifier assigned to each process within a distributed process group. They are always consecutive integers ranging from 0 to
world_size
.- Returns
The rank of the current process.
- Return type
int
- bagua.torch_api.env.get_world_size()¶
Get the number of processes in the current process group.
- Returns
The world size of the process group.
- Return type
int
bagua.torch_api.tensor¶
- bagua.torch_api.tensor.LooseVersion¶
- class bagua.torch_api.tensor.BaguaTensor¶
This class patches torch.Tensor with additional methods.
A Bagua tensor is required to use Bagua’s communication algorithms. Users can convert a PyTorch tensor to Bagua tensor by
ensure_bagua_tensor
Bagua tensor features a proxy structure, where the actual tensor used by the backend is accessed via a “Proxy Tensor”. The proxy tensor is registered in Bagua; whenever the Bagua backend needs a tensor (for example, to use it for communication), it calls the
bagua_getter_closure
on the proxy tensor to get the tensor that is actually worked on. We call this tensor the “Effective Tensor”. The bagua_setter_closure
is also provided to replace the effective tensor during runtime, which is intended for customized workflows.
For example, in the gradient allreduce algorithm, the effective tensor that needs to be exchanged between machines is the gradient. In this case, we will register the model parameters as proxy tensors, and register
bagua_getter_closure
to belambda proxy_tensor: proxy_tensor.grad
. In this way, even if the gradient tensor is recreated or changed during runtime, Bagua can still identify the correct tensor and use it for communication, since the proxy tensor serves as the root for access and is never replaced.- bagua_backend_tensor(self)¶
- Returns
The raw Bagua backend tensor.
- Return type
bagua_core.BaguaTensorPy
- bagua_ensure_grad(self)¶
Create a zero gradient tensor for the current parameter if it does not exist.
- Returns
The original tensor.
- Return type
torch.Tensor
- bagua_getter_closure(self)¶
Returns the tensor that will be used at runtime.
- Return type
torch.Tensor
- bagua_mark_communication_ready(self)¶
Mark a Bagua tensor ready for scheduled operations execution.
- bagua_mark_communication_ready_without_synchronization(self)¶
Mark a Bagua tensor ready immediately, without CUDA event synchronization.
- bagua_set_storage(self, storage, storage_offset=0)¶
Sets the underlying storage for the effective tensor returned by
bagua_getter_closure
with an existing torch.Storage.- Parameters
storage (torch.Storage) – The storage to use.
storage_offset (int) – The offset in the storage.
- bagua_setter_closure(self, tensor)¶
Sets the tensor that will be used at runtime to a new Pytorch tensor
tensor
.- Parameters
tensor (torch.Tensor) – The new tensor to be set to.
- ensure_bagua_tensor(self, name=None, module_name=None, getter_closure=None, setter_closure=None)¶
Convert a PyTorch tensor or parameter to a Bagua tensor in place and return it. A Bagua tensor is required to use Bagua's communication algorithms.
This operation will register
self
as a proxy tensor to the Bagua backend.getter_closure
takes the proxy tensor as input and returns a Pytorch tensor. When using the Bagua tensor, the getter_closure
will be called to return the effective tensor, which will be used for communication and other operations. For example, if one of a model's parameters param
is registered as a proxy tensor, and getter_closure
is lambda x: x.grad
, its gradient will be used during runtime.setter_closure
takes the proxy tensor and another tensor as inputs and returns nothing. It is mainly used for changing the effective tensor used at runtime. For example, when one of a model's parameters param
is registered as a proxy tensor, and getter_closure
islambda x: x.grad
, thesetter_closure
can belambda param, new_grad_tensor: setattr(param, "grad", new_grad_tensor)
. When thesetter_closure
is called, the effective tensor used in later operations will be changed tonew_grad_tensor
.- Parameters
name (Optional[str]) – The unique name of the tensor.
module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using
model.bagua_module_name
. This is required to callbagua_mark_communication_ready
related methods.getter_closure (Optional[Callable[[torch.Tensor], torch.Tensor]]) – A function that accepts a Pytorch tensor as its input and returns a Pytorch tensor as its output. Could be
None
, which means an identity mappinglambda x: x
is used. Default:None
.setter_closure (Optional[Callable[[torch.Tensor, torch.Tensor], None]]) – A function that accepts two Pytorch tensors as its inputs and returns nothing. Could be
None
, which is a no-op. Default:None
.
- Returns
The original tensor with Bagua tensor attributes initialized.
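A hedged sketch of the gradient pattern described above, registering a parameter as the proxy tensor and its gradient as the effective tensor (the tensor name is illustrative, and model is assumed to be a module already prepared with with_bagua):
>>> param = next(model.parameters())
>>> param.bagua_ensure_grad()  # make sure param.grad exists
>>> param.ensure_bagua_tensor(
...     name="my_param",
...     module_name=model.bagua_module_name,
...     getter_closure=lambda p: p.grad,
...     setter_closure=lambda p, new_grad: setattr(p, "grad", new_grad),
... )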
- is_bagua_tensor(self)¶
Checks whether this is a Bagua tensor.
- Return type
bool
- to_bagua_tensor(self, name=None, module_name=None, getter_closure=None, setter_closure=None)¶
Create a new Bagua tensor from a PyTorch tensor or parameter and return it. The new Bagua tensor will share the same storage with the input PyTorch tensor. A Bagua tensor is required to use Bagua’s communication algorithms. See
ensure_bagua_tensor
for more information.Caveat: Be aware that if the original tensor changes to use a different storage using for example
torch.Tensor.set_(...)
, the new Bagua tensor will still use the old storage.- Parameters
name (Optional[str]) – The unique name of the tensor.
module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using
model.bagua_module_name
. This is required to callbagua_mark_communication_ready
related methods.getter_closure (Optional[Callable[[torch.Tensor], torch.Tensor]]) – A function that accepts a Pytorch tensor as its input and returns a Pytorch tensor as its output. See
ensure_bagua_tensor
.setter_closure (Optional[Callable[[torch.Tensor, torch.Tensor], None]]) – A function that accepts two Pytorch tensors as its inputs and returns nothing. See
ensure_bagua_tensor
.
- Returns
The new Bagua tensor sharing the same storage with the original tensor.
Package Contents¶
- bagua.torch_api.version¶
- class bagua.torch_api.BaguaModule¶
This class patches torch.nn.Module with several methods to enable Bagua functionalities.
- Variables
bagua_optimizers (List[torch.optim.Optimizer]) – The optimizers passed in by
with_bagua
.bagua_algorithm (bagua.torch_api.algorithms.AlgorithmImpl) – The algorithm implementation used by the module, reified by the algorithm passed in by
with_bagua
.process_group (bagua.torch_api.communication.BaguaProcessGroup) – The process group used by the module.
bagua_module_name (str) – The module's name. Bagua uses the module name to distinguish different modules.
parameters_to_ignore (List[str]) – The parameter names in
"{module_name}.{param_name}"
format to ignore when callingself.bagua_build_params()
.bagua_train_step_counter (int) – Number of iterations in training mode.
bagua_buckets (List[bagua.torch_api.bucket.BaguaBucket]) – All Bagua buckets in a list.
- bagua_build_params(self)¶
Build a list of
(parameter_name, parameter)
tuples for all parameters that require gradients and are not in the
attribute.- Return type
List[Tuple[str, torch.nn.Parameter]]
- with_bagua(self, optimizers, algorithm, process_group=None, do_flatten=True)¶
with_bagua
enables easy distributed data parallel training on a torch.nn.Module.- Parameters
optimizers (List[torch.optim.Optimizer]) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers.
algorithm (bagua.torch_api.algorithms.Algorithm) – Distributed algorithm used to do the actual communication and update.
process_group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to be used for distributed data all-reduction. If
None
, the default process group, which is created bybagua.torch_api.init_process_group
, will be used. (default:None
do_flatten (bool) – Whether to flatten the Bagua buckets. The flatten operation will reset the data pointers of bucket tensors so that they can use faster code paths. Default:
True
.
- Returns
The original module, with Bagua related environments initialized.
- Return type
BaguaModule
Note
If we want to ignore some layers for communication, we can first check these layers' corresponding keys in the module's
state_dict
(they are in"{module_name}.{param_name}"
format), then assign the list of keys toyour_module._bagua_params_and_buffers_to_ignore
.Examples:
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = model.with_bagua(
...     [optimizer],
...     GradientAllReduce()
... )
- class bagua.torch_api.BaguaTensor¶
This class patches torch.Tensor with additional methods.
A Bagua tensor is required to use Bagua’s communication algorithms. Users can convert a PyTorch tensor to Bagua tensor by
ensure_bagua_tensor
Bagua tensor features a proxy structure, where the actual tensor used by the backend is accessed via a “Proxy Tensor”. The proxy tensor is registered in Bagua; whenever the Bagua backend needs a tensor (for example, to use it for communication), it calls the
bagua_getter_closure
on the proxy tensor to get the tensor that is actually worked on. We call this tensor the “Effective Tensor”. The bagua_setter_closure
is also provided to replace the effective tensor during runtime, which is intended for customized workflows.
For example, in the gradient allreduce algorithm, the effective tensor that needs to be exchanged between machines is the gradient. In this case, we will register the model parameters as proxy tensors, and register
bagua_getter_closure
to belambda proxy_tensor: proxy_tensor.grad
. In this way, even if the gradient tensor is recreated or changed during runtime, Bagua can still identify the correct tensor and use it for communication, since the proxy tensor serves as the root for access and is never replaced.- bagua_backend_tensor(self)¶
- Returns
The raw Bagua backend tensor.
- Return type
bagua_core.BaguaTensorPy
- bagua_ensure_grad(self)¶
Create a zero gradient tensor for the current parameter if it does not exist.
- Returns
The original tensor.
- Return type
torch.Tensor
- bagua_getter_closure(self)¶
Returns the tensor that will be used at runtime.
- Return type
torch.Tensor
- bagua_mark_communication_ready(self)¶
Mark a Bagua tensor ready for scheduled operations execution.
- bagua_mark_communication_ready_without_synchronization(self)¶
Mark a Bagua tensor ready immediately, without CUDA event synchronization.
- bagua_set_storage(self, storage, storage_offset=0)¶
Sets the underlying storage for the effective tensor returned by
bagua_getter_closure
with an existing torch.Storage.- Parameters
storage (torch.Storage) – The storage to use.
storage_offset (int) – The offset in the storage.
- bagua_setter_closure(self, tensor)¶
Sets the tensor that will be used at runtime to a new Pytorch tensor
tensor
.- Parameters
tensor (torch.Tensor) – The new tensor to be set to.
- ensure_bagua_tensor(self, name=None, module_name=None, getter_closure=None, setter_closure=None)¶
Convert a PyTorch tensor or parameter to a Bagua tensor in place and return it. A Bagua tensor is required to use Bagua's communication algorithms.
This operation will register
self
as a proxy tensor to the Bagua backend.getter_closure
takes the proxy tensor as input and returns a Pytorch tensor. When using the Bagua tensor, the getter_closure
will be called to return the effective tensor, which will be used for communication and other operations. For example, if one of a model's parameters param
is registered as a proxy tensor, and getter_closure
is lambda x: x.grad
, its gradient will be used during runtime.setter_closure
takes the proxy tensor and another tensor as inputs and returns nothing. It is mainly used for changing the effective tensor used at runtime. For example, when one of a model's parameters param
is registered as a proxy tensor, and getter_closure
islambda x: x.grad
, thesetter_closure
can belambda param, new_grad_tensor: setattr(param, "grad", new_grad_tensor)
. When thesetter_closure
is called, the effective tensor used in later operations will be changed tonew_grad_tensor
.- Parameters
name (Optional[str]) – The unique name of the tensor.
module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using
model.bagua_module_name
. This is required to callbagua_mark_communication_ready
related methods.getter_closure (Optional[Callable[[torch.Tensor], torch.Tensor]]) – A function that accepts a Pytorch tensor as its input and returns a Pytorch tensor as its output. Could be
None
, which means an identity mappinglambda x: x
is used. Default:None
.setter_closure (Optional[Callable[[torch.Tensor, torch.Tensor], None]]) – A function that accepts two Pytorch tensors as its inputs and returns nothing. Could be
None
, which is a no-op. Default:None
.
- Returns
The original tensor with Bagua tensor attributes initialized.
- is_bagua_tensor(self)¶
Checks whether this is a Bagua tensor.
- Return type
bool
- to_bagua_tensor(self, name=None, module_name=None, getter_closure=None, setter_closure=None)¶
Create a new Bagua tensor from a PyTorch tensor or parameter and return it. The new Bagua tensor will share the same storage with the input PyTorch tensor. A Bagua tensor is required to use Bagua’s communication algorithms. See
ensure_bagua_tensor
for more information.Caveat: Be aware that if the original tensor changes to use a different storage using for example
torch.Tensor.set_(...)
, the new Bagua tensor will still use the old storage.- Parameters
name (Optional[str]) – The unique name of the tensor.
module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using
model.bagua_module_name
. This is required to callbagua_mark_communication_ready
related methods.getter_closure (Optional[Callable[[torch.Tensor], torch.Tensor]]) – A function that accepts a Pytorch tensor as its input and returns a Pytorch tensor as its output. See
ensure_bagua_tensor
.setter_closure (Optional[Callable[[torch.Tensor, torch.Tensor], None]]) – A function that accepts two Pytorch tensors as its inputs and returns nothing. See
ensure_bagua_tensor
.
- Returns
The new Bagua tensor sharing the same storage with the original tensor.
- class bagua.torch_api.ReduceOp¶
Bases:
enum.IntEnum
An enum-like class for available reduction operations:
SUM
,PRODUCT
,MIN
,MAX
,BAND
,BOR
,BXOR
andAVG
.Initialize self. See help(type(self)) for accurate signature.
- AVG = 10¶
- BAND = 8¶
- BOR = 7¶
- BXOR = 9¶
- MAX = 3¶
- MIN = 2¶
- PRODUCT = 1¶
- SUM = 0¶
- bagua.torch_api.allgather(send_tensor, recv_tensor, comm=None)¶
Gathers send tensors from all processes associated with the communicator into
recv_tensor
.- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of
comm.nranks * send_tensor.size()
elements.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.allgather_inplace(tensor, comm=None)¶
The in-place version of
allgather
.- Parameters
tensor (torch.Tensor) –
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –
- bagua.torch_api.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes associated with the communicator in such a way that all get the final result. After the call
recv_tensor
is going to be bitwise identical in all processes.- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must be of the same size as
send_tensor
.op (ReduceOp) – One of the values from
ReduceOp
enum. Specifies an operation used for element-wise reductions.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
Examples:
>>> from bagua.torch_api import allreduce
>>>
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat)
>>> send_tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
- bagua.torch_api.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of
allreduce
.- Parameters
tensor (torch.Tensor) –
op (ReduceOp) –
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –
- bagua.torch_api.alltoall(send_tensor, recv_tensor, comm=None)¶
Each process scatters
send_tensor
to all processes associated with the communicator and returns the gathered data in recv_tensor
.- Parameters
send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by
comm.nranks
.recv_tensor (torch.Tensor) – Output of the collective, must be of the same size as
send_tensor
.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.alltoall_inplace(tensor, comm=None)¶
The in-place version of
alltoall
.- Parameters
tensor (torch.Tensor) –
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –
- bagua.torch_api.alltoall_v(send_tensor, send_counts, send_displs, recv_tensor, recv_counts, recv_displs, comm=None)¶
Each process scatters
send_tensor
to all processes associated with the communicator and returns the gathered data in recv_tensor
. Each process may send a different amount of data and provides displacements for the input and output data.- Parameters
send_tensor (torch.Tensor) – Input of the collective, the size must be divisible by
comm.nranks
.send_counts (int) – Integer array (of length group size) specifying the number of elements to send to each process.
send_displs (int) – Integer array (of length group size). Entry j specifies the displacement (relative to send_tensor) from which to take the outgoing data destined for process j.
recv_tensor (torch.Tensor) – Output of the collective, must be of the same size as
send_tensor
.recv_counts (int) – Integer array (of length group size) specifying the maximum number of elements that can be received from each process.
recv_displs (int) – Integer array (of length group size). Entry i specifies the displacement (relative to recv_tensor) at which to place the incoming data from process i.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
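A hedged 2-rank sketch (assuming plain Python lists are accepted for the count and displacement arrays): rank 0 sends 1 element to rank 0 and 3 to rank 1, while rank 1 sends 3 elements to rank 0 and 1 to rank 1, so every rank both sends and receives 4 elements.
>>> import torch
>>> from bagua.torch_api import alltoall_v, get_rank
>>>
>>> rank = get_rank()
>>> send_tensor = torch.arange(4, dtype=torch.float32, device="cuda") + 10 * rank
>>> send_counts = [1, 3] if rank == 0 else [3, 1]
>>> recv_counts = [1, 3] if rank == 0 else [3, 1]
>>> send_displs = [0, send_counts[0]]
>>> recv_displs = [0, recv_counts[0]]
>>> recv_tensor = torch.zeros(4, dtype=torch.float32, device="cuda")
>>> alltoall_v(send_tensor, send_counts, send_displs,
...            recv_tensor, recv_counts, recv_displs)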
- bagua.torch_api.alltoall_v_inplace(tensor, counts, displs, comm=None)¶
The in-place version of
alltoall_v
.- Parameters
tensor (torch.Tensor) –
counts (int) –
displs (int) –
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –
- bagua.torch_api.broadcast(tensor, src=0, comm=None)¶
Broadcasts the tensor to all processes associated with the communicator.
tensor
must have the same number of elements in all processes participating in the collective.- Parameters
tensor (torch.Tensor) – Data to be sent if
src
is the rank of current process, and tensor to be used to save received data otherwise.src (int) – Source rank. Default: 0.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.gather(send_tensor, recv_tensor, dst, comm=None)¶
Gathers send tensors from all processes associated with the communicator to
recv_tensor
in a single process.- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of
comm.nranks * send_tensor.size()
elements.dst (int) – Destination rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.gather_inplace(tensor, count, dst, comm=None)¶
The in-place version of
gather
.- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the
dst
rank, it must have a size of comm.nranks * count
elements. On non-dst ranks, its size must be equal to count
.count (int) – The per-rank data count to gather.
dst (int) – Destination rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.get_backend(model_name)¶
- Parameters
model_name (str) –
- bagua.torch_api.get_local_rank()¶
Get the local rank of the current process.
Local rank is a unique identifier assigned to each process within a node. They are always consecutive integers ranging from 0 to
local_size
.- Returns
The local rank of the current process.
- Return type
int
- bagua.torch_api.get_local_size()¶
Get the number of processes in the node.
- Returns
The local size of the node.
- Return type
int
- bagua.torch_api.get_rank()¶
Get the rank of the current process in the default process group.
Rank is a unique identifier assigned to each process within a distributed process group. They are always consecutive integers ranging from 0 to
world_size
.- Returns
The rank of the current process.
- Return type
int
- bagua.torch_api.get_world_size()¶
Get the number of processes in the current process group.
- Returns
The world size of the process group.
- Return type
int
- bagua.torch_api.init_process_group(store=None)¶
Initializes the PyTorch builtin distributed process group. This will also initialize the distributed package, and should be executed before any other Bagua API is used.
- Parameters
store (Optional[torch.distributed.Store]) – Key/value store accessible to all workers, used to exchange connection/address information. If
None
, a TCP-based store will be created. Default:None
.
- Examples:
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())  # THIS LINE IS IMPORTANT. See the notes below.
>>> bagua.init_process_group()
>>>
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = model.with_bagua([optimizer], ...)
Note
Each process should be associated with a CUDA device using torch.cuda.set_device() before calling
init_process_group
. Otherwise you may encounter the fatal runtime error: Rust cannot catch foreign exceptions.
- bagua.torch_api.recv(tensor, src, comm=None)¶
Receives a tensor synchronously.
- Parameters
tensor (torch.Tensor) – Tensor to fill with received data.
src (int) – Source rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes.
Only the process with rank
dst
is going to receive the final result.- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must be of the same size as
send_tensor
.dst (int) – Destination rank.
op (ReduceOp) – One of the values from
ReduceOp
enum. Specifies an operation used for element-wise reductions.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)¶
The in-place version of
reduce
.- Parameters
tensor (torch.Tensor) –
dst (int) –
op (ReduceOp) –
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) –
- bagua.torch_api.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces, then scatters
send_tensor
to all processes associated with the communicator.- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of
comm.nranks * recv_tensor.size()
elements.recv_tensor (torch.Tensor) – Output of the collective.
op (ReduceOp, optional) – One of the values from
ReduceOp
enum. Specifies an operation used for element-wise reductions.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of
reduce_scatter
.- Parameters
tensor (torch.Tensor) – Input and output of the collective, the size must be divisible by
comm.nranks
.op (ReduceOp, optional) – One of the values from
ReduceOp
enum. Specifies an operation used for element-wise reductions.comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.scatter(send_tensor, recv_tensor, src, comm=None)¶
Scatters send tensor to all processes associated with the communicator.
- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of
comm.nranks * recv_tensor.size()
elements.recv_tensor (torch.Tensor) – Output of the collective.
src (int) – Source rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.scatter_inplace(tensor, count, src, comm=None)¶
The in-place version of
scatter
.- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the
src
rank, it must have a size of comm.nranks * count
elements. On non-src ranks, its size must be equal to count
.count (int) – The per-rank data count to scatter.
src (int) – Source rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.
- bagua.torch_api.send(tensor, dst, comm=None)¶
Sends a tensor to
dst
synchronously.- Parameters
tensor (torch.Tensor) – Tensor to send.
dst (int) – Destination rank.
comm (Optional[bagua_core.BaguaSingleCommunicatorPy]) – A handle of the Bagua communicator to work on. By default, the global communicator of the default process group will be used.