bagua.torch_api.distributed

Module Contents

class bagua.torch_api.distributed.DistributedModule(module)

Bases: torch.nn.Module

A base class for distributed modules.

unwrap(self)

Return the unwrapped module.

forward(self, *inputs, **kwargs)

Execute the forward process and return the output.

class bagua.torch_api.distributed.Reducer(module, optimizers, bucket_type, hierarchical_reduce, align_bytes, chunking, fusion, decentralize_reduce=False, buckets=[], **kwargs)

Bases: object

In order to improve communication efficiency, the distributed algorithm chunks parameters into many buckets. A bucket is the minimum unit of communication between devices in bagua. This module is the bucket manager, providing bucket operation methods.

The process mainly consists of the following two cases (a call-order sketch follows this list):

  1. bucket_initialized is False:

    1.1 add_param

    1.2 initialize_buckets -> register_models

    1.3 mark_bucket_ready

    1.4 mark_on_complete

  2. bucket_initialized is True:

    2.1 mark_tensor_ready

    2.2 mark_on_complete
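
A minimal sketch of this call order (illustrative only; reducer is assumed to be an already constructed Reducer and params an iterable of the module's parameters in an initialized Bagua setup):

>>> # Case 1: buckets not yet initialized (first backward pass)
>>> for param in params:
...     reducer.add_param(param)              # collect parameters into the tensor list
>>> buckets = reducer.initialize_buckets()    # chunk parameters into buckets and register them
>>> for idx, bucket in enumerate(buckets):
...     reducer.mark_bucket_ready(bucket, idx)
>>> reducer.mark_on_complete()                # all buckets have finished their reduce process
>>> # Case 2: buckets already initialized (subsequent iterations)
>>> for param in params:
...     reducer.mark_tensor_ready(param)      # mark each tensor once its gradient is ready
>>> reducer.mark_on_complete()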

Parameters
  • module (DistributedModule) – Module to be parallelized.

  • optimizers (torch.optim.Optimizer or list of torch.optim.Optimizer) – Optimizer(s) for the module. It can contain one or more PyTorch optimizers.

  • bucket_type (BucketType) – Type of the elements in a communication bucket; can be module parameters, weights, or gradients.

  • hierarchical_reduce (bool) – Enable hierarchical reduce, which performs an intra-node allreduce, followed by an inter-node reduce defined by the distributed algorithm, and an intra-node broadcast at the end.

  • align_bytes (int) – Number of bytes each communication bucket is aligned to.

  • chunking (bool) – For the alltoall communication pattern, set chunking to True.

  • fusion (bool) – To reset parameter data pointers so that they can use faster code paths, set fusion to True.

  • decentralize_reduce (bool) – Whether to execute decentralized communication. Default: False.

  • buckets (List[List[TensorDeclaration]]) – Parameter buckets.

fill_slot(self, param)

Get the value of the given parameter.

initialize_buckets(self)

Initialize parameter buckets.

Note

initialize_buckets MUST be executed after the first backward pass.

Returns

Parameter buckets.

Return type

List[List[torch.Tensor]]

register_bagua_buckets(self)

Register bagua buckets.

add_param(self, param)

Add parameter into tensor_list.

mark_bucket_ready(self, bucket, bucket_idx)

Mark all tensors in the bucket ready.

mark_tensor_ready(self, param)

Mark the tensor ready once its gradient has been computed.

mark_on_complete(self)

Mark that all buckets have finished their reduce process.

class bagua.torch_api.distributed.OverlappingWrapper(module, optimizers, delay_reduce=False, bucket_type=BucketType.Gradient, hierarchical_reduce=False, decentralize_reduce=False, parameter_manager=None, align_bytes=8, chunking=False, fusion=True, **kwargs)

Bases: torch.nn.Module

This class defines the process of communication-computation overlap.

Parameters
  • module (torch.nn.Module) – A distributed module to be overlapped.

  • optimizers (torch.optim.Optimizer or list of torch.optim.Optimizer) – Optimizer(s) for the module. It can contain one or more PyTorch optimizers.

  • delay_reduce (bool) – Delay all communication to the end of the backward pass. This disables overlapping communication with computation. Default value is False.

  • bucket_type (BucketType) – Type of the elements in a communication bucket; can be module parameters, weights, or gradients.

  • hierarchical_reduce (bool) – Enable hierarchical reduce, which performs an intra-node allreduce, followed by an inter-node reduce defined by the distributed algorithm, and an intra-node broadcast at the end.

  • decentralize_reduce (bool) – For decentralized training, set decentralize_reduce to True.

  • align_bytes (int) – Number of bytes each communication bucket is aligned to.

  • chunking (bool) – For the alltoall communication pattern, set chunking to True.

  • fusion (bool) – To reset parameter data pointers so that they can use faster code paths, set fusion to True.

Note

This implementation borrows heavily from apex.parallel.DistributedDataParallel.
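
A hedged usage sketch (assuming the Bagua communication backend is already initialized, and that net, optimizer, batch, target and criterion are defined elsewhere; the wrapped model is used like a regular module afterwards):

>>> wrapped = OverlappingWrapper(
...     net,
...     optimizer,
...     delay_reduce=False,        # keep communication overlapped with the backward computation
...     )
>>> output = wrapped(batch)        # forward pass with communication-computation overlap
>>> loss = criterion(output, target)
>>> loss.backward()                # gradients are bucketed and reduced during backward
>>> optimizer.step()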

reset_reducer(self, hierarchical_reduce=None, buckets=None)

Reset the parameter reducer.

Parameters
  • hierarchical_reduce (bool) – Enable hierarchical reduce.

  • buckets (List[List[TensorDeclaration]]) – Parameter buckets.

create_hooks(self)

Define the hooks used to reduce communication buckets during the backward pass.

forward(self, *inputs, **kwargs)

Override the forward pass for a distributed module with communication-computation overlap.

class bagua.torch_api.distributed.ModelSwitchWrapper(module, optimizer, broadcast_buffers=True, delay_reduce=False, hierarchical_reduce=None, message_size=10000000, intra_comm_root_rank=0, **kwargs)

Bases: torch.nn.Module

ModelSwitchWrapper is designed to switch distributed algorithms during the training process. It has two main functions: first, it transforms the original module into a distributed module; second, it can switch the distributed algorithm to another one during training.

Parameters
  • module (torch.nn.Module) – Network definition to be run in multi-gpu/distributed mode.

  • optimizer (torch.optim.Optimizer or list of torch.optim.Optimizer) – Optimizer(s) for the module. It can contain one or more PyTorch optimizers.

  • broadcast_buffers (bool) – Flag that enables syncing (broadcasting) buffers of the module at the first iteration of the forward function. Default: True.

  • delay_reduce (bool) – Delay all communication to the end of the backward pass. This disables overlapping communication with computation. Default: False.

  • hierarchical_reduce (bool) – Enable hierarchical reduce. For GradientAllReduce algorithm, default value is False, otherwise, default value is True.

  • message_size (int) – Minimum bytes in a communication bucket. Default: 10_000_000.

  • intra_comm_root_rank (int) – Root rank of intra communication. Default: 0.

Returns

Distributed module.

Examples:

>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model = ModelSwitchWrapper(
...    model=model,
...    optimizer=optimizer,
...    broadcast_buffers=broadcast_buffers,
...    delay_reduce=delay_reduce,
...    hierarchical_reduce=hierarchical_reduce,
...    message_size=message_size,
...    **kwargs,
...    ).switch_to(DistributedAlgorithm.GradientAllReduce)
>>> # train A epochs
>>> model.switch_to(DistributedAlgorithm.Decentralize)
>>> # train B epochs
>>> model.switch_to(DistributedAlgorithm.GradientAllReduce)
>>> # continue training
>>> ...

switch_to(self, distributed_algorithm)

Switch the initial module to a distributed module using the given distributed algorithm.

Parameters

distributed_algorithm (DistributedAlgorithm) – Distributed algorithm used to average gradients or weights across all workers. Default: DistributedAlgorithm.GradientAllReduce.

Returns

The distributed module that replaces the initial one.

state_dict(self, **kwargs)

Fetch the module’s state_dict.

report_metrics(self, score_record_list)

Log the metrics of the auto_tune algorithm.

ask_and_update_hyperparameters(self)

Execute the environment search process by auto_tune and update the hyper-parameters.

Return type

bool

forward(self, *inputs, **kwargs)

Override the forward pass and return the output.

bagua.torch_api.distributed.broadcast_parameters(module, broadcast_buffers=True)

Broadcast the parameters (and buffers) for synchronization at the beginning of training. If broadcast_buffers is False, the buffers will not be synchronized (broadcast).
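
For example (a sketch; model is assumed to be a module already placed on its device, with the Bagua communication backend initialized):

>>> broadcast_parameters(model, broadcast_buffers=True)  # synchronize initial parameters and buffers across workers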

bagua.torch_api.distributed.allreduce_parameters(module)

Allreduce the parameters and buffers for synchronization each time the distributed algorithm is switched.
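
For example (a sketch; model is assumed to be the distributed module, typically called when switching algorithms):

>>> allreduce_parameters(model)  # average parameters and buffers across all workers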

bagua.torch_api.distributed.bagua_init(module, optimizer, distributed_algorithm=DistributedAlgorithm.GradientAllReduce, broadcast_buffers=True, delay_reduce=False, hierarchical_reduce=None, message_size=10000000, **kwargs)

bagua_init is a module wrapper that enables easy multiprocess distributed data parallel training using different distributed algorithms.

Parameters
  • module (torch.nn.Module) – Network definition to be run in multi-gpu/distributed mode.

  • optimizer (torch.optim.Optimizer or list of torch.optim.Optimizer) – Optimizer(s) for the module. It can contain one or more PyTorch optimizers.

  • distributed_algorithm (DistributedAlgorithm) – Distributed algorithm used to average gradients or weights across all workers. Default: DistributedAlgorithm.GradientAllReduce.

  • broadcast_buffers (bool) – Flag that enables syncing (broadcasting) buffers of the module at the first iteration of the forward function. Default: True.

  • delay_reduce (bool) – Delay all communication to the end of the backward pass. This disables overlapping communication with computation. Default value is False.

  • hierarchical_reduce (bool) – Enable hierarchical reduce. For GradientAllReduce algorithm, default value is False, otherwise, default value is True.

  • message_size (int) – Minimum bytes in a communication bucket. Default: 10_000_000.

Returns

Distributed module.

Examples:

>>> model = torch.nn.Sequential(
...    torch.nn.Linear(D_in, H),
...    torch.nn.ReLU(),
...    torch.nn.Linear(H, D_out),
...    )
>>> optimizer = torch.optim.SGD(
...    model.parameters(),
...    lr=0.01,
...    momentum=0.9
...    )
>>> model, optimizer = bagua_init(
...    model,
...    optimizer,
...    broadcast_buffers=True
...    )