使用异步执行实现批量 RPC 处理

原文:https://pytorch.org/tutorials/intermediate/rpc_async_execution.html

作者Shen Li

先决条件:

本教程演示了如何使用@rpc.functions.async_execution装饰器来构建批量 RPC 应用,该装饰器通过减少阻止的 RPC 线程数和合并被调用方上的 CUDA 操作来帮助加快训练速度。 这使用 TorchServer 的相同想法进行批量推断

注意

本教程需要 PyTorch v1.6.0 或更高版本。

基础知识

先前的教程显示了使用torch.distributed.rpc构建分布式训练应用的步骤,但并未详细说明在处理 RPC 请求时被调用方发生的情况。 从 PyTorch v1.5 开始,每个 RPC 请求都会在被调用方上阻塞一个线程,以在该请求中执行该函数,直到该函数返回为止。 这适用于许多用例,但有一个警告。 如果用户函数例如通过嵌套 RPC 调用在 IO 上阻塞,或者例如在等待其他 RPC 请求解除阻塞的信号时阻塞,则被调用方上的 RPC 线程将必须空闲,直到 IO 完成或发生信令事件为止。 结果,RPC 被调用者可能使用了不必要的更多线程。 造成此问题的原因是 RPC 将用户函数视为黑盒,并且几乎不了解该函数会发生什么。 为了允许用户函数产生和释放 RPC 线程,需要向 RPC 系统提供更多提示。

从 v1.6.0 开始,PyTorch 通过引入两个新概念来解决此问题:

  • torch.futures.Future 类型封装了异步执行,还支持安装回调函数。
  • 一个@rpc.functions.async_execution装饰器,允许应用告诉被调用方目标函数将返回将来的函数,并且在执行期间可以暂停并产生多次。

使用这两个工具,应用代码可以将用户函数分解为多个较小的函数,将它们作为Future对象上的回调链接在一起,然后返回包含最终结果的Future。 在被调用方,当获取Future对象时,它还将安装后续的 RPC 响应准备和通讯作为回调,这将在最终结果准备好时触发。 这样,被调用者不再需要阻塞一个线程并等待直到最终返回值准备就绪。 有关简单示例,请参考@rpc.functions.async_execution 的 API 文档。

除了减少被调用方上的空闲线程数之外,这些工具还有助于使批量 RPC 处理更容易,更快捷。 本教程的以下两节演示了如何使用@rpc.functions.async_execution装饰器来构建分布式批更新参数服务器和批量强化学习应用。

批量更新参数服务器

考虑具有一个参数服务器(PS)和多个训练器的同步参数服务器训练应用。 在此应用中,PS 保留参数并等待所有训练器报告坡度。 在每次迭代中,它都会等到收到所有训练者的梯度后,再一次更新所有参数。 下面的代码显示 PS 类的实现。 update_and_fetch_model方法是用@rpc.functions.async_execution装饰的,将由训练器调用。 每次调用都会返回一个Future对象,该对象将填充有更新的模型。 大多数训练器发起的调用仅将梯度累积到.grad字段,立即返回,并在 PS 上产生 RPC 线程。 最后到达的训练器将触发优化器步骤,并消耗所有先前报告的梯度。 然后,它使用更新的模型设置future_model,该模型又通过Future对象通知其他训练器的所有先前请求,并将更新后的模型发送给所有训练器。

  1. import threading
  2. import torchvision
  3. import torch
  4. import torch.distributed.rpc as rpc
  5. from torch import optim
  6. num_classes, batch_update_size = 30, 5
  7. class BatchUpdateParameterServer(object):
  8. def __init__(self, batch_update_size=batch_update_size):
  9. self.model = torchvision.models.resnet50(num_classes=num_classes)
  10. self.lock = threading.Lock()
  11. self.future_model = torch.futures.Future()
  12. self.batch_update_size = batch_update_size
  13. self.curr_update_size = 0
  14. self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
  15. for p in self.model.parameters():
  16. p.grad = torch.zeros_like(p)
  17. def get_model(self):
  18. return self.model
  19. @staticmethod
  20. @rpc.functions.async_execution
  21. def update_and_fetch_model(ps_rref, grads):
  22. # Using the RRef to retrieve the local PS instance
  23. self = ps_rref.local_value()
  24. with self.lock:
  25. self.curr_update_size += 1
  26. # accumulate gradients into .grad field
  27. for p, g in zip(self.model.parameters(), grads):
  28. p.grad += g
  29. # Save the current future_model and return it to make sure the
  30. # returned Future object holds the correct model even if another
  31. # thread modifies future_model before this thread returns.
  32. fut = self.future_model
  33. if self.curr_update_size >= self.batch_update_size:
  34. # update the model
  35. for p in self.model.parameters():
  36. p.grad /= self.batch_update_size
  37. self.curr_update_size = 0
  38. self.optimizer.step()
  39. self.optimizer.zero_grad()
  40. # by settiing the result on the Future object, all previous
  41. # requests expecting this updated model will be notified and
  42. # the their responses will be sent accordingly.
  43. fut.set_result(self.model)
  44. self.future_model = torch.futures.Future()
  45. return fut

