• Tutorials >
  • 结合分布式数据并行与分布式 RPC 框架
Shortcuts

结合分布式数据并行与分布式 RPC 框架

Created On: Jul 28, 2020 | Last Updated: Jun 06, 2023 | Last Verified: Not Verified

作者Pritam DamaniaYi Wang

备注

编辑 View and edit this tutorial in github.

本教程使用一个简单的示例演示如何结合 DistributedDataParallel (DDP) 与 分布式 RPC 框架,将分布式数据并行与分布式模型并行结合,用于训练一个简单模型。示例的源代码可以在 这里 找到。

之前的教程,入门分布式数据并行入门分布式RPC框架,分别介绍了如何进行分布式数据并行和分布式模型并行训练。然而,在某些训练范式中,可能需要结合这两种技术。例如:

  1. 如果我们有一个包含稀疏部分(大型嵌入表)和密集部分(全连接层)的模型,我们可能希望将嵌入表放在参数服务器上,并使用 DistributedDataParallel 在多个训练器之间复制全连接层。分布式RPC框架 可用于对参数服务器上的嵌入表进行查找。

  2. 启用如 PipeDream 论文中所述的混合并行机制。我们可以使用 分布式RPC框架 在多个工作者之间创建模型阶段的流水线,并在需要时使用 DistributedDataParallel 复制每一个阶段。


在本教程中,我们将深入探讨上述的案例1。我们在设置中总共有4个工作者,如下所示:

  1. 1个Master,负责在参数服务器上创建嵌入表(nn.EmbeddingBag)。Master还负责驱动两个训练器上的训练循环。

  2. 1个参数服务器,主要负责将嵌入表保存在内存中,并回应Master和训练器的RPC请求。

  3. 2个训练器,存储一个全连接层(nn.Linear),使用 DistributedDataParallel 在训练器之间进行复制。这些训练器还负责执行前向传播、反向传播和优化器步骤。


整个训练过程按以下方式运行:

  1. Master在参数服务器上创建了一个包含嵌入表的 RemoteModule

  2. 然后,Master启动了训练器上的训练循环,并将远程模块传递给训练器。

  3. 训练器创建了一个 HybridModel,首先使用Master提供的远程模块执行嵌入查找,然后执行包含在DDP中的全连接层。

  4. 训练器执行模型的前向传播,并利用损失执行反向传播,使用 Distributed Autograd

  5. 在反向传播过程中,首先计算全连接层的梯度,并通过DDP的allreduce机制同步到所有训练器。

  6. 接下来,Distributed Autograd将梯度传播到参数服务器,在那里更新嵌入表的梯度。

  7. 最后,使用 分布式优化器 更新所有参数。

注意

如果你将DDP与RPC结合使用,反向传播始终应该使用 Distributed Autograd

现在,我们详细讨论每个部分。首先,在进行任何训练之前,我们需要设置所有工作者。我们创建了4个进程,其中rank 0和1是训练器,rank 2是Master,rank 3是参数服务器。

我们使用TCP初始化方法在所有4个工作者上初始化RPC框架。一旦RPC初始化完成,Master创建了一个包含在参数服务器上的 EmbeddingBag 层的远程模块 (RemoteModule)。然后,Master通过对每个训练器使用 rpc_async 调用``_run_trainer``方法,逐一启动训练循环。最后,Master等待所有训练完成后退出。

训练器首先使用 init_process_group 为DDP初始化一个 ProcessGroup,其world_size为2(即两个训练器)。接下来,他们使用TCP初始化方法初始化RPC框架。请注意RPC初始化和ProcessGroup初始化中的端口不同,这是为了避免两个框架初始化过程中出现端口冲突。一旦初始化完成,训练器仅需等待来自Master的``_run_trainer`` RPC请求。

参数服务器仅需初始化RPC框架,并等待来自训练器和Master的RPC请求。

