utils.pytorch_ddp_dist_helper¶
pytorch_ddp_dist_helper¶
Please refer to ding/ding/utils/disk_helper.py for usage.
error_wrapper¶
- Overview:
Wrap the function so that any Exception raised inside it is caught and default_ret is returned instead
- Arguments:
- fn (Callable): the function to be wrapped
- default_ret (obj): the default return value when an Exception occurs in the function
- Returns:
- wrapper (Callable): the wrapped function
- Examples:
>>> # Used to check for FakeLink (refer to utils.linklink_dist_helper.py)
>>> def get_rank():  # Get the rank of linklink model, return 0 if use FakeLink.
>>>     if is_fake_link:
>>>         return 0
>>>     return error_wrapper(link.get_rank, 0)()
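The behaviour described above can be illustrated with a minimal, standalone sketch. It assumes only what the overview states (catch any Exception, return default_ret) and is not the library's actual implementation. The helper `broken_get_rank` is a hypothetical stand-in for a backend call that fails.

```python
from typing import Any, Callable


def error_wrapper(fn: Callable, default_ret: Any) -> Callable:
    """Sketch: return a wrapper that falls back to default_ret if fn raises."""

    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            # Any Exception raised by fn is swallowed and replaced by the default.
            return default_ret

    return wrapper


def broken_get_rank() -> int:
    # Hypothetical stand-in for a backend call that fails when nothing is set up.
    raise RuntimeError("distributed backend is not initialized")


# The wrapped call never raises; it yields the default instead.
rank = error_wrapper(broken_get_rank, 0)()

# A call that succeeds is passed through unchanged.
parsed = error_wrapper(int, -1)("42")
```

Because every Exception is silenced, a wrapper of this kind is best kept to calls where a fallback value is genuinely safe, such as reporting rank 0 in a single-process run.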
get_rank¶
- Overview:
Get the rank of the current process within the total world_size
get_world_size¶
- Overview:
Get the world_size (the total number of processes in data-parallel training)
broadcast¶
Broadcasts the tensor to the whole group.
tensor must have the same number of elements in all processes
participating in the collective.
- Args:
- tensor (Tensor): Data to be sent if src is the rank of current process, and tensor to be used to save received data otherwise.
- src (int): Source rank.
- group (ProcessGroup, optional): The process group to work on. If None, the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
Async work handle, if async_op is set to True. None, if not async_op or if not part of the group
allreduce¶
Reduces the tensor data across all machines in such a way that all get the final result.
After the call tensor is going to be bitwise identical in all processes.
Complex tensors are supported.
- Args:
- tensor (Tensor): Input and output of the collective. The function operates in-place.
- op (optional): One of the values from torch.distributed.ReduceOp enum. Specifies an operation used for element-wise reductions.
- group (ProcessGroup, optional): The process group to work on. If None, the default process group will be used.
- async_op (bool, optional): Whether this op should be an async op
- Returns:
Async work handle, if async_op is set to True. None, if not async_op or if not part of the group
- Examples:
>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
>>> tensor
tensor([1, 2]) # Rank 0
tensor([3, 4]) # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6]) # Rank 0
tensor([4, 6]) # Rank 1

>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j]) # Rank 0
tensor([3.+3.j, 4.+4.j]) # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j]) # Rank 0
tensor([4.+4.j, 6.+6.j]) # Rank 1
get_group¶
- Overview:
Get the group segmentation, with group_size processes in each group
- Arguments:
- group_size (int): the group_size
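To make "group segmentation" concrete, here is a rough sketch of how ranks might be partitioned into groups of group_size. It assumes contiguous rank assignment and a world_size divisible by group_size; the names `split_ranks` and `group_of` are hypothetical and not part of the module. The real helper presumably returns a process group rather than a plain list of ranks.

```python
from typing import List


def split_ranks(world_size: int, group_size: int) -> List[List[int]]:
    """Sketch: partition ranks 0..world_size-1 into contiguous groups."""
    if group_size <= 0 or world_size % group_size != 0:
        # Assumption: every group must have exactly group_size members.
        raise ValueError("world_size must be a positive multiple of group_size")
    return [
        list(range(start, start + group_size))
        for start in range(0, world_size, group_size)
    ]


def group_of(rank: int, world_size: int, group_size: int) -> List[int]:
    """Sketch: return the ranks that share a group with the given rank."""
    return split_ranks(world_size, group_size)[rank // group_size]


groups = split_ranks(world_size=8, group_size=4)
my_group = group_of(rank=5, world_size=8, group_size=4)
```

With 8 processes and group_size 4, ranks 0 to 3 form one group and ranks 4 to 7 the other, so rank 5 lands in the second group.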
dist_mode¶
- Overview:
Wrap the function so that it initializes before and finalizes after each call automatically
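The wrapping pattern can be sketched as a plain decorator. This is only an illustration under stated assumptions: `fake_dist_init` and `fake_dist_finalize` are hypothetical stand-ins that record calls instead of touching a real backend, and the use of try/finally (so that finalization also runs on failure) is a design choice of this sketch, not something the source confirms.

```python
import functools
from typing import Callable, List

calls: List[str] = []  # records the order of events for illustration


def fake_dist_init() -> None:
    # Hypothetical stand-in for the real initialization routine.
    calls.append("init")


def fake_dist_finalize() -> None:
    # Hypothetical stand-in for the real finalization routine.
    calls.append("finalize")


def dist_mode_sketch(func: Callable) -> Callable:
    """Sketch: run init before and finalize after every call of func."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        fake_dist_init()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs even if func raises, so resources are always released.
            fake_dist_finalize()

    return wrapper


@dist_mode_sketch
def train_step(x: int) -> int:
    calls.append("run")
    return x * 2


result = train_step(21)
```

After the call, `calls` holds the three events in order: init, run, finalize.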
dist_init¶
- Overview:
Init the distributed training setting
dist_finalize¶
- Overview:
Finalize distributed training resources