Shortcuts

分布式数据并行的入门

Created On: Apr 23, 2019 | Last Updated: Jun 06, 2025 | Last Verified: Nov 05, 2024

作者: Shen Li

编辑: Joe Zhu, Chirag Pandya

备注

编辑github 中查看和编辑此教程。

前提条件:

分布式数据并行 (DDP) 是 PyTorch 中一个强大的模块,它允许你在多台机器上并行化模型,使其适合大规模深度学习应用。要使用 DDP,你需要为每个进程生成多个进程,并为每个进程创建一个单独的 DDP 实例。

但是它是如何工作的?DDP 使用 torch.distributed 包中的集合通信来同步所有进程之间的梯度和缓冲区。这意味着每个进程都拥有模型的一个副本,但它们会协同工作来训练模型,就像在单台机器上一样。

为了实现这一点,DDP 为模型中的每个参数注册了一个自动梯度钩子。当反向传播运行时,这个钩子会触发并启动所有进程的梯度同步。这确保了每个进程拥有相同的梯度,然后用于更新模型。

要详细了解 DDP 的工作原理以及如何有效地使用它,请务必查看 DDP 设计笔记。使用 DDP,你可以比以往更快、更高效地训练模型!

使用 DDP 的推荐方法是为每个模型副本生成一个进程。模型副本可以跨多个设备扩展。DDP 进程可以放在同一台机器上或跨机器分布。请注意,GPU 设备不能在 DDP 进程之间共享(即一个 GPU 对应一个 DDP 进程)。

在本教程中,我们将从一个基本的 DDP 用例开始,然后演示更高级的用例,包括模型检查点保存和将 DDP 与模型并行结合使用。

备注

本教程中的代码运行在一个 8 GPU 的服务器上,但可以轻松推广到其他环境。

DataParallelDistributedDataParallel 的比较

在深入探讨之前,让我们澄清一下,尽管增加了复杂性,为什么你会考虑使用 DistributedDataParallel 而不是 DataParallel

  • 首先,DataParallel 是单进程、多线程的,但它仅适用于单台机器。相比之下,DistributedDataParallel 是多进程的,并支持单机和多机训练。由于线程之间的 GIL 竞争、每次迭代都会复制模型,以及输入分散和输出聚合引入的额外开销,即使在单机上,DataParallel 通常也比 DistributedDataParallel 慢。

  • 回顾 前面的教程,如果你的模型太大而无法放在单个 GPU 上,你必须使用 模型并行 将其分割到多个 GPU 上。DistributedDataParallel 可与 模型并行 一起工作,而目前 DataParallel 不能。当 DDP 与模型并行结合时,每个 DDP 进程将使用模型并行,所有进程将共同使用数据并行。

基本用例

要创建一个 DDP 模块,你必须首先正确地设置进程组。详细信息可以在 使用 PyTorch 编写分布式应用 中找到。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

现在,让我们创建一个玩具模块,用 DDP 包装它,并为其提供一些虚拟的输入数据。请注意,由于 DDP 在构造函数中将模型状态从 rank 0 进程广播到所有其他进程,你无需关心不同的 DDP 进程从不同的初始模型参数值开始。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running basic DDP example on rank {rank}.")


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

正如你所看到的,DDP 封装了底层的分布式通信细节,并提供了一个简单的 API,就像它是一个本地模型一样。梯度同步通讯发生在反向传播过程中,并与反向计算重叠。当 backward() 返回时,param.grad 已经包含了同步的梯度张量。对于基本用例,DDP 只需要多几行代码来设置进程组。在将 DDP 应用于更高级的用例时,需要注意一些注意事项。

处理速度不均衡