def run_worker(rank, world_size):
    r"""
    A wrapper function that initializes RPC, calls the function, and shuts down
    RPC.
    """

    # We need to use different port numbers in TCP init_method for init_rpc and
    # init_process_group to avoid port conflicts.
    rpc_backend_options = TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = "tcp://localhost:29501"

    # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
    if rank == 2:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        remote_emb_module = RemoteModule(
            "ps",
            torch.nn.EmbeddingBag,
            args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
            kwargs={"mode": "sum"},
        )

        # Run the training loop on trainers.
        futs = []
        for trainer_rank in [0, 1]:
            trainer_name = "trainer{}".format(trainer_rank)
            fut = rpc.rpc_async(
                trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
            )
            futs.append(fut)

        # Wait for all training to finish.
        for fut in futs:
            fut.wait()
    elif rank <= 1:
        # Initialize process group for Distributed DataParallel on trainers.
        dist.init_process_group(
            backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
        )

        # Initialize RPC.
        trainer_name = "trainer{}".format(rank)
        rpc.init_rpc(
            trainer_name,
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        # Trainer just waits for RPCs from master.
    else:
        rpc.init_rpc(
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    # 2 trainers, 1 parameter server, 1 master.
    world_size = 4
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)

在讨论训练器的细节之前,我们先介绍训练器使用的 HybridModel。如下面所述,HybridModel 的初始化需要一个保存嵌入表(remote_emb_module)的远程模块,以及用于DDP的``设备``。模型的初始化将一个 nn.Linear 层封装在DDP中,以复制和同步该层到所有训练器。

模型的forward方法相对简单。它使用远程模块的``forward``在参数服务器上执行嵌入查找,并将其输出传递给全连接层。

class HybridModel(torch.nn.Module):
    r"""
    The model consists of a sparse part and a dense part.
    1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
    2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
    This remote model can get a Remote Reference to the embedding table on the parameter server.
    """

    def __init__(self, remote_emb_module, device):
        super(HybridModel, self).__init__()
        self.remote_emb_module = remote_emb_module
        self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
        self.device = device

    def forward(self, indices, offsets):
        emb_lookup = self.remote_emb_module.forward(indices, offsets)
        return self.fc(emb_lookup.cuda(self.device))

接下来,我们来看训练器的设置。训练器首先利用远程模块创建上述的 HybridModel,该模块保存参数服务器上的嵌入表,以及自身的rank。

现在,我们需要通过 DistributedOptimizer 检索需要优化的所有参数的RRef列表。要从参数服务器检索嵌入表的参数,我们可以调用远程模块的`remote_parameters <https://pytorch.org/docs/master/rpc.html#torch.distributed.nn.api.remote_module.RemoteModule.remote_parameters>`__,基本上遍历嵌入表的所有参数并返回一个RRef列表。训练器通过RPC调用该方法,从参数服务器接收需要的参数的RRef列表。由于分布式优化器总是需要优化参数的RRef列表,我们需要为局部的全连接层参数创建RRef。这是通过遍历``model.fc.parameters()``、为每个参数创建RRef并将其附加到``remote_parameters()``返回的列表中完成的。请注意,我们不能使用``model.parameters()``,因为它会递归调用``model.remote_emb_module.parameters()``,这不被``RemoteModule``支持。

最后,我们使用所有的RRef创建分布式优化器,并定义了一个交叉熵损失函数。

def _run_trainer(remote_emb_module, rank):
    r"""
    Each trainer runs a forward pass which involves an embedding lookup on the
    parameter server and running nn.Linear locally. During the backward pass,
    DDP is responsible for aggregating the gradients for the dense part
    (nn.Linear) and distributed autograd ensures gradients updates are
    propagated to the parameter server.
    """

    # Setup the model.
    model = HybridModel(remote_emb_module, rank)

    # Retrieve all model parameters as rrefs for DistributedOptimizer.

    # Retrieve parameters for embedding table.
    model_parameter_rrefs = model.remote_emb_module.remote_parameters()

    # model.fc.parameters() only includes local parameters.
    # NOTE: Cannot call model.parameters() here,
    # because this will call remote_emb_module.parameters(),
    # which supports remote_parameters() but not parameters().
    for param in model.fc.parameters():
        model_parameter_rrefs.append(RRef(param))

    # Setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model_parameter_rrefs,
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

现在我们介绍在每个训练器上运行的主要训练循环。get_next_batch 只是一个生成随机输入和目标用于训练的辅助函数。我们运行多次epoch的训练循环,对于每一个批次:

  1. 设置分布式自动微分的 Distributed Autograd Context

  2. 运行模型的forward传播并获取其输出。

  3. 使用损失函数根据输出和目标计算损失。

  4. 使用分布式自动微分通过损失执行分布式反向传播。

  5. 最后,运行分布式优化器步骤以优化所有参数。

    def get_next_batch(rank):
        for _ in range(10):
            num_indices = random.randint(20, 50)
            indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)

            # Generate offsets.
            offsets = []
            start = 0
            batch_size = 0
            while start < num_indices:
                offsets.append(start)
                start += random.randint(1, 10)
                batch_size += 1

            offsets_tensor = torch.LongTensor(offsets)
            target = torch.LongTensor(batch_size).random_(8).cuda(rank)
            yield indices, offsets_tensor, target

    # Train for 100 epochs
    for epoch in range(100):
        # create distributed autograd context
        for indices, offsets, target in get_next_batch(rank):
            with dist_autograd.context() as context_id:
                output = model(indices, offsets)
                loss = criterion(output, target)

                # Run distributed backward pass
                dist_autograd.backward(context_id, [loss])

                # Tun distributed optimizer
                opt.step(context_id)

                # Not necessary to zero grads as each iteration creates a different
                # distributed autograd context which hosts different grads
        print("Training done for epoch {}".format(epoch))

整个示例的源代码可以在 此处 找到。

文档

访问 PyTorch 的详细开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源