Shortcuts

分布式 RPC 框架入门

Created On: Jan 01, 2020 | Last Updated: Jan 21, 2025 | Last Verified: Nov 05, 2024

作者: Shen Li

备注

编辑github 中查看和编辑此教程。

前提条件:

本教程使用两个简单示例演示如何利用具有实验性特性的 torch.distributed.rpc 包(在 PyTorch v1.4 首次引入)创建分布式训练。两个示例的源码可在 PyTorch examples 中找到。

前面的教程 分布式数据并行入门使用 PyTorch 编写分布式应用程序 描述了 DistributedDataParallel,它支持一种特定的训练范式,将模型复制到多个进程中,每个进程处理输入数据的一部分。有时可能会遇到需要不同训练范式的场景,例如:

  1. 在强化学习中,从环境中获取训练数据可能相对昂贵,而模型本身可能相对较小。在这种情况下,可能需要生成多个观察者并行运行,并共享单个代理。在这个情况下,代理负责本地训练,但应用仍然需要库来在观察者和训练器之间发送和接收数据。

  2. 你的模型可能太大,无法容纳在单台机器的 GPU 上,因此需要一个库来帮助将模型拆分到多台机器上。或者你可能在实现一种 参数服务器 训练框架,其中模型参数和训练器位于不同的机器上。

torch.distributed.rpc 包可以帮助解决以上场景。在场景 1 中,RPCRRef 允许从一个工作器向另一个工作器发送数据,并轻松引用远程数据对象。在场景 2 中,分布式自动梯度分布式优化器 使得执行后向传播和优化器步骤时与本地训练类似。在接下来的两个部分中,我们将使用一个强化学习示例和一个语言模型示例演示 torch.distributed.rpc 的 API。请注意,本教程并不是为了构建最准确或最高效的模型来解决问题,其主要目的是展示如何使用 torch.distributed.rpc 包来构建分布式训练应用程序。

使用 RPC 和 RRef 进行分布式强化学习

本节描述了使用 RPC 构建一个示例性分布式强化学习模型以解决来自 OpenAI Gym 的 CartPole-v1。策略代码主要借鉴了现有的单线程 示例,如下所示。我们将略过 Policy 设计的细节,而专注于 RPC 的使用。

import torch.nn as nn
import torch.nn.functional as F

class Policy(nn.Module):

    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1)

我们准备好展示观察者。在此示例中,每个观察者创建自己的环境,并等待代理的指令以运行一个回合。在每个回合中,一个观察者最多循环 n_steps 迭代,在每次迭代中,它使用 RPC 将其环境状态传递给代理并获得一个动作作为响应。然后它对其环境应用该动作,并从环境中获得奖励和下一个状态。此后,观察者使用另一个 RPC 向代理报告奖励。同样,请注意,这显然不是最高效的观察者实现。例如,一个简单的优化可以是将当前状态和最后奖励打包到一个 RPC 中以减少通信开销。然而,这里的目标是展示 RPC API 而不是构建 CartPole 的最佳求解器。因此,让我们在此示例中保持逻辑简单,并将这两个步骤显式分开。

import argparse
import gym
import torch.distributed.rpc as rpc

parser = argparse.ArgumentParser(
    description="RPC Reinforcement Learning Example",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument('--world_size', default=2, type=int, metavar='W',
                    help='number of workers')
parser.add_argument('--log_interval', type=int, default=10, metavar='N',
                    help='interval between training status logs')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
                    help='how much to value future rewards')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed  for reproducibility')
args = parser.parse_args()

class Observer:

    def __init__(self):
        self.id = rpc.get_worker_info().id
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)

    def run_episode(self, agent_rref):
        state, ep_reward = self.env.reset(), 0
        for _ in range(10000):
            # send the state to the agent to get an action
            action = agent_rref.rpc_sync().select_action(self.id, state)

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)

            # report the reward to the agent for training purpose
            agent_rref.rpc_sync().report_reward(self.id, reward)

            # finishes after the number of self.env._max_episode_steps
            if done:
                break

代理的代码稍微复杂一些,我们将其分成多个部分。在此示例中,代理既充当训练器又充当主节点,因此它向多个分布式观察者发送运行回合的指令,还会在本地记录所有动作和奖励,这些将在每个回合后训练阶段使用。下面的代码展示了 Agent 的构造函数,其中大多数行在初始化各种组件。最后的循环在其他工作器上远程初始化观察者,并在本地保留这些观察者的 RRef。代理稍后将使用这些观察者的 RRefs 发送指令。应用程序无需担心 RRefs 的生命周期。每个 RRef 的所有者维护一个引用计数映射以跟踪其生命周期,并保证只要有任何活动用户使用该 RRef,远程数据对象不会被删除。请参考 RRef 设计文档 了解详情。