对于训练器,它们都使用来自 PS 的相同参数集进行初始化。 在每次迭代中,每位训练器首先进行前进和后退操作,以局部生成梯度。 然后,每个训练器都使用 RPC 向 PS 报告其梯度,并通过同一 RPC 请求的返回值取回更新的参数。 在训练器的实现中,目标函数是否标记有@rpc.functions.async_execution都没有关系。 训练器只需使用rpc_sync调用update_and_fetch_model,这会阻塞训练器,直到返回更新的模型。

  1. batch_size, image_w, image_h = 20, 64, 64
  2. class Trainer(object):
  3. def __init__(self, ps_rref):
  4. self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
  5. self.one_hot_indices = torch.LongTensor(batch_size) \
  6. .random_(0, num_classes) \
  7. .view(batch_size, 1)
  8. def get_next_batch(self):
  9. for _ in range(6):
  10. inputs = torch.randn(batch_size, 3, image_w, image_h)
  11. labels = torch.zeros(batch_size, num_classes) \
  12. .scatter_(1, self.one_hot_indices, 1)
  13. yield inputs.cuda(), labels.cuda()
  14. def train(self):
  15. name = rpc.get_worker_info().name
  16. # get initial model parameters
  17. m = self.ps_rref.rpc_sync().get_model().cuda()
  18. # start training
  19. for inputs, labels in self.get_next_batch():
  20. self.loss_fn(m(inputs), labels).backward()
  21. m = rpc.rpc_sync(
  22. self.ps_rref.owner(),
  23. BatchUpdateParameterServer.update_and_fetch_model,
  24. args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
  25. ).cuda()

在本教程中,我们将跳过启动多个进程的代码,有关完整实现,请参考示例回购。 请注意,可以在没有@rpc.functions.async_execution装饰器的情况下实现批量。 但是,这将需要在 PS 上阻塞更多的 RPC 线程,或者使用另一轮 RPC 来获取更新的模型,后者将增加代码的复杂性和通信开销。

本节使用一个简单的参数服务器训练示例来说明如何使用@rpc.functions.async_execution装饰器实现批量 RPC 应用。 在下一节中,我们将使用批量重新实现上一分布式 RPC 框架入门指南中的强化学习示例,并演示其对训练速度的影响。

批量 CartPole 求解器

本节以 OpenAI Gym 中的 CartPole-v1 为例,说明批量 RPC 的性能影响。 请注意,我们的目标是演示@rpc.functions.async_execution的用法,而不是构建最佳的 CartPole 求解器或解决大多数不同的 RL 问题,我们使用非常简单的策略和奖励计算策略,并将重点放在多观察者单智能体的批量 RPC 实现。 我们使用与前面的教程类似的Policy模型,如下所示。 与上一教程相比,不同之处在于其构造器使用了一个附加的batch参数来控制F.softmaxdim参数,因为进行批量时,forward函数中的x参数包含来自多个观察者的状态,因此尺寸需要适当更改。 其他所有内容保持不变。

  1. import argparse
  2. import torch.nn as nn
  3. import torch.nn.functional as F
  4. parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
  5. parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
  6. help='discount factor (default: 1.0)')
  7. parser.add_argument('--seed', type=int, default=543, metavar='N',
  8. help='random seed (default: 543)')
  9. parser.add_argument('--num-episode', type=int, default=10, metavar='E',
  10. help='number of episodes (default: 10)')
  11. args = parser.parse_args()
  12. torch.manual_seed(args.seed)
  13. class Policy(nn.Module):
  14. def __init__(self, batch=True):
  15. super(Policy, self).__init__()
  16. self.affine1 = nn.Linear(4, 128)
  17. self.dropout = nn.Dropout(p=0.6)
  18. self.affine2 = nn.Linear(128, 2)
  19. self.dim = 2 if batch else 1
  20. def forward(self, x):
  21. x = self.affine1(x)
  22. x = self.dropout(x)
  23. x = F.relu(x)
  24. action_scores = self.affine2(x)
  25. return F.softmax(action_scores, dim=self.dim)

