分布式通讯包 - torch.distributed


torch.distributed提供了一种类似 MPI 的接口,用于跨多机器网络交换张量数据。它支持几种不同的后端和初始化方法。

目前,torch.distributed支持三个后端,每个后端具有不同的功能。下表显示哪些功能可用于 CPU / CUDA 张量。只有当用于构建 PyTorch 的实现支持它时,MPI 才支持 cuda。

后端 tcp gloo mpi
设备 中央处理器 GPU 中央处理器 GPU 中央处理器 GPU
—- —- —- —- —- —- —-
发送
的 recv
广播
all_reduce
减少
all_gather
收集
分散
屏障

基本

所述torch.distributed包提供跨在一个或多个计算机上运行的几个计算节点对多进程并行 PyTorch 支持与通信原语。该类torch.nn.parallel.DistributedDataParallel()基于此功能,提供同步分布式培训作为围绕任何 PyTorch 模型的包装器。这不同于所提供的类型的并行的 :模块:torch.multiprocessingtorch.nn.DataParallel()在它支持多个网络连接的机器和在用户必须明确地启动主训练脚本的单独副本为每个进程。

在单机同步情况下,torch.distributed 或 torch.nn.parallel.DistributedDataParallel()wrapper 可能仍然具有优于数据并行性的其他方法的优点,包括torch.nn.DataParallel()

  • 每个进程维护自己的优化器,并且每次迭代执行完整的优化步骤。虽然这可能是冗余的,因为梯度已经被聚集在一起,并且在进程之间进行平均,因此对于每个进程是相同的,这意味着不需要参数广播步骤,从而减少在节点之间传送张量所花费的时间。
  • 每个进程都包含一个独立的 Python 解释器,消除了来自单个 Python 进程中驱动多个执行线程,模型副本或 GPU 的额外的解释器开销和“GIL-thrashing”。这对于大量使用 Python 运行时的模型尤其重要,包括具有循环层或多个小组件的模型。

初始化

torch.distributed.init_process_group() 在调用任何其他方法之前,需要使用该函数初始化该包。这将阻止所有进程加入。