在 DDP 中,构造函数、前向传播和反向传播是分布式同步点。期望不同的进程启动相同数量的同步,并按照相同的顺序到达这些同步点,并且大致同时进入每个同步点。否则,快速的进程可能会提前到达并在等待慢进程时超时。因此,用户有责任平衡进程之间的工作负载分布。有时,由于诸如网络延迟、资源竞争或不可预知的工作负荷波动等原因,处理速度不均衡是不可避免的。为避免这种情况下的超时,请确保在调用 init_process_group 时传递一个足够大的 timeout 值。

保存和加载检查点

在训练过程中使用 torch.savetorch.load 来检查点模块并从检查点恢复是很常见的。详细信息请参见 保存和加载模型。使用 DDP 时,一种优化方法是仅在一个进程中保存模型,然后在所有进程中加载模型,从而减少写入开销。这可行是因为所有进程从相同的参数开始,并且在反向传播中梯度被同步,因此优化器应该继续设置相同的参数值。如果使用此优化(即在一个进程中保存但在所有进程中恢复),请确保在保存完成之前没有进程开始加载。此外,在加载模块时,你需要提供一个适当的 map_location 参数,以防止进程进入其他进程的设备。如果缺少 map_locationtorch.load 将首先将模块加载到 CPU,然后将每个参数复制到保存它的地方,这会导致同一台机器上的所有进程使用同一组设备。有关更高级的故障恢复和弹性支持,请参考 TorchElastic

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])


    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()
    print(f"Finished running DDP checkpoint example on rank {rank}.")

将 DDP 与模型并行结合

DDP 还可用于多 GPU 模型。当训练具有大量数据的超大模型时,DDP 包裹多 GPU 模型尤其有帮助。

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

当将一个多 GPU 模型传递给 DDP 时,device_idsoutput_device 必须未设置。输入和输出数据将由应用程序或模型的 forward() 方法正确地放置到设备上。

def demo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)

    # setup mp_model and devices for this process
    dev0 = rank * 2
    dev1 = rank * 2 + 1
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # outputs will be on dev1
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running DDP with model parallel example on rank {rank}.")


if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_basic, world_size)
    run_demo(demo_checkpoint, world_size)
    world_size = n_gpus//2
    run_demo(demo_model_parallel, world_size)

使用 torch.distributed.run/torchrun 初始化 DDP

我们可以利用 PyTorch Elastic 简化 DDP 代码并更容易初始化工作。让我们仍然使用 Toymodel 示例并创建一个名为 elastic_ddp.py 的文件。

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim

from torch.nn.parallel import DistributedDataParallel as DDP

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic():
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    dist.init_process_group("nccl")
    rank = dist.get_rank()
    print(f"Start running basic DDP example on rank {rank}.")
    # create model and move it to GPU with id rank
    device_id = rank % torch.cuda.device_count()
    model = ToyModel().to(device_id)
    ddp_model = DDP(model, device_ids=[device_id])
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_id)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    dist.destroy_process_group()
    print(f"Finished running basic DDP example on rank {rank}.")

if __name__ == "__main__":
    demo_basic()

然后可以在所有节点上运行一个 torch elastic/torchrun 命令来初始化上述的 DDP 工作:

torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py

在上述示例中,我们在两个主机上运行 DDP 脚本,并且在每个主机上运行 8 个进程。也就是说,我们在 16 个 GPU 上运行此任务。请注意,$MASTER_ADDR 必须在所有节点中相同。

在这里,torchrun 将启动 8 个进程,并在它启动的节点上的每个进程上调用 elastic_ddp.py。但用户还需要应用集群管理工具(如 slurm)来实际在 2 个节点上运行此命令。

例如,在启用了 SLURM 的集群上,我们可以编写一个脚本来运行上述命令并设置 MASTER_ADDR

export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)

然后我们只需使用 SLURM 命令运行此脚本:srun --nodes=2 ./torchrun_script.sh

这只是一个例子;你可以选择自己的集群调度工具来启动 torchrun 作业。

有关 Elastic run 的更多信息,请参见 快速入门文档

文档

访问 PyTorch 的详细开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源