bagua.torch_api.distributed¶
Module Contents¶
- class bagua.torch_api.distributed.DistributedModule(module)¶
Bases:
torch.nn.Module
A base class for distributed modules.
- unwrap(self)¶
Return the unwrapped module.
- forward(self, *inputs, **kwargs)¶
Execute the forward process and return the output.
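A minimal usage sketch, assuming the base class can be wrapped around a plain torch.nn.Module directly (the behavior follows the method descriptions above; Bagua's own wrappers build on this class):
import torch
from bagua.torch_api.distributed import DistributedModule

net = torch.nn.Linear(4, 2)
wrapped = DistributedModule(net)    # wrap a plain module
out = wrapped(torch.randn(3, 4))    # forward() runs the wrapped module
original = wrapped.unwrap()         # unwrap() recovers the original module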
- class bagua.torch_api.distributed.Reducer(module, optimizers, bucket_type, hierarchical_reduce, align_bytes, chunking, fusion, decentralize_reduce=False, buckets=[], **kwargs)¶
Bases:
object
In order to improve communication efficiency, the distributed algorithm chunks parameters into many buckets. A bucket is the minimum unit of communication between devices in bagua. This module is the bucket manager, providing bucket operation methods.
The process mainly consists of the following two cases (see the illustrative call sequence after the parameter list below):
- bucket_initialized is False:
1.1 add_param
1.2 initialize_buckets -> register_models
1.3 mark_bucket_ready
1.4 mark_on_complete
- bucket_initialized is True:
2.1 mark_tensor_ready
2.2 mark_on_complete
- Parameters
module (DistributedModule) – Module to be parallelized.
optimizers (torch.optim.Optimizer or list of torch.optim.Optimizer) – Optimizer(s) for the module. It can contain one or more PyTorch optimizers.
bucket_type (BucketType) – Type of elements in a communication bucket; can be module parameters, weights, or gradients.
hierarchical_reduce (bool) – Enable hierarchical reduce, which performs an intra-node allreduce, followed by an inter-node reduce defined by the distributed algorithm in use, and an intra-node broadcast at the end.
align_bytes (int) – Number of bytes each communication bucket is aligned to.
chunking (bool) – For the alltoall communication pattern, set chunking to True.
fusion (bool) – Whether to reset parameter data pointers so that faster code paths can be used.
decentralize_reduce (bool) – Whether to execute decentralized communication. Default: False.
buckets (List[List[TensorDeclaration]]) – Parameter buckets.
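The two call sequences listed above can be pictured with the following illustrative driver (not Bagua's internal code; reducer is a Reducer instance and module the wrapped module):
# First backward pass: buckets have not been initialized yet (case 1).
for param in module.parameters():
    reducer.add_param(param)                 # 1.1 collect parameters
buckets = reducer.initialize_buckets()       # 1.2 build buckets (after the first backward)
for idx, bucket in enumerate(buckets):
    reducer.mark_bucket_ready(bucket, idx)   # 1.3 mark every bucket ready
reducer.mark_on_complete()                   # 1.4 finish this reduce round

# Later backward passes: buckets already exist (case 2).
for param in module.parameters():
    reducer.mark_tensor_ready(param)         # 2.1 mark tensors as their gradients arrive
reducer.mark_on_complete()                   # 2.2 finish this reduce round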
- fill_slot(self, param)¶
Get the value of the given parameter.
- initialize_buckets(self)¶
Initialize parameter buckets.
Note
initialize_buckets MUST be executed after the first backward pass.
- Returns
Parameter buckets.
- Return type
List[List[torch.Tensor]]
- register_bagua_buckets(self)¶
Register bagua buckets.
- add_param(self, param)¶
Add a parameter to tensor_list.
- mark_bucket_ready(self, bucket, bucket_idx)¶
Mark all tensors in the bucket ready.
- mark_tensor_ready(self, param)¶
Mark a tensor ready once its gradient has been computed.
- mark_on_complete(self)¶
Mark that all buckets have finished their reduce process.
- class bagua.torch_api.distributed.OverlappingWrapper(module, optimizers, delay_reduce=False, bucket_type=BucketType.Gradient, hierarchical_reduce=False, decentralize_reduce=False, parameter_manager=None, align_bytes=8, chunking=False, fusion=True, **kwargs)¶
Bases:
torch.nn.Module
This class defines the process of communication-computation overlap.
- Parameters
module (torch.nn.Module) – A distributed module to be overlapped.
optimizers (torch.optim.Optimizer or list of torch.optim.Optimizer) – Optimizer(s) for the module. It can contain one or more PyTorch optimizers.
delay_reduce (bool) – Delay all communication to the end of the backward pass. This disables overlapping communication with computation. Default value is False.
bucket_type (BucketType) – Type of elements in a communication bucket; can be module parameters, weights, or gradients.
hierarchical_reduce (bool) – Enable hierarchical reduce, which performs an intra-node allreduce, followed by an inter-node reduce defined by the distributed algorithm in use, and an intra-node broadcast at the end.
decentralize_reduce (bool) – For decentralized training, set decentralize_reduce to True.
align_bytes (int) – Number of bytes each communication bucket is aligned to.
chunking (bool) – For the alltoall communication pattern, set chunking to True.
fusion (bool) – Whether to reset parameter data pointers so that faster code paths can be used.
Note
This implementation benefits a lot from apex.parallel.DistributedDataParallel.
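A minimal usage sketch of the wrapper, assuming model and optimizer have already been constructed; data, target, and loss_fn are illustrative placeholders:
overlapped = OverlappingWrapper(
    model,
    optimizer,
    delay_reduce=False,        # keep communication overlapped with backward computation
    hierarchical_reduce=False,
)
output = overlapped(data)      # forward() with communication-computation overlap
loss = loss_fn(output, target)
loss.backward()                # registered hooks reduce buckets as gradients become ready
optimizer.step()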
- reset_reducer(self, hierarchical_reduce=None, buckets=None)¶
Reset the parameter reducer.
- Parameters
hierarchical_reduce (bool) – Enable hierarchical reduce.
buckets (List[List[TensorDeclaration]]) – Parameter buckets.
- create_hooks(self)¶
Define the hooks used to reduce communication buckets during the backward pass.
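The general idea can be sketched with plain PyTorch hooks (an illustration of the mechanism, not Bagua's actual implementation): a hook registered on every parameter fires when that parameter's gradient is computed and notifies the reducer, so finished buckets can be communicated while the rest of the backward pass continues.
def _make_hook(reducer, param):
    def hook(*unused):
        # Called by autograd once this parameter's gradient is computed.
        reducer.mark_tensor_ready(param)
    return hook

for param in module.parameters():
    if param.requires_grad:
        param.register_hook(_make_hook(reducer, param))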
- forward(self, *inputs, **kwargs)¶
Overwrite the forward process for a distributed module with communication-computation overlap.
- class bagua.torch_api.distributed.ModelSwitchWrapper(module, optimizer, broadcast_buffers=True, delay_reduce=False, hierarchical_reduce=None, message_size=10000000, intra_comm_root_rank=0, **kwargs)¶
Bases:
torch.nn.Module
ModelSwitchWrapper switches between distributed algorithms during training. It serves two purposes: first, it transforms the original module into a distributed module; second, it can change the distributed algorithm to another one during training.
- Parameters
module (torch.nn.Module) – Network definition to be run in multi-gpu/distributed mode.
optimizer (torch.optim.Optimizer or list of torch.optim.Optimizer) – Optimizer(s) for the module. It can contain one or more PyTorch optimizers.
broadcast_buffers (bool) – Flag that enables syncing (broadcasting) buffers of the module at the first iteration of the forward function. Default: True.
delay_reduce (bool) – Delay all communication to the end of the backward pass. This disables overlapping communication with computation. Default: False.
hierarchical_reduce (bool) – Enable hierarchical reduce. For GradientAllReduce algorithm, default value is False, otherwise, default value is True.
message_size (int) – Minimum bytes in a communication bucket. Default: 10_000_000.
intra_comm_root_rank (int) – Root rank of intra communication. Default: 0.
- Returns
Distributed module.
Examples:
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model = ModelSwitchWrapper(
...     model=model,
...     optimizer=optimizer,
...     broadcast_buffers=broadcast_buffers,
...     delay_reduce=delay_reduce,
...     hierarchical_reduce=hierarchical_reduce,
...     message_size=message_size,
...     **kwargs,
... ).switch_to(DistributedAlgorithm.GradientAllReduce)
>>> # train A epochs
>>> model.switch_to(DistributedAlgorithm.Decentralize)
>>> # train B epochs
>>> model.switch_to(DistributedAlgorithm.GradientAllReduce)
>>> # continue training
- switch_to(self, distributed_algorithm)¶
Switch the initial module to a distributed module.
- Parameters
distributed_algorithm (DistributedAlgorithm) – Distributed algorithm used to average gradients or weights across all workers. Default: DistributedAlgorithm.GradientAllReduce.
- Returns
The distributed module that replaces the initial one.
- state_dict(self, **kwargs)¶
Fetch the module’s state_dict.
- report_metrics(self, score_record_list)¶
Log the metrics of the auto_tune algorithm.
- ask_and_update_hyperparameters(self)¶
Execute the auto_tune search process and update the hyperparameters.
- Return type
bool
- forward(self, *inputs, **kwargs)¶
Overwrite the forward process and return the output.
- bagua.torch_api.distributed.broadcast_parameters(module, broadcast_buffers=True)¶
Broadcast the parameters (and buffers) for synchronization at the beginning of training. If broadcast_buffers is False, the buffers will not be synchronized (broadcast).
- bagua.torch_api.distributed.allreduce_parameters(module)¶
Allreduce the parameters and buffers for synchronization each time the distributed algorithm is switched.
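A minimal usage sketch for the two helpers above, assuming the distributed process group has already been initialized and model is the module being trained:
# Synchronize parameters (and buffers) from the root rank once at startup.
broadcast_parameters(model, broadcast_buffers=True)

# ... train with the current distributed algorithm ...

# Re-synchronize parameters and buffers when switching to another algorithm.
allreduce_parameters(model)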
- bagua.torch_api.distributed.bagua_init(module, optimizer, distributed_algorithm=DistributedAlgorithm.GradientAllReduce, broadcast_buffers=True, delay_reduce=False, hierarchical_reduce=None, message_size=10000000, **kwargs)¶
bagua_init is a module wrapper that enables easy multiprocess distributed data parallel training using different distributed algorithms.
- Parameters
module (torch.nn.Module) – Network definition to be run in multi-gpu/distributed mode.
optimizer (torch.optim.Optimizer or list of torch.optim.Optimizer) – Optimizer(s) for the module. It can contain one or more PyTorch optimizers.
distributed_algorithm (DistributedAlgorithm) – Distributed algorithm used to average gradients or weights across all workers. Default: DistributedAlgorithm.GradientAllReduce.
broadcast_buffers (bool) – Flag that enables syncing (broadcasting) buffers of the module at the first iteration of the forward function. Default: True.
delay_reduce (bool) – Delay all communication to the end of the backward pass. This disables overlapping communication with computation. Default value is False.
hierarchical_reduce (bool) – Enable hierarchical reduce. For GradientAllReduce algorithm, default value is False, otherwise, default value is True.
message_size (int) – Minimum bytes in a communication bucket. Default: 10_000_000.
- Returns
Distributed module.
Examples:
>>> model = torch.nn.Sequential(
...     torch.nn.Linear(D_in, H),
...     torch.nn.ReLU(),
...     torch.nn.Linear(H, D_out),
... )
>>> optimizer = torch.optim.SGD(
...     model.parameters(),
...     lr=0.01,
...     momentum=0.9
... )
>>> model, optimizer = bagua_init(
...     model,
...     optimizer,
...     broadcast_buffers=True
... )