Observer的构造器也会相应地进行调整。 它还带有batch参数,该参数控制用于选择动作的Agent函数。 在批量模式下,它将调用Agent上的select_action_batch函数,该函数将很快出现,并且该函数将以@rpc.functions.async_execution装饰。

  1. import gym
  2. import torch.distributed.rpc as rpc
  3. class Observer:
  4. def __init__(self, batch=True):
  5. self.id = rpc.get_worker_info().id - 1
  6. self.env = gym.make('CartPole-v1')
  7. self.env.seed(args.seed)
  8. self.select_action = Agent.select_action_batch if batch else Agent.select_action

与之前的教程分布式 RPC 框架入门相比,观察者的行为略有不同。 它不会在环境停止时退出,而是始终在每个剧集中运行n_steps迭代。 当环境返回时,观察者只需重置环境并重新开始。 通过这种设计,智能体将从每个观察者那里收到固定数量的状态,因此可以将它们打包成固定大小的张量。 在每个步骤中,Observer使用 RPC 将其状态发送到Agent,并通过返回值获取操作。 在每个剧集的结尾,它将所有步骤的奖励返还给Agent。 注意,Agent将使用 RPC 调用此run_episode函数。 因此,此函数中的rpc_sync调用将是嵌套的 RPC 调用。 我们也可以将此函数标记为@rpc.functions.async_execution,以避免阻塞Observer上的一个线程。 但是,由于瓶颈是Agent而不是Observer,因此可以在Observer进程中阻塞一个线程。

  1. import torch
  2. class Observer:
  3. ...
  4. def run_episode(self, agent_rref, n_steps):
  5. state, ep_reward = self.env.reset(), NUM_STEPS
  6. rewards = torch.zeros(n_steps)
  7. start_step = 0
  8. for step in range(n_steps):
  9. state = torch.from_numpy(state).float().unsqueeze(0)
  10. # send the state to the agent to get an action
  11. action = rpc.rpc_sync(
  12. agent_rref.owner(),
  13. self.select_action,
  14. args=(agent_rref, self.id, state)
  15. )
  16. # apply the action to the environment, and get the reward
  17. state, reward, done, _ = self.env.step(action)
  18. rewards[step] = reward
  19. if done or step + 1 >= n_steps:
  20. curr_rewards = rewards[start_step:(step + 1)]
  21. R = 0
  22. for i in range(curr_rewards.numel() -1, -1, -1):
  23. R = curr_rewards[i] + args.gamma * R
  24. curr_rewards[i] = R
  25. state = self.env.reset()
  26. if start_step == 0:
  27. ep_reward = min(ep_reward, step - start_step + 1)
  28. start_step = step + 1
  29. return [rewards, ep_reward]

Agent的构造器还采用batch参数,该参数控制如何对动作概率进行批量。 在批量模式下,saved_log_probs包含一张张量列表,其中每个张量包含一个步骤中所有观察者的动作抢夺。 如果不进行批量,则saved_log_probs是字典,其中的键是观察者 ID,值是该观察者的动作概率列表。

  1. import threading
  2. from torch.distributed.rpc import RRef
  3. class Agent:
  4. def __init__(self, world_size, batch=True):
  5. self.ob_rrefs = []
  6. self.agent_rref = RRef(self)
  7. self.rewards = {}
  8. self.policy = Policy(batch).cuda()
  9. self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
  10. self.running_reward = 0
  11. for ob_rank in range(1, world_size):
  12. ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
  13. self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
  14. self.rewards[ob_info.id] = []
  15. self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
  16. self.batch = batch
  17. self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
  18. self.future_actions = torch.futures.Future()
  19. self.lock = threading.Lock()
  20. self.pending_states = len(self.ob_rrefs)

非批量select_acion只需运行状态抛出策略,保存动作概率,然后立即将动作返回给观察者。

  1. from torch.distributions import Categorical
  2. class Agent:
  3. ...
  4. @staticmethod
  5. def select_action(agent_rref, ob_id, state):
  6. self = agent_rref.local_value()
  7. probs = self.policy(state.cuda())
  8. m = Categorical(probs)
  9. action = m.sample()
  10. self.saved_log_probs[ob_id].append(m.log_prob(action))
  11. return action.item()

