bagua.torch_api.bucket

Module Contents

class bagua.torch_api.bucket.BaguaBucket(tensors, name, flatten, alignment=1)

Create a Bagua bucket with a list of Bagua tensors.

Parameters
  • tensors (List[bagua.torch_api.tensor.BaguaTensor]) – A list of Bagua tensors to be put in the bucket.

  • name (str) – The unique name of the bucket.

  • flatten (bool) – If True, flatten the input tensors so that they are contiguous in memory.

  • alignment (int) – If alignment > 1, Bagua will add a padding tensor to the bucket so that the total number of elements in the bucket is divisible by the given alignment.
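
The alignment rule comes down to simple arithmetic. The following is a minimal sketch of that arithmetic only, not Bagua's implementation; padding_elements is a hypothetical helper name introduced here for illustration.

```python
def padding_elements(total_elements: int, alignment: int = 1) -> int:
    """Number of padding elements needed so the total is a multiple of alignment."""
    if alignment < 1:
        raise ValueError("alignment must be a positive integer")
    # -total % alignment is the distance to the next multiple (0 if already aligned).
    return -total_elements % alignment


# A bucket holding tensors of 3, 5 and 2 elements, aligned to 8 elements.
sizes = [3, 5, 2]
padding = padding_elements(sum(sizes), alignment=8)
padded_total = sum(sizes) + padding
```

With alignment=1 (the default) the helper always returns 0, which matches the statement that padding is only created when alignment > 1.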

name

The bucket’s name.

tensors

The Bagua tensors contained in the bucket.

append_asynchronous_model_average_op(self, peer_selection_mode, group=None)

Append an asynchronous model average operation to a bucket. This operation will enable continuous model averaging between workers while training a model.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

This operation is intended to run in parallel with the computation process. It returns a reference to the op. The op features a lock to exclusively access the model. Call op.lock_weight() to acquire the lock and op.unlock_weight() to release it.

Parameters
  • peer_selection_mode (str) – How workers communicate with each other. Currently "all" is supported. "all" means all workers’ weights are averaged during each communication.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

Returns

The asynchronous model average operation itself.
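
Because the lock must always be released, it can help to wrap lock_weight() and unlock_weight() in a context manager. The sketch below assumes only the two method names given above; StandInAsyncOp and weights_locked are hypothetical names used for illustration, and the stand-in replaces the real op so the example runs without Bagua.

```python
import threading
from contextlib import contextmanager


class StandInAsyncOp:
    """Hypothetical stand-in exposing the lock_weight/unlock_weight methods named above."""

    def __init__(self):
        self._lock = threading.Lock()

    def lock_weight(self):
        self._lock.acquire()

    def unlock_weight(self):
        self._lock.release()

    def is_locked(self):
        return self._lock.locked()


@contextmanager
def weights_locked(op):
    """Hold the op's weight lock for the duration of a with-block."""
    op.lock_weight()
    try:
        yield op
    finally:
        # Release even if the body raises, so averaging is not blocked forever.
        op.unlock_weight()


op = StandInAsyncOp()
with weights_locked(op):
    # Code that needs exclusive access to the model would go here.
    held_inside = op.is_locked()
held_after = op.is_locked()
```

In real training code, the object passed to weights_locked would be the op returned by append_asynchronous_model_average_op.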

append_centralized_synchronous_op(self, hierarchical=False, average=True, scattergather=False, compression=None, group=None)

Append a centralized synchronous operation to a bucket. It will sum or average the tensors in the bucket for all workers.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters
  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine communicate with each other first, after which the machines do inter-node communication. This can boost performance when the inter-node communication cost is high.

  • average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.

  • scattergather (bool) – If True, communication between workers is done with scatter-gather instead of allreduce. This is required for using compression.

  • compression (Optional[str]) – If not None, the tensors will be compressed for communication. Currently "MinMaxUInt8" is supported.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.
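
To make the effect of average and hierarchical concrete, here is a small pure-Python simulation of the arithmetic. It is a sketch of the result the operation is described as producing, not of how the Bagua backend communicates; centralized_reduce and hierarchical_sum are hypothetical names.

```python
from typing import List


def centralized_reduce(worker_tensors: List[List[float]], average: bool = True) -> List[float]:
    """Element-wise sum (or mean) of one bucket across all workers."""
    num_workers = len(worker_tensors)
    summed = [sum(values) for values in zip(*worker_tensors)]
    return [s / num_workers for s in summed] if average else summed


def hierarchical_sum(nodes: List[List[List[float]]]) -> List[float]:
    """Two-stage sum: first within each machine, then across machines."""
    per_node = [centralized_reduce(workers, average=False) for workers in nodes]
    return centralized_reduce(per_node, average=False)


# Two machines with two GPUs each; every GPU holds a two-element bucket.
nodes = [[[1.0, 2.0], [3.0, 4.0]], [[5.0, 6.0], [7.0, 8.0]]]
flat = [gpu for node in nodes for gpu in node]

summed = centralized_reduce(flat, average=False)
averaged = centralized_reduce(flat, average=True)
staged = hierarchical_sum(nodes)
```

The staged sum equals the flat sum, which illustrates why hierarchical communication can change cost without changing the reduced values.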

append_decentralized_synchronous_op(self, peer_weight, hierarchical=True, peer_selection_mode='all', group=None)

Append a decentralized synchronous operation to a bucket. It will do gossip-style model averaging among workers.

This operation is not in-place: the bucket weights are first copied to peer_weight, and the result of decentralized averaging is stored in peer_weight. To copy peer_weight back to self, call op.copy_back_peer_weight(self).

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters
  • peer_weight (BaguaTensor) – A tensor used for averaging the model with peers. It should be the same size as the total size of the bucket’s tensors. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor.

  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine communicate with each other first, after which the machines do inter-node communication. This can boost performance when the inter-node communication cost is high.

  • peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers’ weights are averaged in each communication step. "shift_one" means each worker selects a different peer to average weights with in each communication step.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.

Returns

The decentralized synchronous operation itself.
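
The difference between the two peer selection modes can be simulated with one scalar weight per worker. This is a sketch under stated assumptions: the pairing rule in shift_one_peer is an illustrative schedule chosen so that every worker meets a different peer at each step, and is not taken from Bagua's source. All function names are hypothetical. Results are written to a new list rather than in place, mirroring the peer_weight behaviour described above.

```python
from typing import List


def average_all(weights: List[float]) -> List[float]:
    """Mode "all": every worker ends the step with the global mean."""
    mean = sum(weights) / len(weights)
    return [mean] * len(weights)


def shift_one_peer(rank: int, step: int, world_size: int) -> int:
    """Illustrative pairing rule (an assumption): lower half pairs with upper half."""
    if world_size % 2 != 0:
        raise ValueError("this sketch assumes an even number of workers")
    half = world_size // 2
    if rank < half:
        return half + (rank + step) % half
    return (rank - half - step) % half


def average_shift_one(weights: List[float], step: int) -> List[float]:
    """Mode "shift_one": each worker averages with a single peer for this step."""
    n = len(weights)
    return [(weights[r] + weights[shift_one_peer(r, step, n)]) / 2 for r in range(n)]


weights = [0.0, 4.0, 8.0, 12.0]
after_all = average_all(weights)

after_step0 = average_shift_one(weights, step=0)
after_step1 = average_shift_one(after_step0, step=1)
```

In this toy run, pairwise averaging preserves the global mean at every step and reaches it after two steps, while "all" reaches it in one step at the cost of communicating with every worker.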

append_low_precision_decentralized_synchronous_op(self, weight, left_peer_weight, right_peer_weight, hierarchical=True, compression='MinMaxUInt8', group=None)

Append a low precision decentralized synchronous operation to a bucket. It will compress the difference of local models between two successive iterations and exchange them among workers.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

Parameters
  • weight (BaguaTensor) – Model replica of the current worker’s local model. It should be the same size as the total size of the bucket’s tensors. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor.

  • left_peer_weight (BaguaTensor) – Model replica of the current worker’s left peer. It should be the same size as the total size of the bucket’s tensors. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor, then copy the initial weights of the current worker’s left peer to the tensor.

  • right_peer_weight (BaguaTensor) – Model replica of the current worker’s right peer. It should be the same size as the total size of the bucket’s tensors. Use self.flattened_tensor().ensure_bagua_tensor(...) to create such a tensor, then copy the initial weights of the current worker’s right peer to the tensor.

  • hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine communicate with each other first, after which the machines do inter-node communication. This can boost performance when the inter-node communication cost is high.

  • compression (str) – How tensors are compressed for communication. Currently "MinMaxUInt8" is supported.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.
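
The idea of compressing the difference between two successive iterations can be sketched with a generic min-max quantizer to 8-bit unsigned integers. This page does not specify the exact "MinMaxUInt8" scheme, so the code below is an assumption about the general technique, not Bagua's kernel; both function names are hypothetical.

```python
from typing import List, Tuple


def minmax_uint8_compress(values: List[float]) -> Tuple[List[int], float, float]:
    """Map each value to an integer code in [0, 255] using the min and max of the input."""
    lo, hi = min(values), max(values)
    # Guard against a zero range when all values are identical.
    scale = (hi - lo) / 255 if hi > lo else 1.0
    codes = [round((v - lo) / scale) for v in values]
    return codes, lo, scale


def minmax_uint8_decompress(codes: List[int], lo: float, scale: float) -> List[float]:
    """Approximate inverse of minmax_uint8_compress."""
    return [lo + c * scale for c in codes]


# Local model at two successive iterations (illustrative numbers).
previous = [0.10, -0.20, 0.30, 0.05]
current = [0.12, -0.25, 0.31, 0.05]

# Only the difference is compressed and exchanged.
diff = [c - p for c, p in zip(current, previous)]
codes, lo, scale = minmax_uint8_compress(diff)
restored = minmax_uint8_decompress(codes, lo, scale)
max_error = max(abs(a - b) for a, b in zip(diff, restored))
```

With this quantizer the reconstruction error of each element is at most half a quantization step (scale / 2), which is small when the differences between iterations span a narrow range.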

append_python_op(self, python_function, group=None)

Append a Python operation to a bucket. A Python operation is a Python function that takes the bucket’s name and returns None. It can do arbitrary things within the function body.

The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.

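Parameters
  • python_function (Callable[[str], None]) – The Python function to run. It takes the bucket’s name and returns None.

  • group (Optional[bagua.torch_api.communication.BaguaProcessGroup]) – The process group to work on. If None, the default process group will be used.
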
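
The scheduling rule (operations run in append order once every tensor in the bucket is marked ready) can be mimicked with a toy class. ToyBucket and its mark_ready method are hypothetical stand-ins written for this sketch; they are not part of the Bagua API and only model the ordering behaviour described above.

```python
from typing import Callable, List


class ToyBucket:
    """Hypothetical stand-in that mimics the documented scheduling rule only."""

    def __init__(self, name: str, tensor_names: List[str]):
        self.name = name
        self._all = set(tensor_names)
        self._pending = set(tensor_names)
        self._ops: List[Callable[[str], None]] = []

    def append_python_op(self, python_function: Callable[[str], None]) -> None:
        self._ops.append(python_function)

    def mark_ready(self, tensor_name: str) -> None:
        self._pending.discard(tensor_name)
        if not self._pending:
            # All tensors are ready: run the ops in the order they were appended.
            for op in self._ops:
                op(self.name)
            # Start waiting again for the next iteration.
            self._pending = set(self._all)


log = []
bucket = ToyBucket("bucket_0", ["weight", "bias"])
bucket.append_python_op(lambda name: log.append(("first", name)))
bucket.append_python_op(lambda name: log.append(("second", name)))

bucket.mark_ready("weight")
log_after_one_tensor = list(log)  # nothing has run yet
bucket.mark_ready("bias")
```

Each appended function receives the bucket's name as its only argument, matching the description of a Python operation given above.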
bytes(self)

Returns the total number of bytes occupied by the bucket.

Return type

int

check_flatten(self)

Returns

True if effective tensors are contiguous in memory.

Return type

bool

clear_ops(self)

Clear the previously appended operations.

Return type

BaguaBucket

flattened_tensor(self)

Returns a tensor contiguous in memory that contains the same data as the effective tensors, i.e. those returned by calling bagua_getter_closure on the bucket’s tensors and the padding tensor (if one exists).

Return type

torch.Tensor
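
What "contiguous in memory" means for a flattened bucket can be shown with the standard library alone. The sketch below uses array and memoryview as stand-ins for tensors and their storage; it is an analogy for the layout, not how Bagua or PyTorch implement it.

```python
from array import array

# Two separate "tensors" of 32-bit floats.
tensors = [array("f", [1.0, 2.0, 3.0]), array("f", [4.0, 5.0])]

# Flatten: copy everything into one contiguous buffer.
flat = array("f")
for t in tensors:
    flat.extend(t)

# Re-express each tensor as a view into the shared buffer, back to back.
buffer = memoryview(flat)
views, offset = [], 0
for t in tensors:
    views.append(buffer[offset:offset + len(t)])
    offset += len(t)

# Writing through a view changes the flattened buffer, since they share memory.
views[1][0] = 40.0

# Total size in bytes of the flattened data (element count times element size).
total_bytes = buffer.nbytes
```

After flattening, a single buffer can be handed to a communication routine in one piece, while each view still behaves like its original tensor.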