Shortcuts

简介 || DDP 是什么? || 单节点多 GPU 训练 || 容错能力 || 多节点训练 || minGPT 训练

使用 DDP 进行多 GPU 训练

Created On: Sep 27, 2022 | Last Updated: Nov 03, 2024 | Last Verified: Not Verified

作者: Suraj Subramanian

What you will learn
  • 如何将单 GPU 训练脚本迁移到通过 DDP 实现的多 GPU 训练

  • 设置分布式进程组

  • 在分布式环境中保存和加载模型

查看教程中使用的代码,托管于 GitHub

Prerequisites
  • 了解 DDP 如何工作 的高层概述

  • 具有多GPU的机器(本教程使用AWS p3.8xlarge实例)

  • 安装CUDA版本的PyTorch 安装

通过以下视频或在 YouTube 上观看教程。

上一节教程 中,我们了解了 DDP 工作原理的高层概述;现在我们来看如何在代码中使用 DDP。在本教程中,我们从一个单 GPU 训练脚本开始,迁移到在单节点上的 4 块 GPU 上运行。在此过程中,我们将讨论分布式训练中的重要概念并在代码中实现它们。

备注

如果您的模型包含任何 BatchNorm 层,则需要将其转换为 SyncBatchNorm,以同步 BatchNorm 层跨副本的运行状态。

使用辅助函数 torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) 将模型中的所有 BatchNorm 层转换为 SyncBatchNorm

single_gpu.pymultigpu.py 的差异

以下是为启用 DDP 通常需要对单 GPU 训练脚本进行的更改。

导入

  • torch.multiprocessing 是 PyTorch 针对 Python 原生 multiprocessing 的封装。

  • 分布式进程组包含可以相互通信和同步的所有进程。

import torch
import torch.nn.functional as F
from utils import MyTrainDataset

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os

构建进程组

  • 首先,在初始化进程组之前,调用 set_device,为每个进程设置默认 GPU。这很重要,以防止在 GPU:0 上出现死锁或过度内存利用。

  • 进程组可以通过 TCP(默认)或来自共享文件系统进行初始化。阅读更多相关内容请访问 进程组初始化

  • init_process_group 用于初始化分布式进程组。

  • 阅读更多关于 选择 DDP 后端 的信息。

def ddp_setup(rank: int, world_size: int):
   """
   Args:
       rank: Unique identifier of each process
      world_size: Total number of processes
   """
   os.environ["MASTER_ADDR"] = "localhost"
   os.environ["MASTER_PORT"] = "12355"
   torch.cuda.set_device(rank)
   init_process_group(backend="nccl", rank=rank, world_size=world_size)

构建 DDP 模型

self.model = DDP(model, device_ids=[gpu_id])

分布输入数据

  • DistributedSampler 将输入数据划分到所有分布式进程中。

  • DataLoader 将数据集和

    采样器组合,并提供一个数据集的迭代器。

  • 每个进程将接收一个包含 32 个样本的输入批次;有效的批次大小为 32 * nprocs,使用 4 块 GPU 时为 128。

train_data = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=False,  # We don't shuffle
    sampler=DistributedSampler(train_dataset), # Use the Distributed Sampler here.
)
  • 在每个 epoch 开始时调用 DistributedSamplerset_epoch() 方法是必需的,以确保跨多个 epoch 的随机打乱正确工作,否则,每个 epoch 的数据顺序将相同。

def _run_epoch(self, epoch):
    b_sz = len(next(iter(self.train_data))[0])
    self.train_data.sampler.set_epoch(epoch)   # call this additional line at every epoch
    for source, targets in self.train_data:
      ...
      self._run_batch(source, targets)

保存模型检查点

  • 我们只需要从一个进程保存模型检查点。否则,每个进程都会保存它自己的一份相同的模型。阅读更多关于以 DDP 保存和加载模型的信息,访问 此处

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
  self._save_checkpoint(epoch)

警告

Collective 函数调用 是运行于所有分布式进程上的函数,用于汇总某些状态或值到特定进程。Collective 函数调用需要所有 rank 运行此代码。在此示例中,_save_checkpoint 不应该包含 Collective 函数调用,因为它仅在 rank:0 进程上运行。如果需要使用 Collective 调用,应在 if self.gpu_id == 0 检查之前完成。

运行分布式训练任务

  • 包括新的参数 rank``(替代 ``device)和 world_size

  • rank 是在调用 mp.spawn 时由 DDP 自动分配。

  • world_size 是训练任务的进程总数。对于 GPU 训练而言,这对应于使用的 GPU 数量,每个进程在专属的 GPU 上运行。

- def main(device, total_epochs, save_every):
+ def main(rank, world_size, total_epochs, save_every):
+  ddp_setup(rank, world_size)
   dataset, model, optimizer = load_train_objs()
   train_data = prepare_dataloader(dataset, batch_size=32)
-  trainer = Trainer(model, train_data, optimizer, device, save_every)
+  trainer = Trainer(model, train_data, optimizer, rank, save_every)
   trainer.train(total_epochs)
+  destroy_process_group()

if __name__ == "__main__":
   import sys
   total_epochs = int(sys.argv[1])
   save_every = int(sys.argv[2])
-  device = 0      # shorthand for cuda:0
-  main(device, total_epochs, save_every)
+  world_size = torch.cuda.device_count()
+  mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)

代码如下所示:

进一步阅读

文档

访问 PyTorch 的详细开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源