使用批量时,状态以观察者 id 为行 ID 存储在 2D 张量self.states中。 然后,它通过将回调函数安装到批量生成的self.future_actions Future对象上来链接Future,该对象将使用使用该观察者 ID 索引的特定行进行填充。 最后到达的观察者一口气通过策略运行所有批量状态,并相应地设置self.future_actions。 发生这种情况时,将触发安装在self.future_actions上的所有回调函数,并使用它们的返回值来填充链接的Future对象,该对象进而通知Agent为所有先前的 RPC 请求准备和传达来自其他观察者的响应。

  1. class Agent:
  2. ...
  3. @staticmethod
  4. @rpc.functions.async_execution
  5. def select_action_batch(agent_rref, ob_id, state):
  6. self = agent_rref.local_value()
  7. self.states[ob_id].copy_(state)
  8. future_action = self.future_actions.then(
  9. lambda future_actions: future_actions.wait()[ob_id].item()
  10. )
  11. with self.lock:
  12. self.pending_states -= 1
  13. if self.pending_states == 0:
  14. self.pending_states = len(self.ob_rrefs)
  15. probs = self.policy(self.states.cuda())
  16. m = Categorical(probs)
  17. actions = m.sample()
  18. self.saved_log_probs.append(m.log_prob(actions).t()[0])
  19. future_actions = self.future_actions
  20. self.future_actions = torch.futures.Future()
  21. future_actions.set_result(actions.cpu())
  22. return future_action

现在,让我们定义如何将不同的 RPC 函数结合在一起。 Agent控制每个剧集的执行。 它首先使用rpc_async在所有观察者上开始该剧集,并阻止将由观察者奖励填充的返还期货。 请注意,以下代码使用 RRef 帮助器ob_rref.rpc_async()在具有提供的参数的ob_rref RRef 的所有者上启动run_episode函数。 然后将保存的动作概率和返回的观察者奖励转换为期望的数据格式,并开始训练步骤。 最后,它将重置所有状态并返回当前剧集的奖励。 此函数是运行一集的入口。

  1. class Agent:
  2. ...
  3. def run_episode(self, n_steps=0):
  4. futs = []
  5. for ob_rref in self.ob_rrefs:
  6. # make async RPC to kick off an episode on all observers
  7. futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))
  8. # wait until all obervers have finished this episode
  9. rets = torch.futures.wait_all(futs)
  10. rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
  11. ep_rewards = sum([ret[1] for ret in rets]) / len(rets)
  12. # stack saved probs into one tensor
  13. if self.batch:
  14. probs = torch.stack(self.saved_log_probs)
  15. else:
  16. probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
  17. probs = torch.stack(probs)
  18. policy_loss = -probs * rewards / len(rets)
  19. policy_loss.sum().backward()
  20. self.optimizer.step()
  21. self.optimizer.zero_grad()
  22. # reset variables
  23. self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
  24. self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
  25. # calculate running rewards
  26. self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
  27. return ep_rewards, self.running_reward

其余代码是正常的进程启动和日志记录,与其他 RPC 教程类似。 在本教程中,所有观察者都被动地等待来自智能体的命令。 有关完整的实现,请参考示例回购。

  1. def run_worker(rank, world_size, n_episode, batch, print_log=True):
  2. os.environ['MASTER_ADDR'] = 'localhost'
  3. os.environ['MASTER_PORT'] = '29500'
  4. if rank == 0:
  5. # rank0 is the agent
  6. rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
  7. agent = Agent(world_size, batch)
  8. for i_episode in range(n_episode):
  9. last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)
  10. if print_log:
  11. print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
  12. i_episode, last_reward, running_reward))
  13. else:
  14. # other ranks are the observer
  15. rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
  16. # observers passively waiting for instructions from agents
  17. rpc.shutdown()
  18. def main():
  19. for world_size in range(2, 12):
  20. delays = []
  21. for batch in [True, False]:
  22. tik = time.time()
  23. mp.spawn(
  24. run_worker,
  25. args=(world_size, args.num_episode, batch),
  26. nprocs=world_size,
  27. join=True
  28. )
  29. tok = time.time()
  30. delays.append(tok - tik)
  31. print(f"{world_size}, {delays[0]}, {delays[1]}")
  32. if __name__ == '__main__':
  33. main()

批量 RPC 有助于将操作推断合并为较少的 CUDA 操作,从而减少了摊销的开销。 上面的main函数使用不同数量的观察者(从 1 到 10)在批量和无批量模式下运行相同的代码。下图使用默认参数值绘制了不同世界大小的执行时间。 结果证实了我们的期望,即批量有助于加快训练速度。

使用异步执行实现批量 RPC 处理 - 图1

了解更多