import gym
import numpy as np

import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical

class Agent:
    def __init__(self, world_size):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.saved_log_probs = {}
        self.policy = Policy()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.eps = np.finfo(np.float32).eps.item()
        self.running_reward = 0
        self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(remote(ob_info, Observer))
            self.rewards[ob_info.id] = []
            self.saved_log_probs[ob_info.id] = []

接下来,代理向观察者公开两个 API,用于选择动作和报告奖励。那些函数仅在代理上本地运行,但将通过 RPC 被观察者触发。

class Agent:
    ...
    def select_action(self, ob_id, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

    def report_reward(self, ob_id, reward):
        self.rewards[ob_id].append(reward)

让我们在代理(agent)上添加一个``run_episode``函数,该函数告诉所有观察者执行一个回合。在此函数中,首先创建一个列表,用于收集异步RPC的futures,然后遍历所有观察者的``RRefs``,进行异步RPC调用。在这些RPC中,代理还将自己的``RRef``传递给观察者,以便观察者也可以调用代理上的函数。如上所示,每个观察者将向代理发起RPC请求,这些是嵌套的RPC调用。在每个回合之后,``saved_log_probs``和``rewards``将包含记录的动作概率和奖励。

class Agent:
    ...
    def run_episode(self):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(
                rpc_async(
                    ob_rref.owner(),
                    ob_rref.rpc_sync().run_episode,
                    args=(self.agent_rref,)
                )
            )

        # wait until all obervers have finished this episode
        for fut in futs:
            fut.wait()

最后,在一个回合之后,代理需要训练模型,这在下面的``finish_episode``函数中实现。该函数中没有RPC,且大部分内容借鉴自单线程的`示例 <https://github.com/pytorch/examples/blob/master/reinforcement_learning>`__。因此,我们略过其具体内容的描述。

class Agent:
    ...
    def finish_episode(self):
      # joins probs and rewards from different observers into lists
      R, probs, rewards = 0, [], []
      for ob_id in self.rewards:
          probs.extend(self.saved_log_probs[ob_id])
          rewards.extend(self.rewards[ob_id])

      # use the minimum observer reward to calculate the running reward
      min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
      self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward

      # clear saved probs and rewards
      for ob_id in self.rewards:
          self.rewards[ob_id] = []
          self.saved_log_probs[ob_id] = []

      policy_loss, returns = [], []
      for r in rewards[::-1]:
          R = r + args.gamma * R
          returns.insert(0, R)
      returns = torch.tensor(returns)
      returns = (returns - returns.mean()) / (returns.std() + self.eps)
      for log_prob, R in zip(probs, returns):
          policy_loss.append(-log_prob * R)
      self.optimizer.zero_grad()
      policy_loss = torch.cat(policy_loss).sum()
      policy_loss.backward()
      self.optimizer.step()
      return min_reward

有了``Policy``、Observer``和``Agent``类,我们便可以启动多个进程进行分布式训练。在此示例中,所有进程运行相同的``run_worker``函数,并通过排名区分其角色。排名为0的进程总是作为代理,其余排名的进程为观察者。代理作为主控,通过反复调用``run_episode``和``finish_episode,直到运行奖励超过环境指定的奖励阈值。所有观察者被动等待来自代理的命令。代码被`rpc.init_rpc <https://pytorch.org/docs/stable/rpc.html#torch.distributed.rpc.init_rpc>`__和`rpc.shutdown <https://pytorch.org/docs/stable/rpc.html#torch.distributed.rpc.shutdown>`__包装,这分别用于初始化和终止RPC实例。更多详情请参见`API页 <https://pytorch.org/docs/stable/rpc.html>`__。

import os
from itertools import count

import torch.multiprocessing as mp

AGENT_NAME = "agent"
OBSERVER_NAME="obs{}"

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size)
        print(f"This will run until reward threshold of {agent.reward_threshold}"
                " is reached. Ctrl+C to exit.")
        for i_episode in count(1):
            agent.run_episode()
            last_reward = agent.finish_episode()

            if i_episode % args.log_interval == 0:
                print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: "
                    f"{agent.running_reward:.2f}")
            if agent.running_reward > agent.reward_threshold:
                print(f"Solved! Running reward is now {agent.running_reward}!")
                break
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from the agent

    # block until all rpcs finish, and shutdown the RPC instance
    rpc.shutdown()


mp.spawn(
    run_worker,
    args=(args.world_size, ),
    nprocs=args.world_size,
    join=True
)

以下是使用`world_size=2`进行训练时的一些示例输出。