torch.distributed.init_process_group(backend, init_method=’env://‘, kwargs)

初始化分布式包。

参数:

torch.distributed.get_rank()

返回当前进程的排名。

Rank 是分配给分布式组中每个进程的唯一标识符。它们总是连续的整数,范围从 0 到world_size

torch.distributed.get_world_size()

返回分布式组中的进程数。


目前支持三种初始化方式:

TCP 初始化

有两种方法来初始化使用 TCP,这两种方法都需要可以从所有进程访问的网络地址和所需的 world_size。第一种方法需要指定属于等级 0 进程的地址。第一种初始化方式要求所有进程都具有手动指定的等级。

或者,地址必须是有效的 IP 多播地址,在这种情况下可以自动分配等级。组播初始化还支持一个 group_name 参数,只要使用不同的组名,就可以为多个作业使用相同的地址。

  1. import torch.distributed as dist
  2. # Use address of one of the machines
  3. dist.init_process_group(init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)
  4. # or a multicast address - rank will be assigned automatically if unspecified
  5. dist.init_process_group(init_method='tcp://[ff15:1e18:5d4c:4cf0:d02d:b659:53ba:b0a7]:23456',
  6. world_size=4)

共享文件系统初始化

另一个初始化方法利用一个文件系统,该文件系统可以从组中的所有计算机共享和可见,以及所需的 world_size。该 URL 应 file://以共享文件系统上的不存在的文件(在现有目录中)开头并包含一个路径。此初始化方法也支持一个 group_name 参数,只要使用不同的组名,就可以使用相同的共享文件路径进行多个作业。

警告

该方法假设文件系统支持使用 fcntl 大多数本地系统进行锁定,NFS 支持它。

  1. import torch.distributed as dist
  2. # Rank will be assigned automatically if unspecified
  3. dist.init_process_group(init_method='file:///mnt/nfs/sharedfile', world_size=4,
  4. group_name=args.group)

环境变量初始化

该方法将从环境变量中读取配置,从而可以完全自定义获取信息的方式。要设置的变量是:

  • MASTER_PORT - 必需 必须是排名 0 的机器上的免费端口
  • MASTER_ADDR - 必需(等级 0 除外); 地址 0 级节点
  • WORLD_SIZE - 必需 可以在这里设置,也可以调用 init 函数
  • RANK - 必需 可以在这里设置,也可以调用 init 函数

等级为 0 的机器将用于设置所有连接。

这是默认方法,这意味着 init_method 不必指定(或可以 env://)。

Groups 组

默认集合在默认组(也称为世界)上运行,并要求所有进程进入分布式函数调用。然而,一些工作负载可以从更细粒度的通信中受益。这就是分布式组织发挥作用的地方。new_group()函数可用于创建新组,具有所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group 参数给予所有集合(集合是分布式函数以在某些众所周知的编程模式中交换信息)。

torch.distributed.new_group(_rank = None)

创建一个新的分布式组。

此功能要求主组中的所有进程(即作为分布式作业的一部分的所有进程)都将输入此功能,即使它们不会是组的成员。此外,应在所有进程中以相同的顺序创建组。

参数:

  • ranks (list[int]) - 团体成员名单。 返回: 分配组的句柄可以被赋予集体调用。

torch.distributed.send(tensor, dst)

同步发送张量。

参数:

  • 张量(张量) - 张量发送。
  • dst(int) - 目的地排名。

torch.distributed.recv(tensor, src=None)

同步接收张量。

参数:

  • 张量(Tensor) - 张量填充收到的数据。
  • src(int) - 源排名。

isend()irecv() 在使用时返回分布式请求对象。一般来说,此对象的类型是未指定的,因为它们不应该手动创建,而是保证支持两种方法:

  • is_completed() - 如果操作完成,返回 True
  • wait() - 将阻止进程,直到操作完成。 is_completed()保证在返回 True 后返回 True。

torch.distributed.isend(tensor, dst)

以异步方式发送张量。

参数:

  • 张量(张量) - 张量发送。
  • dst(int) - 目的地排名。

返回:

分布式请求对象。

torch.distributed.irecv(tensor, src)

以异步方式接收张量。

参数:

  • 张量(Tensor) - 张量填充收到的数据。
  • src(int) - 源排名。

返回: 分布式请求对象。

torch.distributed.broadcast(tensor, src, group=)

向整个组播放张量。

tensor 在参与集体的所有过程中必须具有相同数量的元素。

参数:

  • 张量(Tensor) - 如果src是当前进程的等级,则要发送的数据,否则用于保存接收数据的张量。
  • src(int) - 源排名。
  • 集团(可选) - 集体集团。

torch.distributed.all_reduce(tensor, op=, group=)

减少所有机器上的张量数据,使得所有机器都得到最终结果。

tensor在所有进程中,呼叫将是相同的。

参数:

  • 张量(张量) - 集体的输入和输出。该功能就地运行。
  • op(可选) - torch.distributed.reduce_op 枚举中的一个值。指定用于元素方式减少的操作。
  • 集团(可选) - 集体集团。

torch.distributed.reduce(tensor, dst, op=, group=)

减少所有机器的张量数据。

只有排名过程dst才会得到最终结果。

参数:

  • 张量(张量) - 集体的输入和输出。该功能就地运行。
  • op(可选) - torch.distributed.reduce_op 枚举中的一个值。指定用于元素方式减少的操作。
  • 集团(可选) - 集体集团。

torch.distributed.all_gather(tensor_list, tensor, group=)

从列表中收集整个组的张量。

参数:

  • tensorlist(list _[ Tensor ]) - 输出列表。它应该包含用于集体产出的正确大小的张量。
  • 张量(Tensor) - 从当前进程广播的张量。
  • 集团(可选) - 集体集团。

torch.distributed.gather(tensor, kwargs)

在单个过程中收集张量列表。

参数:

  • 张量(Tensor) - 输入张量。
  • dst(int) - 目的地排名。除了接收数据之外的所有进程都需要。
  • gatherlist(list _[ Tensor ]) - 用于接收数据的适当大小的张量的列表。仅在接收过程中需要。
  • 集团(可选) - 集体集团。

torch.distributed.scatter(tensor, kwargs)

向组中的所有进程散布张量列表。

每个进程将只收到一个张量,并将其数据存储在 tensor参数中。

参数:

  • 张量(张量) - 输出张量。
  • src(int) - 源排名。除了发送数据之外的所有进程都需要。
  • scatterlist(list _[ Tensor ]) - 要分散的张量列表。只有在发送数据的过程中才需要。
  • 集团(可选) - 集体集团。

torch.distributed.barrier(group=)

同步所有进程。

这个集体阻止进程,直到整个组进入这个功能。

参数: group (可选) - Group of the collective.

译者署名

用户名 头像 职能 签名
Song 分布式通讯包 - torch.distributed - 图1 翻译 人生总要追求点什么