Bagua¶
This website contains the Bagua API documentation. See the tutorials if you need step-by-step instructions on how to use Bagua.
bagua¶
Bagua is a communication library developed by Kuaishou Technology and DS3 Lab for deep learning.
See tutorials for Bagua’s rationale and benchmark.
Subpackages¶
bagua.torch_api¶
The Bagua communication library PyTorch interface.
Subpackages¶
bagua.torch_api.algorithms¶
- class bagua.torch_api.algorithms.async_model_average.AsyncModelAverageAlgorithm(peer_selection_mode='all', sync_interval_ms=500, warmup_steps=0)¶
Bases: bagua.torch_api.algorithms.Algorithm
Create an instance of the AsyncModelAverage algorithm.
The asynchronous implementation is experimental and imposes some restrictions. With such an asynchronous algorithm, the number of iterations on each worker differs. Therefore the current implementation assumes that the dataset is an endless stream, and all workers continuously synchronize with each other.
Users should call abort to manually stop the algorithm's continuous synchronization process.
- Parameters
peer_selection_mode (str) – How workers communicate with each other. Currently only "all" is supported. "all" means all workers' weights are synchronized during each communication.
sync_interval_ms (int) – Number of milliseconds between model synchronizations.
warmup_steps (int) – Number of steps to warm up by doing gradient allreduce before doing asynchronous model averaging. Use 0 to disable.
- abort(self, bagua_module)¶
Stop background asynchronous communications. Should be called after training.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- resume(self, bagua_module)¶
Resume aborted background asynchronous communications (see abort). Should be called before training.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
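A minimal usage sketch (illustrative only; it assumes model and optimizer already exist and the process group has been initialized):
>>> from bagua.torch_api.algorithms.async_model_average import AsyncModelAverageAlgorithm
>>> algorithm = AsyncModelAverageAlgorithm(sync_interval_ms=500, warmup_steps=100)
>>> model = model.with_bagua([optimizer], algorithm)
>>> # ... training loop over the endless data stream ...
>>> algorithm.abort(model)  # stop continuous synchronization after training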
- class bagua.torch_api.algorithms.base.Algorithm¶
This is the base class that all Bagua algorithms inherit.
It provides methods that can be overridden to implement different kinds of distributed algorithms.
- init_backward_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed on every parameter's gradient computation completion.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes the name of a parameter (as in torch.nn.Module.named_parameters) and the parameter itself.
- init_forward_pre_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed before the forward process.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes the model's input.
- init_operations(self, bagua_module, bucket)¶
Given a BaguaModule and a BaguaBucket, register operations to be executed on the bucket.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
bucket (bagua.torch_api.bucket.BaguaBucket) – A single bucket to register operations.
- init_post_backward_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed when the backward pass is done.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes no argument.
- init_post_optimizer_step_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed when optimizer.step() is done.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that gets called after an optimizer's step() method is called. The function takes the optimizer as its argument.
- init_tensors(self, bagua_module)¶
Given a BaguaModule, return Bagua tensors to be used in Bagua for later operations.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A list of Bagua tensors for communication.
- Return type
List[bagua.torch_api.tensor.BaguaTensor]
- need_reset(self)¶
- Returns
True if all initialization methods of the current algorithm should be called again. This is useful for algorithms that have multiple stages, where each stage needs different initializations.
- Return type
bool
- tensors_to_buckets(self, tensors)¶
Given the bucketing suggestion from Bagua, return the actual Bagua buckets. The default implementation follows the suggestion to do the bucketing.
- Parameters
tensors (List[List[bagua.torch_api.tensor.BaguaTensor]]) – Bagua tensors grouped in different lists, representing Bagua's suggestion on how to bucket the tensors.
- Returns
A list of Bagua buckets.
- Return type
List[bagua.torch_api.bucket.BaguaBucket]
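As an illustration of how these hooks can be overridden, below is a hypothetical subclass that only adds logging after each backward pass; it is a sketch, not part of the Bagua API, and assumes the base class defaults are sufficient for everything else:
>>> from bagua.torch_api.algorithms import Algorithm
>>> class LoggingAlgorithm(Algorithm):  # hypothetical subclass, for illustration only
...     def init_post_backward_hook(self, bagua_module):
...         def hook():
...             # runs once per iteration, after the backward pass is done
...             print("finished step", bagua_module.bagua_train_step_counter)
...         return hook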
- class bagua.torch_api.algorithms.bytegrad.ByteGradAlgorithm(average=True)¶
Bases: bagua.torch_api.algorithms.Algorithm
Create an instance of the ByteGrad algorithm.
- Parameters
average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
- class bagua.torch_api.algorithms.decentralized.DecentralizedAlgorithm(hierarchical=True, peer_selection_mode='all', communication_interval=1)¶
Bases: bagua.torch_api.algorithms.Algorithm
Create an instance of the Decentralized SGD algorithm.
- Parameters
hierarchical (bool) – Enable hierarchical communication.
peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers' weights are averaged in each communication step. "shift_one" means each worker selects a different peer to average weights with in each communication step.
communication_interval (int) – Number of iterations between two communication steps.
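A usage sketch (illustrative; assumes model and optimizer already exist and the process group has been initialized):
>>> from bagua.torch_api.algorithms.decentralized import DecentralizedAlgorithm
>>> algorithm = DecentralizedAlgorithm(hierarchical=True, peer_selection_mode="shift_one", communication_interval=1)
>>> model = model.with_bagua([optimizer], algorithm)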
- class bagua.torch_api.algorithms.decentralized.LowPrecisionDecentralizedAlgorithm(hierarchical=True, communication_interval=1)¶
Bases: bagua.torch_api.algorithms.Algorithm
Create an instance of the Low Precision Decentralized SGD algorithm.
- Parameters
hierarchical (bool) – Enable hierarchical communication.
communication_interval (int) – Number of iterations between two communication steps.
- class bagua.torch_api.algorithms.gradient_allreduce.GradientAllReduceAlgorithm(hierarchical=False, average=True)¶
Bases: bagua.torch_api.algorithms.Algorithm
Create an instance of the GradientAllReduce algorithm.
- Parameters
hierarchical (bool) – Enable hierarchical communication.
average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
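A usage sketch (illustrative; assumes model and optimizer already exist and the process group has been initialized):
>>> from bagua.torch_api.algorithms.gradient_allreduce import GradientAllReduceAlgorithm
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm(average=True))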
- class bagua.torch_api.algorithms.q_adam.QAdamAlgorithm(q_adam_optimizer, hierarchical=True)¶
Bases: bagua.torch_api.algorithms.Algorithm
Create an instance of the QAdam algorithm.
- Parameters
q_adam_optimizer (QAdamOptimizer) – A QAdamOptimizer initialized with model parameters.
hierarchical (bool) – Enable hierarchical communication.
- class bagua.torch_api.algorithms.q_adam.QAdamOptimizer(params, lr=0.001, warmup_steps=100, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.0)¶
Bases: torch.optim.optimizer.Optimizer
Create a dedicated optimizer used for the QAdam algorithm.
- Parameters
params (iterable) – Iterable of parameters to optimize or dicts defining parameter groups.
lr (float) – Learning rate.
warmup_steps (int) – Number of steps to warm up by doing gradient allreduce before doing asynchronous model averaging. Use 0 to disable.
betas (Tuple[float, float]) – Coefficients used for computing running averages of gradient and its square.
eps (float) – Term added to the denominator to improve numerical stability.
weight_decay (float) – Weight decay (L2 penalty).
- step(self, closure=None)¶
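A sketch combining the optimizer and the algorithm (illustrative; assumes model already exists and the process group has been initialized):
>>> from bagua.torch_api.algorithms.q_adam import QAdamAlgorithm, QAdamOptimizer
>>> optimizer = QAdamOptimizer(model.parameters(), lr=1e-3, warmup_steps=100)
>>> algorithm = QAdamAlgorithm(optimizer, hierarchical=True)
>>> model = model.with_bagua([optimizer], algorithm)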
- class bagua.torch_api.algorithms.Algorithm¶
This is the base class that all Bagua algorithms inherit.
It provides methods that can be overridden to implement different kinds of distributed algorithms.
- init_backward_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed on every parameter's gradient computation completion.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes the name of a parameter (as in torch.nn.Module.named_parameters) and the parameter itself.
- init_forward_pre_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed before the forward process.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes the model's input.
- init_operations(self, bagua_module, bucket)¶
Given a BaguaModule and a BaguaBucket, register operations to be executed on the bucket.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
bucket (bagua.torch_api.bucket.BaguaBucket) – A single bucket to register operations.
- init_post_backward_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed when the backward pass is done.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that takes no argument.
- init_post_optimizer_step_hook(self, bagua_module)¶
Given a BaguaModule, return a hook function that will be executed when optimizer.step() is done.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A function that gets called after an optimizer's step() method is called. The function takes the optimizer as its argument.
- init_tensors(self, bagua_module)¶
Given a BaguaModule, return Bagua tensors to be used in Bagua for later operations.
- Parameters
bagua_module (bagua.torch_api.distributed.BaguaModule) – A PyTorch module initialized by the with_bagua method.
- Returns
A list of Bagua tensors for communication.
- Return type
List[bagua.torch_api.tensor.BaguaTensor]
- need_reset(self)¶
- Returns
True if all initialization methods of the current algorithm should be called again. This is useful for algorithms that have multiple stages, where each stage needs different initializations.
- Return type
bool
- tensors_to_buckets(self, tensors)¶
Given the bucketing suggestion from Bagua, return the actual Bagua buckets. The default implementation follows the suggestion to do the bucketing.
- Parameters
tensors (List[List[bagua.torch_api.tensor.BaguaTensor]]) – Bagua tensors grouped in different lists, representing Bagua's suggestion on how to bucket the tensors.
- Returns
A list of Bagua buckets.
- Return type
List[bagua.torch_api.bucket.BaguaBucket]
bagua.torch_api.contrib¶
- class bagua.torch_api.contrib.utils.redis_store.RedisStore(hosts=None, cluster_mode=True, capacity_per_node=107374182400)¶
Bases: bagua.torch_api.contrib.utils.store.ClusterStore
A Redis-based distributed key-value store implementation, with set and get APIs exposed.
- Parameters
hosts (List[Dict[str, str]]) – A list of Redis servers, defined by a list of dicts containing Redis host and port information, like [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]. A new Redis instance will be spawned on each node if hosts=None.
cluster_mode (bool) – If True, data is sharded across all Redis instances. Otherwise, if there are \(m\) Redis instances, the workers on the \(n\)-th node will use the \(n % m\)-th Redis instance.
capacity_per_node (int) – Maximum memory limit in bytes when spawning new Redis instances. Old values will be evicted when the limit is reached. Default is 100GB.
Note
All Bagua jobs within the same node will share the same local Redis instance if hosts=None. The capacity_per_node argument only affects newly spawned Redis instances, and has no effect on existing ones.
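A usage sketch of the set/get API (illustrative; the hosts below are placeholders and must point to reachable Redis servers):
>>> from bagua.torch_api.contrib.utils.redis_store import RedisStore
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> store = RedisStore(hosts=hosts, cluster_mode=True)
>>> store.set("foo", "bar")
>>> value = store.get("foo")           # the stored value, or None if the key is missing
>>> store.mset({"a": "1", "b": "2"})
>>> values = store.mget(["a", "b"])    # values returned in the same order as the keys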
- class bagua.torch_api.contrib.utils.store.ClusterStore(stores)¶
Bases: Store
Base class for distributed key-value stores.
In a cluster store, entries will be sharded equally among multiple store instances based on their keys.
- Parameters
stores (List[Store]) – A list of stores to shard entries on.
- class bagua.torch_api.contrib.utils.store.Store¶
Base class for key-value store implementations. Entries are added to the store with set or mset, and retrieved with get or mget.
- clear(self)¶
Delete all keys in the current store.
- get(self, key)¶
Returns the value associated with key, or None if the key doesn't exist.
- Parameters
key (str) –
- Return type
Optional[Union[str, bytes]]
- mget(self, keys)¶
Retrieve each key's corresponding value and return them in a list with the same order as keys.
- Parameters
keys (List[str]) –
- Return type
List[Optional[Union[str, bytes]]]
- mset(self, dictionary)¶
Set multiple entries at once with a dictionary. Each key-value pair in the dictionary will be set.
- Parameters
dictionary (Dict[str, Union[str, bytes]]) –
- num_keys(self)¶
Returns the number of keys in the current store.
- Return type
int
- set(self, key, value)¶
Set a key-value pair.
- Parameters
key (str) –
value (Union[str, bytes]) –
- shutdown(self)¶
Shut down the managed store instances. Unmanaged instances will not be killed.
- status(self)¶
- Returns
True if the current store is alive.
- Return type
bool
- class bagua.torch_api.contrib.cache_loader.CacheLoader(backend='redis', dataset_name='', writer_buffer_size=1, **kwargs)¶
A cache loader caches values calculated by an expensive function by their keys via get, so that the values can be retrieved faster next time.
Internally, values are indexed by "{dataset_name}_{key}" and saved in a distributed key-value store, where dataset_name is specified on initialization, and key is the argument passed to get.
By default, the cache loader uses RedisStore as its backend distributed key-value store implementation. It supports using a list of existing Redis servers or spawning new Redis servers. Parameters for RedisStore can be provided here in **kwargs.
- Parameters
backend (str) – Backend distributed key-value store implementation. Can be "redis".
dataset_name (str) – Name of the dataset. Default "".
writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.
- Example::
To use a list of existing redis servers for the “redis” backend:
>>> from bagua.torch_api.contrib import CacheLoader
>>>
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> loader = CacheLoader(backend="redis", hosts=hosts, cluster_mode=True, dataset_name="test")
>>>
>>> loader.get(index, lambda x: items[x])
To spawn new redis servers for the “redis” backend:
>>> loader = CacheLoader(backend="redis", hosts=None, cluster_mode=True, capacity_per_node=100000000)
Note
Cache loaders with the same dataset_name will reuse and overwrite each other's cache. Use a different dataset_name if this is not desired.
- get(self, key, load_fn)¶
Returns the value associated with key in the cache, and uses load_fn to create the entry if the key does not exist in the cache. load_fn is a function taking key as its argument and returning the corresponding value to be cached.
- Parameters
key (str) –
load_fn (Callable[[str], None]) –
- num_keys(self)¶
Returns the number of keys in the cache.
- class bagua.torch_api.contrib.cached_dataset.CachedDataset(dataset, backend='redis', dataset_name='', writer_buffer_size=20, **kwargs)¶
Bases: torch.utils.data.dataset.Dataset
Cached dataset wraps a PyTorch dataset to cache its samples in memory, so that accessing these samples after the first time can be much faster. This is useful when samples need tedious preprocessing to produce, or reading the dataset itself is slow, which could slow down the whole training process.
Internally, the samples are indexed by a string key "{dataset_name}_{index}" and saved in a distributed key-value store, where dataset_name is specified when initializing the cached dataset, and index is the index of a specific sample (the argument of the __getitem__ method in a PyTorch dataset).
- Parameters
dataset (torch.utils.data.dataset.Dataset) – PyTorch dataset to be wrapped.
backend (str) – Backend distributed key-value store implementation. Can be "redis".
dataset_name (str) – Name of the dataset. Default "".
writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.
Example:
>>> from bagua.torch_api.contrib import CachedDataset
>>> cached_dataset = CachedDataset(dataset, backend="redis", dataset_name="ds")
>>> dataloader = torch.utils.data.DataLoader(cached_dataset)
Note
Cached dataset is a special case of cache loader. The parameters backend and writer_buffer_size in initializing a cached dataset have the same meanings as those in initializing a cache loader. You can provide the arguments for the cache loader here in **kwargs. See also CacheLoader.
- cache_loader¶
The backend cache loader instance.
- class bagua.torch_api.contrib.fused_optimizer.FusedOptimizer(optimizer, do_flatten=False)¶
Bases: torch.optim.Optimizer
Convert any optimizer into a fused optimizer.
This fused optimizer fuses multiple module parameter update kernel launches into one or a few, by flattening parameter tensors into one or more contiguous buckets.
It can be used in conjunction with the with_bagua method. In this case, Bagua will do the fusions automatically; otherwise, you need to explicitly set do_flatten=True.
- Parameters
optimizer (torch.optim.Optimizer) – Any PyTorch optimizer.
do_flatten (bool) – Whether to flatten the parameters. Default: False.
- Returns
Fused optimizer.
- Example::
To use in conjunction with the with_bagua method:
>>> optimizer = torch.optim.Adadelta(model.parameters(), ...)
>>> optimizer = bagua.torch_api.contrib.FusedOptimizer(optimizer)
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm())
To use alone or with torch.nn.parallel.DistributedDataParallel, set do_flatten=True:
>>> optimizer = torch.optim.Adadelta(model.parameters(), ...)
>>> optimizer = bagua.torch_api.contrib.FusedOptimizer(optimizer, do_flatten=True)
- step(self, closure=None)¶
Performs a single optimization step (parameter update).
- Parameters
closure (Callable) – A closure that reevaluates the model and returns the loss. Optional for most optimizers.
Note
Unless otherwise specified, this function should not modify the .grad field of the parameters.
- class bagua.torch_api.contrib.load_balancing_data_loader.LoadBalancingDistributedBatchSampler(sampler, batch_fn, drop_last=False)¶
Bases: torch.utils.data.sampler.Sampler
Wraps another load-balancing sampler to yield variable-sized mini-batches.
- Parameters
sampler (LoadBalancingDistributedSampler) – Load-balancing sampler.
batch_fn (Callable) – Callable to yield mini-batch indices.
drop_last (bool) – If True, the sampler will drop the last few batches exceeding the least number of batches among replicas; otherwise, the number of batches on each replica will be padded to the same.
batch_fn will have the signature of:
def batch_fn(indices: List[int]) -> List[List[int]]
- Example::
>>> from bagua.torch_api.contrib import LoadBalancingDistributedSampler, \
...     LoadBalancingDistributedBatchSampler
>>>
>>> sampler = LoadBalancingDistributedSampler(dataset, complexity_fn=complexity_fn)
>>> batch_sampler = LoadBalancingDistributedBatchSampler(sampler, batch_fn=batch_fn)
>>> loader = torch.utils.data.DataLoader(dataset, batch_sampler=batch_sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     batch_sampler.set_epoch(epoch)
...     train(loader)
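The example above assumes complexity_fn and batch_fn are already defined. A minimal batch_fn sketch that simply chunks the sampled indices into fixed-size mini-batches (illustrative only; a real batch_fn would typically group indices by their complexities):
>>> def batch_fn(indices):
...     batch_size = 32  # illustrative value
...     return [indices[i:i + batch_size] for i in range(0, len(indices), batch_size)]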
- set_epoch(self, epoch)¶
Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.
- Parameters
epoch (int) – Epoch number.
- Return type
None
- class bagua.torch_api.contrib.load_balancing_data_loader.LoadBalancingDistributedSampler(dataset, complexity_fn, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False, random_level=0)¶
Bases: torch.utils.data.sampler.Sampler
Sampler that restricts data loading to a subset of the dataset.
This sampler uses a complexity_fn to calculate each sample's computational complexity, so that each batch gets a similar computational complexity. This is useful in scenarios like speech and NLP, where each batch has variable length and distributed training suffers from the straggler problem.
The usage is similar to torch.utils.data.DistributedSampler, where each process loads a subset of the original dataset that is exclusive to it.
Note
Dataset is assumed to be of constant size.
- Parameters
dataset (torch.utils.data.dataset.Dataset) – Dataset used for sampling.
complexity_fn (Callable) – A function whose input is a sample and output is an integer as a measure of the computational complexity of the sample.
num_replicas (int, optional) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.
rank (int, optional) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.
shuffle (bool, optional) – If True (default), the sampler will shuffle the indices.
seed (int, optional) – Random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group. Default: 0.
drop_last (bool, optional) – If True, the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas. Default: False.
random_level (float, optional) – A float between 0 and 1 that controls the extent of load balancing. 0 means the best load balance, while 1 means the opposite.
Warning
In distributed mode, calling the set_epoch method at the beginning of each epoch before creating the DataLoader iterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.
- Example::
Define your complexity_fn, which accepts a dataset sample as its input and produces an integer as the sample's computational complexity:
>>> dataset = torch.utils.data.TensorDataset(torch.randn(n, 2), torch.randperm(n))
>>> complexity_fn = lambda x: x[1]
Below is the usage of LoadBalancingDistributedSampler and DataLoader:
>>> sampler = bagua.torch_api.contrib.LoadBalancingDistributedSampler(
...     dataset,
...     complexity_fn=complexity_fn) if is_distributed else None
>>> loader = torch.utils.data.DataLoader(dataset,
...     shuffle=(sampler is None),
...     sampler=sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     if is_distributed:
...         sampler.set_epoch(epoch)
...     train(loader)
- set_epoch(self, epoch)¶
Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.
- Parameters
epoch (int) – Epoch number.
- Return type
None
- bagua.torch_api.contrib.sync_batchnorm.unused¶
- class bagua.torch_api.contrib.sync_batchnorm.SyncBatchNorm(num_features, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)¶
Bases: torch.nn.modules.batchnorm._BatchNorm
Applies synchronous BatchNorm for a distributed module with N-dimensional BatchNorm layer(s). See BatchNorm for more details.
- Parameters
num_features – Number of channels \(C\) from the shape \((N, C, ...)\).
eps – A value added to the denominator for numerical stability. Default: 1e-5.
momentum – The value used for the running_mean and running_var computation. Can be set to None for cumulative moving average (i.e. simple average). Default: 0.1.
affine – A boolean value that when set to True, this module has learnable affine parameters. Default: True.
track_running_stats – A boolean value that when set to True, this module tracks the running mean and variance, and when set to False, this module does not track such statistics and always uses batch statistics in both training and eval modes. Default: True.
Note
Only GPU input tensors are supported in the training mode.
- classmethod convert_sync_batchnorm(cls, module)¶
Helper function to convert all BatchNorm*D layers in the model to torch.nn.SyncBatchNorm layers.
- Parameters
module (nn.Module) – Module containing one or more BatchNorm*D layers.
- Returns
The original module with the converted torch.nn.SyncBatchNorm layers. If the original module is a BatchNorm*D layer, a new torch.nn.SyncBatchNorm layer object will be returned instead.
Note
This function must be called before the with_bagua method.
- Example::
>>> # Network with nn.BatchNorm layer
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> sync_bn_model = bagua.torch_api.contrib.sync_batchnorm.SyncBatchNorm.convert_sync_batchnorm(model)
>>> bagua_model = sync_bn_model.with_bagua([optimizer], GradientAllReduce())
- forward(self, input)¶
- class bagua.torch_api.contrib.CacheLoader(backend='redis', dataset_name='', writer_buffer_size=1, **kwargs)¶
A cache loader caches values calculated by an expensive function by their keys via get, so that the values can be retrieved faster next time.
Internally, values are indexed by "{dataset_name}_{key}" and saved in a distributed key-value store, where dataset_name is specified on initialization, and key is the argument passed to get.
By default, the cache loader uses RedisStore as its backend distributed key-value store implementation. It supports using a list of existing Redis servers or spawning new Redis servers. Parameters for RedisStore can be provided here in **kwargs.
- Parameters
backend (str) – Backend distributed key-value store implementation. Can be "redis".
dataset_name (str) – Name of the dataset. Default "".
writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.
- Example::
To use a list of existing redis servers for the “redis” backend:
>>> from bagua.torch_api.contrib import CacheLoader
>>>
>>> hosts = [{"host": "192.168.1.0", "port": "7000"}, {"host": "192.168.1.1", "port": "7000"}]
>>> loader = CacheLoader(backend="redis", hosts=hosts, cluster_mode=True, dataset_name="test")
>>>
>>> loader.get(index, lambda x: items[x])
To spawn new redis servers for the “redis” backend:
>>> loader = CacheLoader(backend="redis", hosts=None, cluster_mode=True, capacity_per_node=100000000)
Note
Cache loaders with the same dataset_name will reuse and overwrite each other's cache. Use a different dataset_name if this is not desired.
- get(self, key, load_fn)¶
Returns the value associated with key in the cache, and uses load_fn to create the entry if the key does not exist in the cache. load_fn is a function taking key as its argument and returning the corresponding value to be cached.
- Parameters
key (str) –
load_fn (Callable[[str], None]) –
- num_keys(self)¶
Returns the number of keys in the cache.
- class bagua.torch_api.contrib.CachedDataset(dataset, backend='redis', dataset_name='', writer_buffer_size=20, **kwargs)¶
Bases: torch.utils.data.dataset.Dataset
Cached dataset wraps a PyTorch dataset to cache its samples in memory, so that accessing these samples after the first time can be much faster. This is useful when samples need tedious preprocessing to produce, or reading the dataset itself is slow, which could slow down the whole training process.
Internally, the samples are indexed by a string key "{dataset_name}_{index}" and saved in a distributed key-value store, where dataset_name is specified when initializing the cached dataset, and index is the index of a specific sample (the argument of the __getitem__ method in a PyTorch dataset).
- Parameters
dataset (torch.utils.data.dataset.Dataset) – PyTorch dataset to be wrapped.
backend (str) – Backend distributed key-value store implementation. Can be "redis".
dataset_name (str) – Name of the dataset. Default "".
writer_buffer_size (int) – Number of samples to collect before writing to the backend key-value store. Useful for improving the backend throughput.
Example:
>>> from bagua.torch_api.contrib import CachedDataset
>>> cached_dataset = CachedDataset(dataset, backend="redis", dataset_name="ds")
>>> dataloader = torch.utils.data.DataLoader(cached_dataset)
Note
Cached dataset is a special case of cache loader. The parameters backend and writer_buffer_size in initializing a cached dataset have the same meanings as those in initializing a cache loader. You can provide the arguments for the cache loader here in **kwargs. See also CacheLoader.
- cache_loader¶
The backend cache loader instance.
- class bagua.torch_api.contrib.FusedOptimizer(optimizer, do_flatten=False)¶
Bases: torch.optim.Optimizer
Convert any optimizer into a fused optimizer.
This fused optimizer fuses multiple module parameter update kernel launches into one or a few, by flattening parameter tensors into one or more contiguous buckets.
It can be used in conjunction with the with_bagua method. In this case, Bagua will do the fusions automatically; otherwise, you need to explicitly set do_flatten=True.
- Parameters
optimizer (torch.optim.Optimizer) – Any PyTorch optimizer.
do_flatten (bool) – Whether to flatten the parameters. Default: False.
- Returns
Fused optimizer.
- Example::
To use in conjunction with the with_bagua method:
>>> optimizer = torch.optim.Adadelta(model.parameters(), ...)
>>> optimizer = bagua.torch_api.contrib.FusedOptimizer(optimizer)
>>> model = model.with_bagua([optimizer], GradientAllReduceAlgorithm())
To use alone or with torch.nn.parallel.DistributedDataParallel, set do_flatten=True:
>>> optimizer = torch.optim.Adadelta(model.parameters(), ...)
>>> optimizer = bagua.torch_api.contrib.FusedOptimizer(optimizer, do_flatten=True)
- step(self, closure=None)¶
Performs a single optimization step (parameter update).
- Parameters
closure (Callable) – A closure that reevaluates the model and returns the loss. Optional for most optimizers.
Note
Unless otherwise specified, this function should not modify the .grad field of the parameters.
- class bagua.torch_api.contrib.LoadBalancingDistributedBatchSampler(sampler, batch_fn, drop_last=False)¶
Bases: torch.utils.data.sampler.Sampler
Wraps another load-balancing sampler to yield variable-sized mini-batches.
- Parameters
sampler (LoadBalancingDistributedSampler) – Load-balancing sampler.
batch_fn (Callable) – Callable to yield mini-batch indices.
drop_last (bool) – If True, the sampler will drop the last few batches exceeding the least number of batches among replicas; otherwise, the number of batches on each replica will be padded to the same.
batch_fn will have the signature of:
def batch_fn(indices: List[int]) -> List[List[int]]
- Example::
>>> from bagua.torch_api.contrib import LoadBalancingDistributedSampler, \
...     LoadBalancingDistributedBatchSampler
>>>
>>> sampler = LoadBalancingDistributedSampler(dataset, complexity_fn=complexity_fn)
>>> batch_sampler = LoadBalancingDistributedBatchSampler(sampler, batch_fn=batch_fn)
>>> loader = torch.utils.data.DataLoader(dataset, batch_sampler=batch_sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     batch_sampler.set_epoch(epoch)
...     train(loader)
- set_epoch(self, epoch)¶
Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.
- Parameters
epoch (int) – Epoch number.
- Return type
None
- class bagua.torch_api.contrib.LoadBalancingDistributedSampler(dataset, complexity_fn, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False, random_level=0)¶
Bases: torch.utils.data.sampler.Sampler
Sampler that restricts data loading to a subset of the dataset.
This sampler uses a complexity_fn to calculate each sample's computational complexity, so that each batch gets a similar computational complexity. This is useful in scenarios like speech and NLP, where each batch has variable length and distributed training suffers from the straggler problem.
The usage is similar to torch.utils.data.DistributedSampler, where each process loads a subset of the original dataset that is exclusive to it.
Note
Dataset is assumed to be of constant size.
- Parameters
dataset (torch.utils.data.dataset.Dataset) – Dataset used for sampling.
complexity_fn (Callable) – A function whose input is a sample and output is an integer as a measure of the computational complexity of the sample.
num_replicas (int, optional) – Number of processes participating in distributed training. By default, world_size is retrieved from the current distributed group.
rank (int, optional) – Rank of the current process within num_replicas. By default, rank is retrieved from the current distributed group.
shuffle (bool, optional) – If True (default), the sampler will shuffle the indices.
seed (int, optional) – Random seed used to shuffle the sampler if shuffle=True. This number should be identical across all processes in the distributed group. Default: 0.
drop_last (bool, optional) – If True, the sampler will drop the tail of the data to make it evenly divisible across the number of replicas. If False, the sampler will add extra indices to make the data evenly divisible across the replicas. Default: False.
random_level (float, optional) – A float between 0 and 1 that controls the extent of load balancing. 0 means the best load balance, while 1 means the opposite.
Warning
In distributed mode, calling the set_epoch method at the beginning of each epoch before creating the DataLoader iterator is necessary to make shuffling work properly across multiple epochs. Otherwise, the same ordering will always be used.
- Example::
Define your complexity_fn, which accepts a dataset sample as its input and produces an integer as the sample's computational complexity:
>>> dataset = torch.utils.data.TensorDataset(torch.randn(n, 2), torch.randperm(n))
>>> complexity_fn = lambda x: x[1]
Below is the usage of LoadBalancingDistributedSampler and DataLoader:
>>> sampler = bagua.torch_api.contrib.LoadBalancingDistributedSampler(
...     dataset,
...     complexity_fn=complexity_fn) if is_distributed else None
>>> loader = torch.utils.data.DataLoader(dataset,
...     shuffle=(sampler is None),
...     sampler=sampler)
>>>
>>> for epoch in range(start_epoch, n_epochs):
...     if is_distributed:
...         sampler.set_epoch(epoch)
...     train(loader)
- set_epoch(self, epoch)¶
Sets the epoch for this sampler. When shuffle=True, this ensures all replicas use a different random ordering for each epoch. Otherwise, the next iteration of this sampler will yield the same ordering.
- Parameters
epoch (int) – Epoch number.
- Return type
None
Submodules¶
bagua.torch_api.bucket¶
- class bagua.torch_api.bucket.BaguaBucket(tensors, name, flatten, alignment=1)¶
Create a Bagua bucket with a list of Bagua tensors.
- Parameters
tensors (List[bagua.torch_api.tensor.BaguaTensor]) – A list of Bagua tensors to be put in the bucket.
name (str) – The unique name of the bucket.
flatten (bool) – If True, flatten the input tensors so that they are contiguous in memory.
alignment (int) – If alignment > 1, Bagua will add a padding tensor to the bucket so that the total number of elements in the bucket divides the given alignment.
- name¶
The bucket’s name.
- tensors¶
The tensors contained within the bucket.
- append_asynchronous_model_average_op(self, peer_selection_mode)¶
Append an asynchronous model average operation to a bucket. This operation will enable continuous model averaging between workers while training a model.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
This operation is intended to run in parallel with the computation process. It returns a reference to the op. The op features a lock to exclusively access the model. Call op.lock_weight() to acquire the lock and op.unlock_weight() to release it.
- Parameters
peer_selection_mode (str) – How workers communicate with each other. Currently only "all" is supported. "all" means all workers' weights are averaged during each communication.
- Returns
The asynchronous model average operation itself.
- append_centralized_synchronous_op(self, hierarchical=False, average=True, scattergather=False, compression=None)¶
Append a centralized synchronous operation to a bucket. It will sum or average the tensors in the bucket for all workers.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.
average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
scattergather (bool) – If True, the communication between workers is done with scatter-gather instead of allreduce. This is required for using compression.
compression (Optional[str]) – If not None, the tensors will be compressed for communication. Currently "MinMaxUInt8" is supported.
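A sketch of registering this operation on a bucket (illustrative; bagua_tensors stands for a list of Bagua tensors created beforehand, e.g. with ensure_bagua_tensor):
>>> from bagua.torch_api.bucket import BaguaBucket
>>> bucket = BaguaBucket(bagua_tensors, name="illustrative_bucket", flatten=True)
>>> bucket.append_centralized_synchronous_op(hierarchical=False, average=True)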
- append_decentralized_synchronous_op(self, peer_weight, hierarchical=True, peer_selection_mode='all')¶
Append a decentralized synchronous operation to a bucket. It will do gossipy style model averaging among workers.
This operation is not inplace, which means the bucket weights are first copied to peer_weight, and the result of decentralized averaging will be in peer_weight. To copy peer_weight back to self, call decentralized_synchronous_op_copy_back_peer_weight.
This operation will be executed by the Bagua backend in the order it is appended, when all the tensors within the bucket are marked ready.
- Parameters
peer_weight (BaguaTensor) – A tensor used for averaging the model with peers; it should be of the same total size as the bucket tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor.
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.
peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers' weights are averaged in each communication step. "shift_one" means each worker selects a different peer to average weights with in each communication step.
- append_low_precision_decentralized_synchronous_op(self, weight, left_peer_weight, right_peer_weight, hierarchical=True, compression='MinMaxUInt8')¶
Append a low precision decentralized synchronous operation to a bucket. It will compress the difference of local models between two successive iterations and exchange them among workers.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
weight (BaguaTensor) – Model replica of the current worker's local model. It should be of the same total size as the bucket tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor.
left_peer_weight (BaguaTensor) – Model replica of the current worker's left peer. It should be of the same total size as the bucket tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor, then copy the initial weights of the current worker's left peer into the tensor.
right_peer_weight (BaguaTensor) – Model replica of the current worker's right peer. It should be of the same total size as the bucket tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor, then copy the initial weights of the current worker's right peer into the tensor.
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.
compression (str) – The way tensors are compressed for communication. Currently "MinMaxUInt8" is supported.
- append_python_op(self, python_function)¶
Append a Python operation to a bucket. A Python operation is a Python function that takes the bucket's name and returns None. It can do arbitrary things within the function body.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
python_function (Callable[[str], None]) – The Python operation function.
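A sketch of a Python operation, reusing the bucket from the sketch above; the function receives the bucket's name and returns None:
>>> def log_bucket_ready(bucket_name):
...     # arbitrary Python code can run here
...     print("bucket", bucket_name, "is ready")
>>> bucket.append_python_op(log_bucket_ready)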
- bytes(self)¶
Returns the total number of bytes occupied by the bucket.
- Return type
int
- check_flatten(self)¶
- Returns
True if the bucket’s tensors are contiguous in memory.
- Return type
bool
- clear_ops(self)¶
Clear the previously appended operations.
- Return type
- decentralized_synchronous_op_copy_back_peer_weight(self, peer_weight, hierarchical=True)¶
Copy peer_weight back to the bucket weights to end a decentralized synchronous operation. See append_decentralized_synchronous_op for more information.
- Parameters
peer_weight (BaguaTensor) – A tensor used for averaging the model with peers; it should be of the same total size as the bucket tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor.
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first. After that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high. Must be the same as the hierarchical argument in append_decentralized_synchronous_op.
- flattened_tensor(self)¶
Returns a tensor contiguous in memory which contains the same data as the tensors in self, plus the padding tensor (if it exists).
- Return type
bagua.torch_api.communication¶
- class bagua.torch_api.communication.ReduceOp¶
Bases: enum.IntEnum
An enum-like class for available reduction operations: SUM, PRODUCT, MIN, MAX, BAND, BOR, BXOR and AVG.
- AVG = 10¶
- BAND = 8¶
- BOR = 7¶
- BXOR = 9¶
- MAX = 3¶
- MIN = 2¶
- PRODUCT = 1¶
- SUM = 0¶
- bagua.torch_api.communication.allgather(send_tensor, recv_tensor, comm=None)¶
Gathers send tensors from all processes associated with the communicator into recv_tensor.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.allgather_inplace(tensor, comm=None)¶
The in-place version of allgather.
- Parameters
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.communication.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes associated with the communicator in such a way that all get the final result. After the call, recv_tensor is going to be bitwise identical in all processes.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
Examples:
>>> from bagua.torch_api import allreduce
>>>
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat)
>>> send_tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
- bagua.torch_api.communication.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of allreduce.
- Parameters
op (ReduceOp) –
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.communication.alltoall(send_tensor, recv_tensor, comm=None)¶
Each process scatters send_tensor to all processes associated with the communicator and returns the gathered data in recv_tensor.
- Parameters
send_tensor (torch.Tensor) – Input of the collective; the size must be divisible by comm.nranks.
recv_tensor (torch.Tensor) – Output of the collective, must have equal size with send_tensor.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.alltoall_inplace(tensor, comm=None)¶
The in-place version of alltoall.
- Parameters
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.communication.broadcast(tensor, src=0, comm=None)¶
Broadcasts the tensor to all processes associated with the communicator.
tensor must have the same number of elements in all processes participating in the collective.
- Parameters
tensor (torch.Tensor) – Data to be sent if src is the rank of the current process, and tensor to be used to save received data otherwise.
src (int, optional) – Source rank. Default: 0.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
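A sketch of broadcasting a tensor from rank 0 to all other processes (illustrative; assumes torch is imported and the process group has been initialized):
>>> from bagua.torch_api.communication import broadcast
>>> from bagua.torch_api.env import get_rank
>>> tensor = torch.ones(4, device="cuda") if get_rank() == 0 else torch.zeros(4, device="cuda")
>>> broadcast(tensor, src=0)
>>> # after the call, every rank holds rank 0's data (all ones)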
- bagua.torch_api.communication.gather(send_tensor, recv_tensor, dst, comm=None)¶
Gathers send tensors from all processes associated with the communicator to recv_tensor in a single process.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.
dst (int) – Destination rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.gather_inplace(tensor, count, dst, comm=None)¶
The in-place version of gather.
- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the dst rank, it must have a size of comm.nranks * count elements. On non-dst ranks, its size must be equal to count.
count (int) – The per-rank data count to gather.
dst (int) – Destination rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.init_process_group()¶
Initializes the PyTorch builtin distributed process group. This also initializes the distributed package, and should be executed before all other Bagua APIs.
- Examples::
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())
>>> bagua.init_process_group()
>>>
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = model.with_bagua([optimizer], ...)
- bagua.torch_api.communication.recv(tensor, src, comm=None)¶
Receives a tensor synchronously.
- Parameters
tensor (torch.Tensor) – Tensor to fill with received data.
src (int) – Source rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used.
- bagua.torch_api.communication.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes.
Only the process with rank dst is going to receive the final result.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.
dst (int) – Destination rank.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)¶
The in-place version of reduce.
- Parameters
op (ReduceOp) –
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.communication.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces, then scatters send_tensor to all processes associated with the communicator.
- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.
recv_tensor (torch.Tensor) – Output of the collective.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of reduce_scatter.
- Parameters
tensor (torch.Tensor) – Input and output of the collective; the size must be divisible by comm.nranks.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.scatter(send_tensor, recv_tensor, src, comm=None)¶
Scatters send tensor to all processes associated with the communicator.
- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.
recv_tensor (torch.Tensor) – Output of the collective.
src (int) – Source rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.scatter_inplace(tensor, count, src, comm=None)¶
The in-place version of scatter.
- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the src rank, it must have a size of comm.nranks * count elements. On non-src ranks, its size must be equal to count.
count (int) – The per-rank data count to scatter.
src (int) – Source rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.communication.send(tensor, dst, comm=None)¶
Sends a tensor to dst synchronously.
- Parameters
tensor (torch.Tensor) – Tensor to send.
dst (int) – Destination rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used.
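A point-to-point sketch between rank 0 and rank 1 (illustrative; assumes exactly these two ranks exist and the process group has been initialized):
>>> from bagua.torch_api.communication import send, recv
>>> from bagua.torch_api.env import get_rank
>>> tensor = torch.arange(4, dtype=torch.float32, device="cuda")
>>> if get_rank() == 0:
...     send(tensor, dst=1)
... elif get_rank() == 1:
...     recv(tensor, src=0)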
bagua.torch_api.distributed¶
- class bagua.torch_api.distributed.BaguaModule¶
This class patches torch.nn.Module with several methods to enable Bagua functionalities.
- Variables
bagua_optimizers (List[torch.optim.Optimizer]) – The optimizers passed in by with_bagua.
bagua_algorithm (bagua.torch_api.algorithms.Algorithm) – The algorithm passed in by with_bagua.
parameters_to_ignore (List[str]) – The parameter names in "{module_name}.{param_name}" format to ignore when calling self.bagua_build_params().
bagua_train_step_counter (int) – Number of iterations in training mode.
bagua_buckets (List[bagua.torch_api.bucket.BaguaBucket]) – All Bagua buckets in a list.
- bagua_build_params(self)¶
Build tuples of (parameter_name, parameter) for all parameters that require grads and are not in the _bagua_params_and_buffers_to_ignore attribute.
- Return type
List[Tuple[str, torch.nn.Parameter]]
- with_bagua(self, optimizers, algorithm)¶
with_bagua enables easy distributed data parallel training on a torch.nn.Module.
- Parameters
optimizers (List[torch.optim.Optimizer]) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers.
algorithm (bagua.torch_api.algorithms.Algorithm) – Distributed algorithm used to do the actual communication and update.
- Returns
The original module, with Bagua related environments initialized.
- Return type
BaguaModule
Note
If we want to ignore some layers for communication, we can first check these layers' corresponding keys in the module's state_dict (they are in "{module_name}.{param_name}" format), then assign the list of keys to your_module._bagua_params_and_buffers_to_ignore.
Examples:
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = model.with_bagua(
...     [optimizer],
...     GradientAllReduce()
... )
bagua.torch_api.env¶
- bagua.torch_api.env.get_default_bucket_size()¶
Get default communication bucket byte size.
- Returns
The default bucket size.
- Return type
int
- bagua.torch_api.env.get_local_rank()¶
Get the local rank of the current process.
Local rank is a unique identifier assigned to each process within a node. They are always consecutive integers ranging from 0 to local_size.
- Returns
The local rank of the current process.
- Return type
int
- bagua.torch_api.env.get_local_size()¶
Get the number of processes in the node.
- Returns
The local size of the node.
- Return type
int
- bagua.torch_api.env.get_rank()¶
Get the rank of the current process in the distributed process group.
Rank is a unique identifier assigned to each process within a distributed process group. They are always consecutive integers ranging from 0 to world_size.
- Returns
The rank of the current process.
- Return type
int
- bagua.torch_api.env.get_world_size()¶
Get the number of processes in the current process group.
- Returns
The world size of the process group.
- Return type
int
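A sketch of how these helpers are typically used at startup (illustrative; it mirrors the init_process_group example above):
>>> import torch
>>> import bagua.torch_api as bagua
>>> from bagua.torch_api.env import get_rank, get_local_rank, get_world_size, get_local_size
>>>
>>> torch.cuda.set_device(get_local_rank())
>>> bagua.init_process_group()
>>> print(get_rank(), get_world_size(), get_local_rank(), get_local_size())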
bagua.torch_api.tensor¶
- class bagua.torch_api.tensor.BaguaTensor¶
This class patches torch.Tensor with additional methods.
- bagua_backend_tensor(self)¶
- Returns
The raw Bagua backend tensor.
- Return type
bagua_core.BaguaTensorPy
- bagua_ensure_grad(self)¶
Return the gradient of the current parameter. Create a zero gradient tensor if it does not exist.
- Return type
torch.Tensor
- bagua_mark_communication_ready(self)¶
Mark a Bagua tensor ready for scheduled operations execution.
- bagua_mark_communication_ready_without_synchronization(self)¶
Mark a Bagua tensor ready immediately, without CUDA event synchronization.
- bagua_set_storage(self, storage, storage_offset=0)¶
Sets the underlying storage using an existing torch.Storage.
- Parameters
storage (torch.Storage) – The storage to use.
storage_offset (int) – The offset in the storage.
- ensure_bagua_tensor(self, name=None, module_name=None)¶
Convert a PyTorch tensor or parameter to a Bagua tensor in place and return it. A Bagua tensor is required to use Bagua's communication algorithms.
- Parameters
name (Optional[str]) – The unique name of the tensor.
module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.
- Returns
The original tensor with Bagua tensor attributes initialized.
- is_bagua_tensor(self)¶
- Return type
bool
- to_bagua_tensor(self, name=None, module_name=None)¶
Create a new Bagua tensor from a PyTorch tensor or parameter and return it. The original tensor is not changed. A Bagua tensor is required to use Bagua’s communication algorithms.
- Parameters
name (Optional[str]) – The unique name of the tensor.
module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.
- Returns
The new Bagua tensor sharing the same storage with the original tensor.
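A small sketch of the two conversion paths (illustrative; assumes bagua.torch_api has been imported so that torch.Tensor is patched, and a CUDA device is available; the names are made up):
>>> t = torch.zeros(16, device="cuda")
>>> bagua_t = t.to_bagua_tensor(name="illustrative_tensor")  # new Bagua tensor sharing t's storage
>>> bagua_t.is_bagua_tensor()
True
>>> t = t.ensure_bagua_tensor(name="illustrative_tensor_inplace")  # converts t itself in place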
Package Contents¶
- bagua.torch_api.version¶
- class bagua.torch_api.BaguaModule¶
This class patches torch.nn.Module with several methods to enable Bagua functionalities.
- Variables
bagua_optimizers (List[torch.optim.Optimizer]) – The optimizers passed in by with_bagua.
bagua_algorithm (bagua.torch_api.algorithms.Algorithm) – The algorithm passed in by with_bagua.
parameters_to_ignore (List[str]) – The parameter names in "{module_name}.{param_name}" format to ignore when calling self.bagua_build_params().
bagua_train_step_counter (int) – Number of iterations in training mode.
bagua_buckets (List[bagua.torch_api.bucket.BaguaBucket]) – All Bagua buckets in a list.
- bagua_build_params(self)¶
Build a list of (parameter_name, parameter) tuples for all parameters that require gradients and are not in the _bagua_params_and_buffers_to_ignore attribute.
- Return type
List[Tuple[str, torch.nn.Parameter]]
- with_bagua(self, optimizers, algorithm)¶
with_bagua enables easy distributed data parallel training on a torch.nn.Module.
- Parameters
optimizers (List[torch.optim.Optimizer]) – Optimizer(s) used by the module. It can contain one or more PyTorch optimizers.
algorithm (bagua.torch_api.algorithms.Algorithm) – Distributed algorithm used to do the actual communication and update.
- Returns
The original module, with Bagua related environments initialized.
- Return type
BaguaModule
Note
If we want to ignore some layers for communication, we can first check these layers’ corresponding keys in the module’s state_dict (they are in "{module_name}.{param_name}" format), then assign the list of keys to your_module._bagua_params_and_buffers_to_ignore. A hedged sketch of this follows the example below.
Examples:
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = model.with_bagua(
...     [optimizer],
...     GradientAllReduce()
... )
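As a hedged illustration of the note above (not part of the upstream reference), the parameters of the last Linear layer in the Sequential model could be excluded from communication by listing their state_dict keys, which in this layout are "2.weight" and "2.bias", before calling with_bagua:
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> # keys follow the "{module_name}.{param_name}" format of model.state_dict()
>>> model._bagua_params_and_buffers_to_ignore = ["2.weight", "2.bias"]
>>> model = model.with_bagua([optimizer], GradientAllReduce())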
- class bagua.torch_api.BaguaTensor¶
This class patches torch.Tensor with additional methods.
- bagua_backend_tensor(self)¶
- Returns
The raw Bagua backend tensor.
- Return type
bagua_core.BaguaTensorPy
- bagua_ensure_grad(self)¶
Return the gradient of the current parameter, creating a zero gradient tensor if it does not exist.
- Return type
torch.Tensor
- bagua_mark_communication_ready(self)¶
Mark a Bagua tensor ready for scheduled operations execution.
- bagua_mark_communication_ready_without_synchronization(self)¶
Mark a Bagua tensor ready immediately, without CUDA event synchronization.
- bagua_set_storage(self, storage, storage_offset=0)¶
Sets the underlying storage using an existing torch.Storage.
- Parameters
storage (torch.Storage) – The storage to use.
storage_offset (int) – The offset in the storage.
- ensure_bagua_tensor(self, name=None, module_name=None)¶
Convert a PyTorch tensor or parameter to a Bagua tensor in place and return it. A Bagua tensor is required to use Bagua’s communication algorithms.
- Parameters
name (Optional[str]) – The unique name of the tensor.
module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.
- Returns
The original tensor with Bagua tensor attributes initialized.
- is_bagua_tensor(self)¶
- Return type
bool
- to_bagua_tensor(self, name=None, module_name=None)¶
Create a new Bagua tensor from a PyTorch tensor or parameter and return it. The original tensor is not changed. A Bagua tensor is required to use Bagua’s communication algorithms.
- Parameters
name (Optional[str]) – The unique name of the tensor.
module_name (Optional[str]) – The name of the model to which the tensor belongs. The model name can be acquired using model.bagua_module_name. This is required to call bagua_mark_communication_ready related methods.
- Returns
The new Bagua tensor sharing the same storage as the original tensor.
- class bagua.torch_api.ReduceOp¶
Bases:
enum.IntEnum
An enum-like class for available reduction operations: SUM, PRODUCT, MIN, MAX, BAND, BOR, BXOR and AVG. A usage sketch follows the list of values below.
Initialize self. See help(type(self)) for accurate signature.
- AVG = 10¶
- BAND = 8¶
- BOR = 7¶
- BXOR = 9¶
- MAX = 3¶
- MIN = 2¶
- PRODUCT = 1¶
- SUM = 0¶
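Illustrative sketch (not part of the upstream reference): a reduction operation other than the default SUM can be selected by passing the enum value to a collective such as allreduce_inplace, documented below. The sketch assumes an initialized Bagua process group and a CUDA tensor on the current device:
>>> import torch
>>> from bagua.torch_api import allreduce_inplace, get_rank, ReduceOp
>>> t = torch.tensor([float(get_rank())]).cuda()
>>> allreduce_inplace(t, op=ReduceOp.MAX)
>>> # t now holds the largest rank value, world_size - 1, on every process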
- bagua.torch_api.allgather(send_tensor, recv_tensor, comm=None)¶
Gathers send tensors from all processes associated with the communicator into recv_tensor.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
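A minimal sketch (not part of the upstream reference), assuming an initialized Bagua process group and CUDA tensors on the current device:
>>> import torch
>>> from bagua.torch_api import allgather, get_rank, get_world_size
>>> send_tensor = torch.tensor([float(get_rank())]).cuda()
>>> recv_tensor = torch.zeros(get_world_size()).cuda()
>>> allgather(send_tensor, recv_tensor)
>>> # recv_tensor is [0., 1., ..., world_size - 1.] on every rank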
- bagua.torch_api.allgather_inplace(tensor, comm=None)¶
The in-place version of allgather.
- Parameters
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.allreduce(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes associated with the communicator in such a way that all processes get the final result. After the call, recv_tensor is going to be bitwise identical in all processes.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
Examples:
>>> from bagua.torch_api import allreduce
>>>
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> recv_tensor = torch.zeros(2, dtype=torch.int64)
>>> send_tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> send_tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> recv_tensor = torch.zeros(2, dtype=torch.cfloat)
>>> send_tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> allreduce(send_tensor, recv_tensor)
>>> recv_tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
- bagua.torch_api.allreduce_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of allreduce.
- Parameters
op (ReduceOp) –
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.alltoall(send_tensor, recv_tensor, comm=None)¶
Each process scatters send_tensor to all processes associated with the communicator and returns the gathered data in recv_tensor.
- Parameters
send_tensor (torch.Tensor) – Input of the collective; its size must be divisible by comm.nranks.
recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
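Illustrative sketch (not part of the upstream reference), assuming an initialized Bagua process group and CUDA tensors on the current device:
>>> import torch
>>> from bagua.torch_api import alltoall, get_rank, get_world_size
>>> n = get_world_size()
>>> send_tensor = (torch.arange(n, dtype=torch.float32) + get_rank() * n).cuda()
>>> recv_tensor = torch.zeros(n).cuda()
>>> alltoall(send_tensor, recv_tensor)
>>> # recv_tensor[i] is the element that rank i addressed to this rank,
>>> # i.e. on rank r it holds [r, n + r, 2n + r, ...]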
- bagua.torch_api.alltoall_inplace(tensor, comm=None)¶
The in-place version of alltoall.
- Parameters
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.broadcast(tensor, src=0, comm=None)¶
Broadcasts the tensor to all processes associated with the communicator.
tensor must have the same number of elements in all processes participating in the collective.
- Parameters
tensor (torch.Tensor) – Data to be sent if src is the rank of the current process, and tensor to be used to save received data otherwise.
src (int, optional) – Source rank. Default: 0.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
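A minimal sketch (not part of the upstream reference), assuming an initialized Bagua process group and CUDA tensors on the current device:
>>> import torch
>>> from bagua.torch_api import broadcast, get_rank
>>> tensor = torch.zeros(2).cuda()
>>> if get_rank() == 0:
...     tensor += torch.tensor([1.0, 2.0]).cuda()
>>> broadcast(tensor, src=0)
>>> # every rank now holds tensor([1., 2.])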
- bagua.torch_api.gather(send_tensor, recv_tensor, dst, comm=None)¶
Gathers send tensors from all processes associated with the communicator to recv_tensor in a single process.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have a size of comm.nranks * send_tensor.size() elements.
dst (int) – Destination rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
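Illustrative sketch (not part of the upstream reference), assuming an initialized Bagua process group and CUDA tensors on the current device:
>>> import torch
>>> from bagua.torch_api import gather, get_rank, get_world_size
>>> send_tensor = torch.tensor([float(get_rank())]).cuda()
>>> recv_tensor = torch.zeros(get_world_size()).cuda()
>>> gather(send_tensor, recv_tensor, dst=0)
>>> # on rank 0, recv_tensor gathers [0., 1., ..., world_size - 1.]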
- bagua.torch_api.gather_inplace(tensor, count, dst, comm=None)¶
The in-place version of gather.
- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the dst rank, it must have a size of comm.nranks * count elements. On non-dst ranks, its size must be equal to count.
count (int) – The per-rank data count to gather.
dst (int) – Destination rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.get_backend(model_name)¶
- Parameters
model_name (str) –
- bagua.torch_api.get_local_rank()¶
Get the local rank of the current process.
Local rank is a unique identifier assigned to each process within a node. Local ranks are always consecutive integers ranging from 0 to local_size - 1.
- Returns
The local rank of the current process.
- Return type
int
- bagua.torch_api.get_local_size()¶
Get the number of processes in the node.
- Returns
The local size of the node.
- Return type
int
- bagua.torch_api.get_rank()¶
Get the rank of the current process in the default process group.
Rank is a unique identifier assigned to each process within a distributed process group. Ranks are always consecutive integers ranging from 0 to world_size - 1.
- Returns
The rank of the current process.
- Return type
int
- bagua.torch_api.get_world_size()¶
Get the number of processes in the current process group.
- Returns
The world size of the process group.
- Return type
int
- bagua.torch_api.init_process_group()¶
Initializes the PyTorch builtin distributed process group, and this will also initialize Bagua’s distributed package. It should be executed before any other Bagua API.
- Examples:
>>> import torch
>>> import bagua.torch_api as bagua
>>>
>>> torch.cuda.set_device(bagua.get_local_rank())
>>> bagua.init_process_group()
>>>
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = model.with_bagua([optimizer], ...)
- bagua.torch_api.recv(tensor, src, comm=None)¶
Receives a tensor synchronously.
- Parameters
tensor (torch.Tensor) – Tensor to fill with received data.
src (int) – Source rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used.
- bagua.torch_api.reduce(send_tensor, recv_tensor, dst, op=ReduceOp.SUM, comm=None)¶
Reduces the tensor data across all processes.
Only the process with rank dst is going to receive the final result.
- Parameters
send_tensor (torch.Tensor) – Input of the collective.
recv_tensor (torch.Tensor) – Output of the collective, must have the same size as send_tensor.
dst (int) – Destination rank.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
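Illustrative sketch (not part of the upstream reference), assuming an initialized Bagua process group and CUDA tensors on the current device:
>>> import torch
>>> from bagua.torch_api import reduce
>>> send_tensor = torch.tensor([1.0, 2.0]).cuda()
>>> recv_tensor = torch.zeros(2).cuda()
>>> reduce(send_tensor, recv_tensor, dst=0)
>>> # on rank 0, recv_tensor holds the element-wise sum over all ranks:
>>> # [world_size, 2 * world_size]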
- bagua.torch_api.reduce_inplace(tensor, dst, op=ReduceOp.SUM, comm=None)¶
The in-place version of reduce.
- Parameters
op (ReduceOp) –
comm (bagua_core.BaguaSingleCommunicatorPy) –
- bagua.torch_api.reduce_scatter(send_tensor, recv_tensor, op=ReduceOp.SUM, comm=None)¶
Reduces, then scatters send_tensor to all processes associated with the communicator.
- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.
recv_tensor (torch.Tensor) – Output of the collective.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
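Illustrative sketch (not part of the upstream reference), assuming an initialized Bagua process group and CUDA tensors on the current device:
>>> import torch
>>> from bagua.torch_api import reduce_scatter, get_world_size
>>> n = get_world_size()
>>> send_tensor = torch.ones(n * 2).cuda()   # two elements destined for each rank
>>> recv_tensor = torch.zeros(2).cuda()
>>> reduce_scatter(send_tensor, recv_tensor)
>>> # recv_tensor is [n, n] on every rank: each rank receives its reduced two-element slice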
- bagua.torch_api.reduce_scatter_inplace(tensor, op=ReduceOp.SUM, comm=None)¶
The in-place version of reduce_scatter.
- Parameters
tensor (torch.Tensor) – Input and output of the collective; its size must be divisible by comm.nranks.
op (ReduceOp, optional) – One of the values from the ReduceOp enum. Specifies an operation used for element-wise reductions.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.scatter(send_tensor, recv_tensor, src, comm=None)¶
Scatters send tensor to all processes associated with the communicator.
- Parameters
send_tensor (torch.Tensor) – Input of the collective, must have a size of comm.nranks * recv_tensor.size() elements.
recv_tensor (torch.Tensor) – Output of the collective.
src (int) – Source rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
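Illustrative sketch (not part of the upstream reference), assuming an initialized Bagua process group and CUDA tensors on the current device:
>>> import torch
>>> from bagua.torch_api import scatter, get_world_size
>>> send_tensor = torch.arange(get_world_size(), dtype=torch.float32).cuda()  # only meaningful on src
>>> recv_tensor = torch.zeros(1).cuda()
>>> scatter(send_tensor, recv_tensor, src=0)
>>> # recv_tensor holds [float(rank)] on every rank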
- bagua.torch_api.scatter_inplace(tensor, count, src, comm=None)¶
The in-place version of scatter.
- Parameters
tensor (torch.Tensor) – Input and output of the collective. On the src rank, it must have a size of comm.nranks * count elements. On non-src ranks, its size must be equal to count.
count (int) – The per-rank data count to scatter.
src (int) – Source rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used. Default: None.
- bagua.torch_api.send(tensor, dst, comm=None)¶
Sends a tensor to dst synchronously.
- Parameters
tensor (torch.Tensor) – Tensor to send.
dst (int) – Destination rank.
comm (B.BaguaSingleCommunicatorPy, optional) – The Bagua communicator to work on. If None, the global Bagua communicator will be used.
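A point-to-point sketch (illustrative only, not part of the upstream reference) pairing send with recv documented above; it assumes at least two ranks, an initialized Bagua process group, and CUDA tensors on the current device:
>>> import torch
>>> from bagua.torch_api import send, recv, get_rank
>>> tensor = torch.zeros(2).cuda()
>>> if get_rank() == 0:
...     send(tensor + 1.0, dst=1)
... elif get_rank() == 1:
...     recv(tensor, src=0)   # tensor becomes [1., 1.] on rank 1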