This will run until reward threshold of 475.0 is reached. Ctrl+C to exit.
Episode 10      Last reward: 26.00      Average reward: 10.01
Episode 20      Last reward: 16.00      Average reward: 11.27
Episode 30      Last reward: 49.00      Average reward: 18.62
Episode 40      Last reward: 45.00      Average reward: 26.09
Episode 50      Last reward: 44.00      Average reward: 30.03
Episode 60      Last reward: 111.00     Average reward: 42.23
Episode 70      Last reward: 131.00     Average reward: 70.11
Episode 80      Last reward: 87.00      Average reward: 76.51
Episode 90      Last reward: 86.00      Average reward: 95.93
Episode 100     Last reward: 13.00      Average reward: 123.93
Episode 110     Last reward: 33.00      Average reward: 91.39
Episode 120     Last reward: 73.00      Average reward: 76.38
Episode 130     Last reward: 137.00     Average reward: 88.08
Episode 140     Last reward: 89.00      Average reward: 104.96
Episode 150     Last reward: 97.00      Average reward: 98.74
Episode 160     Last reward: 150.00     Average reward: 100.87
Episode 170     Last reward: 126.00     Average reward: 104.38
Episode 180     Last reward: 500.00     Average reward: 213.74
Episode 190     Last reward: 322.00     Average reward: 300.22
Episode 200     Last reward: 165.00     Average reward: 272.71
Episode 210     Last reward: 168.00     Average reward: 233.11
Episode 220     Last reward: 184.00     Average reward: 195.02
Episode 230     Last reward: 284.00     Average reward: 208.32
Episode 240     Last reward: 395.00     Average reward: 247.37
Episode 250     Last reward: 500.00     Average reward: 335.42
Episode 260     Last reward: 500.00     Average reward: 386.30
Episode 270     Last reward: 500.00     Average reward: 405.29
Episode 280     Last reward: 500.00     Average reward: 443.29
Episode 290     Last reward: 500.00     Average reward: 464.65
Solved! Running reward is now 475.3163778435275!

在这个示例中,我们展示了如何使用RPC作为通信媒介来跨工作节点传递数据,以及如何使用RRef来引用远程对象。虽然你可以直接基于``ProcessGroup``的``send``和``recv``API构建整个结构,或者使用其他的通信/RPC库,但通过使用`torch.distributed.rpc`,你可以获得原生支持,并在底层实现持续优化的性能。

接下来,我们将展示如何将RPC与RRef结合分布式自动微分(distributed autograd)和分布式优化器(distributed optimizer),以实现分布式模型并行训练。

使用分布式自动微分和分布式优化器的分布式RNN

在本节中,我们使用一个RNN模型来展示如何利用RPC API构建分布式模型并行训练。示例中的RNN模型非常小,可以轻松放入单个GPU,但我们仍然将其层分布到两个不同的工作节点上以演示该方法。开发者可以采用类似技术,将更大的模型分布到多个设备和机器上。

RNN模型的设计借鉴自PyTorch `示例 <https://github.com/pytorch/examples/tree/master/word_language_model>`__库中的词语言模型,它包含三个主要组件:一个嵌入表、一个``LSTM``层和一个解码器。以下代码将嵌入表和解码器封装为子模块,以便它们的构造函数可以传递给RPC API。在``EmbeddingTable``子模块中,我们有意将``Embedding``层放置在GPU上,以涵盖该用例。在1.4版本中,RPC总是会在目标工作节点上创建CPU张量参数或返回值。如果函数接收GPU张量参数,你需要显式地将其移动到适当的设备上。

class EmbeddingTable(nn.Module):
    r"""
    Encoding layers of the RNNModel
    """
    def __init__(self, ntoken, ninp, dropout):
        super(EmbeddingTable, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp).cuda()
        self.encoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, input):
        return self.drop(self.encoder(input.cuda()).cpu()


class Decoder(nn.Module):
    def __init__(self, ntoken, nhid, dropout):
        super(Decoder, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.decoder = nn.Linear(nhid, ntoken)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, output):
        return self.decoder(self.drop(output))

有了上述子模块,我们现在可以使用RPC将它们组合在一起,创建一个RNN模型。在以下代码中,``ps``表示参数服务器,它托管嵌入表和解码器的参数。构造函数使用`remote <https://pytorch.org/docs/stable/rpc.html#torch.distributed.rpc.remote>`__ API在参数服务器上创建一个``EmbeddingTable``对象和一个``Decoder``对象,并在本地创建``LSTM``子模块。在前向传播过程中,训练器使用``EmbeddingTable``的``RRef``查找到远程子模块,并通过RPC将输入数据传递给``EmbeddingTable``获取查找结果。然后将结果通过本地``LSTM``层,最后利用另一个RPC调用将输出传递给``Decoder``子模块。一般来说,为了实现分布式模型并行训练,开发者可以将模型分解为子模块,调用RPC在远程创建子模块实例,并在需要时通过``RRef``查找到它们。如以下代码所示,它看起来与单机的模型并行训练非常相似。主要区别在于将``Tensor.to(device)``替换为RPC函数。

class RNNModel(nn.Module):
    def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()

        # setup embedding table remotely
        self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
        # setup LSTM locally
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
        # setup decoder remotely
        self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))

    def forward(self, input, hidden):
        # pass input to the remote embedding table and fetch emb tensor back
        emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
        output, hidden = self.rnn(emb, hidden)
        # pass output to the rremote decoder and get the decoded output back
        decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
        return decoded, hidden

