使用 PyTorch 编写分布式应用程序¶
Created On: Oct 06, 2017 | Last Updated: Feb 20, 2025 | Last Verified: Nov 05, 2024
作者: Séb Arnold
备注
在 github 中查看和编辑此教程。
前提条件:
在本简短教程中,我们将介绍 PyTorch 的分布式包。我们将了解如何设置分布式环境,使用不同的通信策略,并深入了解一些包的内部工作原理。
设置¶
PyTorch 中包含的分布式包(即 torch.distributed
)使研究人员和从业者可以轻松地在进程和机器集群之间并行化计算。为此,它利用消息传递语义,允许每个进程将数据传递到其他任何进程。与 multiprocessing (torch.multiprocessing
)包不同,进程可以使用不同的通信后端,并且不局限于同一机器上执行。
为了开始,我们需要能够同时运行多个进程。如果您可以访问计算集群,应该咨询您所在的系统管理员,或者使用您喜欢的协调工具(例如 pdsh、clustershell 或 slurm)。在本教程中,我们将使用单台机器,并使用以下模板生成多个进程。
"""run.py:"""
#!/usr/bin/env python
import os
import sys
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def run(rank, size):
""" Distributed function to be implemented later. """
pass
def init_process(rank, size, fn, backend='gloo'):
""" Initialize the distributed environment. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
if __name__ == "__main__":
world_size = 2
processes = []
if "google.colab" in sys.modules:
print("Running in Google Colab")
mp.get_context("spawn")
else:
mp.set_start_method("spawn")
for rank in range(world_size):
p = mp.Process(target=init_process, args=(rank, world_size, run))
p.start()
processes.append(p)
for p in processes:
p.join()
以上脚本生成了两个进程,每个进程都会设置分布式环境,初始化进程组(dist.init_process_group
),并最终执行给定的``run``函数。
让我们看看``init_process``函数。它确保每个进程能够通过主节点进行协调,使用相同的IP地址和端口。注意我们使用了``gloo``后端,但还有其他后端可用。(参见`第5.1节 <#communication-backends>`__)在本教程的末尾我们将深入了解``dist.init_process_group``中的实现,但其基本作用是允许进程通过共享它们的位置进行通信。
点对点通信¶

发送和接收¶
从一个进程向另一个进程传输数据称为点对点通信。可以通过``send``和``recv``函数或它们的非阻塞版本``isend``和``irecv``实现。
"""Blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
if rank == 0:
tensor += 1
# Send the tensor to process 1
dist.send(tensor=tensor, dst=1)
else:
# Receive tensor from process 0
dist.recv(tensor=tensor, src=0)
print('Rank ', rank, ' has data ', tensor[0])
在上述示例中,两个进程都以一个零张量开始,然后进程0将张量加1并将其发送到进程1,以便两个进程最终拥有值1.0。注意进程1需要分配内存以存储接收到的数据。
还要注意``send/recv``是**阻塞的**:两个进程都会阻塞,直到通信完成。而非阻塞版本方法是**非阻塞的**,脚本会继续执行,这些方法会返回一个``Work``对象,我们可以选择对其调用``wait()``。
"""Non-blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
req = None
if rank == 0:
tensor += 1
# Send the tensor to process 1
req = dist.isend(tensor=tensor, dst=1)
print('Rank 0 started sending')
else:
# Receive tensor from process 0
req = dist.irecv(tensor=tensor, src=0)
print('Rank 1 started receiving')
req.wait()
print('Rank ', rank, ' has data ', tensor[0])
使用非阻塞方法时,我们必须注意如何使用发送和接收的张量。由于我们无法确定数据何时会传输到另一个进程,因此在``req.wait()``完成之前,不应修改发送的张量或访问接收的张量。换句话说:
在``dist.isend()``后写入``tensor``将导致未定义行为。
在``dist.irecv()``后读取``tensor``将导致未定义行为,直到执行了``req.wait()``为止。
然而,执行完``req.wait()``后,我们可以保证通信已经发生,并且``tensor[0]``中的值是1.0。
当我们需要对进程间通信有更细粒度的控制时,点对点通信非常有用。它们可以用来实现例如`百度的DeepSpeech <https://github.com/baidu-research/baidu-allreduce>`__或`Facebook的大规模实验 <https://research.fb.com/publications/imagenet1kin1h/>`__中使用的复杂算法。(参见 第4.1节)
集体通信¶
![]() 分散¶ |
![]() 收集¶ |
![]() 归约¶ |
![]() 全归约¶ |
![]() 广播¶ |
![]() 全部收集¶ |
与点对点通信相反,集体通信允许在一个**组**中的所有进程之间进行通信模式。组是所有进程的一个子集。我们可以通过向``dist.new_group(group)``传递一个排名列表来创建组。默认情况下,集体通信会在所有进程上执行,也称为**全局**。例如,为了获得所有进程上的张量的总和,我们可以使用``dist.all_reduce(tensor, op, group)``集体通信。
""" All-Reduce example."""
def run(rank, size):
""" Simple collective communication. """
group = dist.new_group([0, 1])
tensor = torch.ones(1)
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor[0])
由于我们希望获得组中所有张量的总和,我们使用``dist.ReduceOp.SUM``作为归约操作符。一般来说,任何交换律数学操作都可以作为操作符。PyTorch内置了许多这样的操作符,它们都在单个元素级别工作:
dist.ReduceOp.SUM
,dist.ReduceOp.PRODUCT
,dist.ReduceOp.MAX
,dist.ReduceOp.MIN
,dist.ReduceOp.BAND
,dist.ReduceOp.BOR
,dist.ReduceOp.BXOR
,dist.ReduceOp.PREMUL_SUM
。
支持的操作符的完整列表请参考`这里 <https://pytorch.org/docs/stable/distributed.html#torch.distributed.ReduceOp>`__。
除了``dist.all_reduce(tensor, op, group)``,PyTorch目前还实现了许多其他集体通信操作。以下是一些支持的集体通信操作。
dist.broadcast(tensor, src, group)
:将``tensor``从``src``复制到所有其他进程。dist.reduce(tensor, dst, op, group)
:对每个``tensor``应用``op``并将结果存储在``dst``中。dist.all_reduce(tensor, op, group)
:与reduce相同,但结果存储在所有进程中。dist.scatter(tensor, scatter_list, src, group)
:将``scatter_list[i]``中的第i个张量复制到第i个进程。dist.gather(tensor, gather_list, dst, group)
:将所有进程中的``tensor``复制到``dst``。dist.all_gather(tensor_list, tensor, group)
:将所有进程的``tensor``复制到``tensor_list``,在所有进程上。dist.barrier(group)
:阻塞`group`中的所有进程直到每个进程都进入此函数。dist.all_to_all(output_tensor_list, input_tensor_list, group)
:将输入张量列表分散到组中的所有进程,并将收集到的张量列表返回到输出列表中。
支持的集体通信操作的完整列表可以通过查看PyTorch分布式的最新文档找到`(链接) <https://pytorch.org/docs/stable/distributed.html>`__。
分布式训练¶
**注意:**本节的示例脚本可以在`这个GitHub仓库中找到 <https://github.com/seba-1511/dist_tuto.pth/>`__。
现在我们已经理解了分布式模块的工作原理,让我们用它写点有用的东西。我们的目标是重现`DistributedDataParallel <https://pytorch.org/docs/stable/nn.html#torch.nn.parallel.DistributedDataParallel>`__的功能。当然,这将是一个教学示例,在真实世界中,您应该使用上述链接中官方的、经过严格测试和优化的版本。
我们希望实现分布式版本的随机梯度下降。我们的脚本将让所有进程在它们的数据批次上计算模型的梯度,然后平均它们的梯度。为了确保在更改进程数量时具有类似的收敛结果,我们首先需要对数据集进行划分。(您也可以使用`torch.utils.data.random_split <https://pytorch.org/docs/stable/data.html#torch.utils.data.random_split>`__,而不是下面的代码段。)
""" Dataset partitioning helper """
class Partition(object):
def __init__(self, data, index):
self.data = data
self.index = index
def __len__(self):
return len(self.index)
def __getitem__(self, index):
data_idx = self.index[index]
return self.data[data_idx]
class DataPartitioner(object):
def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
self.data = data
self.partitions = []
rng = Random() # from random import Random
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
def use(self, partition):
return Partition(self.data, self.partitions[partition])
通过上述代码段,我们现在可以使用以下几行代码简单地划分任何数据集:
""" Partitioning MNIST """
def partition_dataset():
dataset = datasets.MNIST('./data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
size = dist.get_world_size()
bsz = 128 // size
partition_sizes = [1.0 / size for _ in range(size)]
partition = DataPartitioner(dataset, partition_sizes)
partition = partition.use(dist.get_rank())
train_set = torch.utils.data.DataLoader(partition,
batch_size=bsz,
shuffle=True)
return train_set, bsz
假设我们有2个副本,那么每个进程将拥有60000 / 2 = 30000个样本的``train_set``。我们还将批量大小除以副本数,以保持总的批量大小为128。
现在我们可以写我们通常的前向-反向优化训练代码,并添加一个函数调用以平均我们模型的梯度。(以下代码大部分是受官方`PyTorch MNIST示例 <https://github.com/pytorch/examples/blob/master/mnist/main.py>`__启发的。)
""" Distributed Synchronous SGD Example """
def run(rank, size):
torch.manual_seed(1234)
train_set, bsz = partition_dataset()
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=0.01, momentum=0.5)
num_batches = ceil(len(train_set.dataset) / float(bsz))
for epoch in range(10):
epoch_loss = 0.0
for data, target in train_set:
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.item()
loss.backward()
average_gradients(model)
optimizer.step()
print('Rank ', dist.get_rank(), ', epoch ',
epoch, ': ', epoch_loss / num_batches)
剩下的就是实现``average_gradients(model)``函数,该函数简单地接收一个模型并平均它的所有全局梯度。
""" Gradient averaging. """
def average_gradients(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
param.grad.data /= size
完成了! 我们成功实现了分布式同步SGD,并可以在大型计算集群上训练任何模型。
**注意:**尽管最后一句话在技术上是正确的,要实现生产级的同步SGD实施还需要`很多技巧 <https://seba-1511.github.io/dist_blog>`__。再次强调,请使用`经过测试和优化的版本 <https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel>`__。
我们自己的环形全归约¶
作为一个额外的挑战,假设我们想实现DeepSpeech的高效环形全归约。这使用点对点通信操作相当容易实现。
""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
rank = dist.get_rank()
size = dist.get_world_size()
send_buff = send.clone()
recv_buff = send.clone()
accum = send.clone()
left = ((rank - 1) + size) % size
right = (rank + 1) % size
for i in range(size - 1):
if i % 2 == 0:
# Send send_buff
send_req = dist.isend(send_buff, right)
dist.recv(recv_buff, left)
accum[:] += recv_buff[:]
else:
# Send recv_buff
send_req = dist.isend(recv_buff, right)
dist.recv(send_buff, left)
accum[:] += send_buff[:]
send_req.wait()
recv[:] = accum[:]
在上述脚本中,``allreduce(send, recv)``函数的签名与PyTorch中的稍有不同。它接收一个``recv``张量,并将所有``send``张量的总和存储在其中。作为留给读者的练习,我们的版本与DeepSpeech的版本之间还有一个不同之处:他们的实现将梯度张量分成*块*,以便最佳利用通信带宽。(提示:torch.chunk)
高级主题¶
现在我们准备好了解``torch.distributed``的一些更高级的功能。由于涉及的内容很多,本节分为两个子部分:
通信后端:我们将在其中学习如何使用MPI和Gloo进行GPU-GPU通信。
初始化方法:我们将在其中理解如何在``dist.init_process_group()``中最好地设置初始协调阶段。
通信后端¶
``torch.distributed``最优雅的方面之一是它能够抽象并基于不同的后端构建。如前所述,PyTorch中实现了多个后端。一些最受欢迎的后端是Gloo、NCCL和MPI。它们根据所需的用例具有不同的规格和权衡。支持的功能的对比表可以参考`这里 <https://pytorch.org/docs/stable/distributed.html#module-torch.distributed>`__。
Gloo后端
到目前为止,我们已经广泛使用了`Gloo后端<https://github.com/facebookincubator/gloo>`__。它作为开发平台非常方便,因为它包括在预编译的PyTorch二进制文件中,并且可以在Linux(自0.2版本起)和macOS(自1.3版本起)上运行。它支持CPU上的所有点对点和集体操作,以及GPU上的所有集体操作。对于CUDA张量的集体操作实现并不像NCCL后端提供的那样优化。
正如您肯定已经注意到的,如果将``model``放置到GPU上,我们的分布式SGD示例将无法工作。为了使用多个GPU,我们还需要进行以下修改:
使用``device = torch.device(“cuda:{}”.format(rank))``
model = Net()
\(\rightarrow\)model = Net().to(device)
使用``data, target = data.to(device), target.to(device)``
通过以上修改,我们的模型现在正在两个GPU上进行训练,您可以用``watch nvidia-smi``监控它们的利用率。
MPI后端
消息传递接口(MPI)是高性能计算领域的一种标准化工具。它允许进行点对点和集体通信,并且是``torch.distributed`` API 的主要灵感来源。存在若干种 MPI 实现(例如 Open-MPI,MVAPICH2,Intel MPI),每种实现均针对不同的目的进行了优化。使用 MPI 后端的优势在于 MPI 在大型计算集群上的广泛可用性和高度优化。一些 近期的 实现 还能够利用 CUDA IPC 和 GPU 直连技术,从而避免通过 CPU 进行内存复制。
不幸的是,PyTorch 的二进制文件不能包含 MPI 实现,因此我们需要手动重新编译它。幸运的是,这个过程相当简单,因为在编译期间 PyTorch 会自行查找可用的 MPI 实现。以下步骤通过从源码安装 PyTorch 来安装 MPI 后端。
创建并激活您的 Anaconda 环境,安装所有先决条件,按照`指南 <https://github.com/pytorch/pytorch#from-source>`__操作,但请确保**不要**运行
python setup.py install
。选择并安装您喜欢的 MPI 实现。请注意,启用 CUDA 感知 MPI 可能需要一些额外步骤。在本例中,我们将选择没有 GPU 支持的 Open-MPI:
conda install -c conda-forge openmpi
现在进入您克隆的 PyTorch 仓库并执行
python setup.py install
。
为了测试我们新安装的后端,需要进行一些修改。
将
if __name__ == '__main__':
下的内容替换为init_process(0, 0, run, backend='mpi')
。运行
mpirun -n 4 python myscript.py
。
进行这些更改的原因是 MPI 需要在生成进程之前创建自己的环境。MPI 还会生成自己的进程并执行在`初始化方法 <#initialization-methods>`__中描述的握手操作,从而使得 init_process_group
的 rank
和 size
参数变得多余。这实际上相当强大,因为您可以向 mpirun
传递额外的参数,以针对每个进程定制计算资源(如每个进程的核心数量、手动将机器分配给特定的 rank,以及`其他 <https://www.open-mpi.org/faq/?category=running#mpirun-hostfile>`__内容)。通过这样做,您应该能够获得与其他通信后端相同的熟悉输出。
NCCL 后端
NCCL 后端 提供了一种针对 CUDA 张量优化的集体操作实现。如果您的集体操作只使用 CUDA 张量,请考虑使用此后端以获得一流的性能。NCCL 后端包含在支持 CUDA 的预构建二进制文件中。
初始化方法¶
在本教程的最后,让我们检查我们调用的初始函数:dist.init_process_group(backend, init_method)
。具体来说,我们将讨论各种负责每个进程之间初步协调的初始化方法。这些方法使您能够定义如何完成这种协调。
初始化方法的选择取决于您的硬件设置,一种方法可能比其他方法更适合。除了以下部分外,请参阅`官方文档 <https://pytorch.org/docs/stable/distributed.html#initialization>`__以获取更多信息。
环境变量
在本教程中,我们一直使用环境变量初始化方法。通过在所有机器上设置以下四个环境变量,所有进程将能够正确连接到主节点,获取其他进程的信息,并最终与它们握手。
MASTER_PORT
:将托管 rank 为 0 的进程的机器上的一个空闲端口。MASTER_ADDR
:将托管 rank 为 0 的进程的机器的 IP 地址。WORLD_SIZE
:进程的总数,以便主节点知道需要等待多少个工作进程。RANK
:每个进程的 rank,以便它们知道是主进程还是工作进程。
共享文件系统
共享文件系统要求所有进程都能够访问一个共享文件系统,并通过共享文件对它们进行协调。这意味着每个进程都会打开文件,写入它的信息,并等待所有进程完成。之后,所有必要的信息将能为所有进程所用。为避免竞争条件,文件系统必须支持通过 fcntl 进行锁定。
dist.init_process_group(
init_method='file:///mnt/nfs/sharedfile',
rank=args.rank,
world_size=4)
TCP
通过 TCP 初始化可以通过提供 rank 为 0 的进程的 IP 地址和一个可达的端口号来实现。在这里,所有工作进程都能够连接到 rank 为 0 的进程并交换关于如何相互连接的信息。
dist.init_process_group(
init_method='tcp://10.1.1.20:23456',
rank=args.rank,
world_size=4)
致谢
我要感谢 PyTorch 开发者在实现、文档和测试上出色的工作。当代码不清楚时,我总是可以依赖于`文档 <https://pytorch.org/docs/stable/distributed.html>`__或`测试 <https://github.com/pytorch/pytorch/tree/master/test/distributed>`__找到答案。特别感谢 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein 对早期草稿提供的洞察性评论和回答问题。