bagua.torch_api.bucket¶
Module Contents¶
- class bagua.torch_api.bucket.BaguaBucket(tensors, name, flatten, alignment=1)¶
Create a Bagua bucket with a list of Bagua tensors.
- Parameters
tensors (List[bagua.torch_api.tensor.BaguaTensor]) – A list of Bagua tensors to be put in the bucket.
name (str) – The unique name of the bucket.
flatten (bool) – If True, flatten the input tensors so that they are contiguous in memory.
alignment (int) – If alignment > 1, Bagua will create a padding tensor in the bucket so that the total number of elements in the bucket is divisible by the given alignment.
- name¶
The bucket’s name.
- tensors¶
The tensors contained within the bucket.
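The rounding rule behind the alignment padding can be illustrated with a small helper (a sketch of the arithmetic only, not Bagua's actual implementation):

```python
def padded_num_elements(num_elements: int, alignment: int) -> int:
    """Round num_elements up to the next multiple of alignment,
    mirroring the padding a bucket adds when alignment > 1."""
    if alignment <= 1:
        return num_elements  # no padding tensor is created
    remainder = num_elements % alignment
    if remainder == 0:
        return num_elements
    return num_elements + (alignment - remainder)
```

For example, a bucket of 10 elements with alignment=8 would be padded to 16 elements.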
- append_asynchronous_model_average_op(self, peer_selection_mode)¶
Append an asynchronous model average operation to a bucket. This operation will enable continuous model averaging between workers while training a model.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
This operation is intended to run in parallel with the computation process. It returns a reference to the op. The op features a lock for exclusive access to the model: call op.lock_weight() to acquire the lock and op.unlock_weight() to release it.
- Parameters
peer_selection_mode (str) – The way workers communicate with each other. Currently only "all" is supported, which means all workers' weights are averaged during each communication.
- Returns
The asynchronous model average operation itself.
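The weight lock can be pictured as an ordinary mutex. The stand-in below (a hypothetical AsyncOpSketch class, not Bagua's real op object) mimics only the lock_weight / unlock_weight contract:

```python
import threading

class AsyncOpSketch:
    """Stand-in for the op returned by append_asynchronous_model_average_op,
    illustrating the exclusive-access contract only."""

    def __init__(self):
        self._lock = threading.Lock()

    def lock_weight(self):
        # Block background model averaging while we read/update the model.
        self._lock.acquire()

    def unlock_weight(self):
        # Let background averaging resume.
        self._lock.release()

op = AsyncOpSketch()
op.lock_weight()
# ... safely read or update model weights here ...
op.unlock_weight()
```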
- append_centralized_synchronous_op(self, hierarchical=False, average=True, scattergather=False, compression=None)¶
Append a centralized synchronous operation to a bucket. It will sum or average the tensors in the bucket for all workers.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first, and after that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.
average (bool) – If True, the gradients on each worker are averaged. Otherwise, they are summed.
scattergather (bool) – If True, the communication between workers is done with scatter-gather instead of allreduce. This is required for using compression.
compression (Optional[str]) – If not None, the tensors will be compressed for communication. Currently "MinMaxUInt8" is supported.
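The average / sum semantics can be sketched with plain Python lists (no real communication; each inner list stands for one worker's flattened bucket):

```python
def centralized_reduce(worker_buckets, average=True):
    """Element-wise sum (or average) across workers, which is what every
    worker ends up holding after a centralized synchronous operation."""
    n = len(worker_buckets)
    summed = [sum(vals) for vals in zip(*worker_buckets)]
    return [v / n for v in summed] if average else summed
```

For two workers holding [1.0, 2.0] and [3.0, 4.0], averaging yields [2.0, 3.0] and summing yields [4.0, 6.0].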
- append_decentralized_synchronous_op(self, peer_weight, hierarchical=True, peer_selection_mode='all')¶
Append a decentralized synchronous operation to a bucket. It will do gossipy style model averaging among workers.
This operation is not in place, which means the bucket weights are first copied to peer_weight, and the result of decentralized averaging will be in peer_weight. To copy peer_weight back to self, call decentralized_synchronous_op_copy_back_peer_weight.
This operation will be executed by the Bagua backend in the order it was appended, once all the tensors within the bucket are marked ready.
- Parameters
peer_weight (BaguaTensor) – A tensor used for averaging the model with peers. It should be the same size as the total size of the bucket tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor.
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first, and after that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.
peer_selection_mode (str) – Can be "all" or "shift_one". "all" means all workers' weights are averaged in each communication step. "shift_one" means each worker selects a different peer to average weights with in each communication step.
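The not-in-place contract (average into peer_weight, then copy back explicitly) can be sketched with plain lists; the helper names below are hypothetical and there is no real communication:

```python
def decentralized_average(bucket_weights, received_peer_weights):
    """Average bucket weights with a peer's weights into a fresh
    peer_weight buffer; the bucket itself is left untouched."""
    peer_weight = list(bucket_weights)  # copy first: the op is not in place
    for i, v in enumerate(received_peer_weights):
        peer_weight[i] = (peer_weight[i] + v) / 2.0
    return peer_weight

def copy_back_peer_weight(bucket_weights, peer_weight):
    """Mimic decentralized_synchronous_op_copy_back_peer_weight:
    only now do the bucket weights actually change."""
    bucket_weights[:] = peer_weight
```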
- append_low_precision_decentralized_synchronous_op(self, weight, left_peer_weight, right_peer_weight, hierarchical=True, compression='MinMaxUInt8')¶
Append a low precision decentralized synchronous operation to a bucket. It will compress the difference of local models between two successive iterations and exchange them among workers.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
weight (BaguaTensor) – A replica of the current worker's local model. It should be the same size as the total size of the bucket tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor.
left_peer_weight (BaguaTensor) – A replica of the current worker's left peer's model. It should be the same size as the total size of the bucket tensors. Create it with self.flattened_tensor().to_bagua_tensor(...), then copy the initial weights of the current worker's left peer into the tensor.
right_peer_weight (BaguaTensor) – A replica of the current worker's right peer's model. It should be the same size as the total size of the bucket tensors. Create it with self.flattened_tensor().to_bagua_tensor(...), then copy the initial weights of the current worker's right peer into the tensor.
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first, and after that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high.
compression (str) – The way tensors are compressed for communication. Currently "MinMaxUInt8" is supported.
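A toy min-max quantizer shows the idea behind compressing model differences into 8 bits. This is an illustrative guess at the kind of scheme the "MinMaxUInt8" name suggests, not Bagua's actual codec:

```python
def minmax_uint8_compress(values):
    """Quantize floats into the 0..255 range using the block's min and max."""
    lo, hi = min(values), max(values)
    scale = (hi - lo) / 255.0 or 1.0  # avoid division by zero for constant blocks
    quantized = [round((v - lo) / scale) for v in values]
    return quantized, lo, scale

def minmax_uint8_decompress(quantized, lo, scale):
    """Recover approximate floats from the quantized representation."""
    return [lo + q * scale for q in quantized]
```

Each value is recovered to within roughly half a quantization step, i.e. about (max - min) / 510.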
- append_python_op(self, python_function)¶
Append a Python operation to a bucket. A Python operation is a Python function that takes the bucket's name and returns None. It can do arbitrary things within the function body.
The operations will be executed by the Bagua backend in the order they are appended when all the tensors within the bucket are marked ready.
- Parameters
python_function (Callable[[str], None]) – The Python operation function.
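A Python op is just a callable that takes the bucket name. The logging hook below is a minimal sketch; the append_python_op call is shown commented out because it needs a real bucket:

```python
ready_log = []

def log_bucket_ready(bucket_name: str) -> None:
    """A trivial Python op: record each time the bucket becomes ready."""
    ready_log.append(bucket_name)

# With a real bucket this hook would be registered as:
# bucket.append_python_op(log_bucket_ready)
log_bucket_ready("grad_bucket_0")  # simulate the backend invoking the op
```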
- bytes(self)¶
Returns the total number of bytes occupied by the bucket.
- Return type
int
- check_flatten(self)¶
- Returns
True if the bucket’s tensors are contiguous in memory.
- Return type
bool
- clear_ops(self)¶
Clear the previously appended operations.
- decentralized_synchronous_op_copy_back_peer_weight(self, peer_weight, hierarchical=True)¶
Copy peer_weight back to the bucket weights to end a decentralized synchronous operation. See append_decentralized_synchronous_op for more information.
- Parameters
peer_weight (BaguaTensor) – A tensor used for averaging the model with peers. It should be the same size as the total size of the bucket tensors. Use self.flattened_tensor().to_bagua_tensor(...) to create such a tensor.
hierarchical (bool) – Enable hierarchical communication, which means the GPUs on the same machine will communicate with each other first, and after that, machines do inter-node communication. This can boost performance when the inter-node communication cost is high. Must be the same as the hierarchical argument passed to append_decentralized_synchronous_op.
- flattened_tensor(self)¶
Returns a tensor contiguous in memory which contains the same data as the tensors in self, plus the padding tensor (if it exists).
- Return type