在介绍分布式优化器之前,添加一个辅助函数来生成模型参数的``RRefs``列表,这些列表将由分布式优化器使用。在本地训练中,应用程序可以调用``Module.parameters()``以获取所有参数张量的引用,并将其传递给本地优化器以进行后续更新。然而,在分布式训练场景中,由于一些参数位于远程机器上,无法使用相同的API。因此,分布式优化器接受一个``RRefs``列表,每个模型参数(无论本地或远程)对应一个``RRef``。辅助函数非常简单,只需调用``Module.parameters()``并在每个参数上创建一个本地``RRef``即可。

def _parameter_rrefs(module):
    param_rrefs = []
    for param in module.parameters():
        param_rrefs.append(RRef(param))
    return param_rrefs

然后,由于``RNNModel``包含三个子模块,我们需要三次调用``_parameter_rrefs``,并将它们封装到另一个辅助函数中。

class RNNModel(nn.Module):
    ...
    def parameter_rrefs(self):
        remote_params = []
        # get RRefs of embedding table
        remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
        # create RRefs for local parameters
        remote_params.extend(_parameter_rrefs(self.rnn))
        # get RRefs of decoder
        remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
        return remote_params

现在,我们准备好实现训练循环。在初始化模型参数后,创建``RNNModel``和``DistributedOptimizer``。分布式优化器将接受一个参数``RRefs``列表,找到所有不同的所有者工作节点,并在每个所有者工作节点上使用给定的参数(如``lr=0.05``)创建给定的本地优化器(例如此处的``SGD``,也可以使用其他本地优化器)。

在训练循环中,首先创建一个分布式自动微分上下文,它将帮助分布式自动微分引擎查找梯度以及涉及的RPC发送/接收函数。分布式自动微分引擎的设计详见其`设计说明 <https://pytorch.org/docs/stable/rpc/distributed_autograd.html>`__。接着,启动前向传播过程,就像使用本地模型一样,并运行分布式反向传播。对于分布式反向传播,只需指定一个根节点列表,在这种情况下即损失``Tensor``。分布式自动微分引擎会自动遍历分布式计算图并正确写入梯度。接下来运行分布式优化器的``step``函数,该函数将联系所有相关的本地优化器以更新模型参数。与本地训练相比,区别是你不需要调用``zero_grad()``,因为每个自动微分上下文都有专用空间存储梯度,并且由于每次迭代都会创建一个新的上下文,不同迭代的梯度不会累积到相同的``Tensors``集合中。

def run_trainer():
    batch = 5
    ntoken = 10
    ninp = 2

    nhid = 3
    nindices = 3
    nlayers = 4
    hidden = (
        torch.randn(nlayers, nindices, nhid),
        torch.randn(nlayers, nindices, nhid)
    )

    model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)

    # setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

    def get_next_batch():
        for _ in range(5):
            data = torch.LongTensor(batch, nindices) % ntoken
            target = torch.LongTensor(batch, ntoken) % nindices
            yield data, target

    # train for 10 iterations
    for epoch in range(10):
        for data, target in get_next_batch():
            # create distributed autograd context
            with dist_autograd.context() as context_id:
                hidden[0].detach_()
                hidden[1].detach_()
                output, hidden = model(data, hidden)
                loss = criterion(output, target)
                # run distributed backward pass
                dist_autograd.backward(context_id, [loss])
                # run distributed optimizer
                opt.step(context_id)
                # not necessary to zero grads since they are
                # accumulated into the distributed autograd context
                # which is reset every iteration.
        print("Training epoch {}".format(epoch))

最后,添加一些粘合代码以启动参数服务器和训练器进程。

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        rpc.init_rpc("trainer", rank=rank, world_size=world_size)
        _run_trainer()
    else:
        rpc.init_rpc("ps", rank=rank, world_size=world_size)
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)

文档

访问 PyTorch 的详细开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源