• 分布式训练架构
  • 分布式训练 API
  • 分布式 program 的构建
  • 通信相关 OP
  • 参数保存

    • save_op

      分布式训练架构

      分布式分为模型并行和数据并行,其中数据并行的模型有2种通信模式:RPC通信和Collective通信。RPC具体实现是gRC,Collective 使用 NCCL2。
      Paddle 源码分析(三)——分布式训练 - 图1

      数据并行

      数据并行的方式,分布式训练集群中有2个角色——Pserver和 Trainer。
      Paddle 源码分析(三)——分布式训练 - 图2
      使用RPC通信方式的数据并行分布式训练,会启动多个pserver进程和多个trainer进程,每个pserver进程会保存一部分模型参数,并负责接收从trainer发送的梯度并更新这些模型参数;每个trainer进程会保存一份完整的模型,并使用一部分数据进行训练,然后向pserver发送梯度,最后从pserver拉取更新后的参数。
      pserver进程可以在和trainer完全不同的计算节点上,也可以和trainer公用节点。一个分布式任务所需要的pserver进程个数通常需要根据实际情况调整,以达到最佳的性能,然而通常来说pserver的进程不会比trainer更多。
  • 在使用GPU训练时,pserver可以选择使用GPU或只使用CPU,如果pserver也使用GPU,则会增加一次从CPU拷贝接收到的梯度数据到GPU的开销,在某些情况下会导致整体训练性能降低。

  • 在使用GPU训练时,如果每个trainer节点有多个GPU卡,则会先在每个trainer节点的多个卡之间执行NCCL2通信方式的梯度聚合,然后再通过pserver聚合多个节点的梯度。

    分布式训练 API

    Paddle 提供了2套API来进行分布式训练,fleet 是更高级的 API,transpile 是底层 API。
    假设总共有 2个节点:192.168.1.1 和 192.168.1.2,使用端口 6170,启动 4 个 Trainer。可以如下定义:

    使用 transpiler API

    官网说明 ```python

    必填,取值 PSERVER/TRAINER

    role = “PSERVER”

    必填,标识多个TRAINER的ID,从0开始

    trainer_id = 0

    必填,所有的PSERVER地址

    pserver_endpoints = “192.168.1.1:6170,192.168.1.2:6170”

    必填,trainer节点的个数

    trainers = 4

    选填,只有NCCL2模式需要传这个参数

    current_endpoint = “192.168.1.1:6170” t = fluid.DistributeTranspiler() t.transpile(trainer_id = 0, pservers=pserver_endpoints, trainers=trainers, sync_mode=False)

if role == “PSERVER”: pserver_prog = t.get_pserver_program(current_endpoint) pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) exe.run(pserver_startup) exe.run(pserver_prog) elif role == “TRAINER”: train_loop(t.get_trainer_program())

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/1328939/1590832730553-99454049-fd37-4fef-b567-e4f960e391e1.png#align=left&display=inline&height=1141&margin=%5Bobject%20Object%5D&originHeight=1141&originWidth=2596&status=done&style=none&width=2596)
  2. <a name="xxDaN"></a>
  3. ### 使用 fleet API
  4. [官网说明](https://www.paddlepaddle.org.cn/documentation/docs/zh/user_guides/howto/training/fleet_api_howto_cn.html)
  5. ```python
  6. # 1. 根据环境变量创建一个 role
  7. role = role_maker.UserDefinedRoleMaker(
  8. current_id=int(os.getenv("CURRENT_TRAIN_ID")),
  9. role=role_maker.Role.WORKER if bool(int(os.getenv("IS_WORKER"))) else role_maker.Role.SERVER,
  10. worker_num=int(os.getenv("WORKER_NUM")),
  11. server_endpoints=pserver_endpoints)
  12. # 2. 初始化 role
  13. fleet.init(role)
  14. # 3. 将普通的单机 optimizer 改为分布式的 optimizer
  15. optimizer = fleet.distributed_optimizer(optimizer)
  16. optimizer.minimize(cost)
  17. # 4. 使用 fleet API 启动训练
  18. if fleet.is_server():
  19. # 4.1 初始化并启动 pserver
  20. fleet.init_server()
  21. fleet.run_server()
  22. elif fleet.is_worker():
  23. # 4.2 初始化并启动 trainer
  24. fleet.init_worker()
  25. exe = fluid.Executor(fluid.CPUPlace())
  26. exe.run(fluid.default_startup_program())
  27. step = 1001
  28. for i in range(step):
  29. '''
  30. 或者
  31. fleet._executor.train_from_dataset(
  32. program=fleet.main_program,
  33. dataset=dataset,
  34. fetch_list=[cost.name],
  35. fetch_info=fetch_info)
  36. '''
  37. cost_val = exe.run(
  38. program=fluid.default_main_program(),
  39. feed=dataset,
  40. fetch_list=[cost.name])
  41. print("worker_index: %d, step%d cost = %f" %
  42. (fleet.worker_index(), i, cost_val[0]))

pserver 和 trainer 对应的环境变量如下:

  1. # pserver1
  2. export WORKER_NUM=2
  3. export IS_WORKER=FALSE
  4. export PSERVER_ENDPOINTS=192.168.1.1:6170,192.168.1.2:6170
  5. # pserver2
  6. export WORKER_NUM=2
  7. export IS_WORKER=FALSE
  8. export PSERVER_ENDPOINTS=192.168.1.1:6170,192.168.1.2:6170
  9. # trainer1
  10. export CURRENT_TRAIN_ID=0
  11. export WORKER_NUM=2
  12. export IS_WORKER=TRUE
  13. export PSERVER_ENDPOINTS=192.168.1.1:6170,192.168.1.2:6170
  14. # trainer2
  15. export CURRENT_TRAIN_ID=1
  16. export WORKER_NUM=2
  17. export IS_WORKER=TRUE
  18. export PSERVER_ENDPOINTS=192.168.1.1:6170,192.168.1.2:6170

fleet Python层的封装

虽然 transpiler 和 fleet 都是 paddle 用于分布式训练的API,但 fleet 是更高层的API,其底层的实现仍然是 transpiler。
fleet 的实现在 python/paddle/fluid/incubate/fleet/ 里,由基础的接口类定义方法名,nccl2 和 collective 的实现由继承 base 类的子类实现。
base 接口类的定义在:fleet/base/fleet_base.py,基础的接口类有:Fleet 和 DistributedOptimizer
Paddle 源码分析(三)——分布式训练 - 图3
继承了上述 fleet 接口类的有:

  1. fleet/collective/__init__.py

    1. class Collective(Fleet)
    2. class CollectiveOpBasedOptimizer(DistributedOptimizer)
  2. fleet/parameter_server/distribute_transpiler/_init_.py

    1. class DistributedTranspiler(Fleet)
    2. class TranspilerOptimizer(DistributedOptimizer)
  3. fleet/parameter_server/pslib/__init__.py

    1. class PSLib(Fleet)
    2. class DownpourOptimizer(DistributedOptimizer):

    先查看用户层使用 fleet API 的调用:

    1. strategy = DistributeTranspilerConfig()
    2. strategy.sync_mode = False
    3. optimizer = fluid.optimizer.Adagrad(learning_rate=g_learn_rate)
    4. # 以下对fleet的调用实际调用的是 DistributedTranspiler.distributed_optimizer(),
    5. # 结果返回的是 TranspilerOptimizer 实例
    6. optimizer = fleet.distributed_optimizer(optimizer, strategy)
    7. # 调用的 TranspilerOptimizer.minimize(),
    8. # 而在该函数里,当配置为分布式时,又调用了 fleet._transpile()
    9. optimizer.minimize(avg_cost)

    通过 fleet 下 pserver 的实现,可以看到 fleet 就是 DistributedTranspiler(Fleet) 类的实例。
    而最终 fleet._transpile() 的实现,只是对底层 transpiler 的封装。

    1. # fleet/parameter_server/distribute_transpiler/_init_.py
    2. fleet = DistributedTranspiler()
    3. class DistributedTranspiler(Fleet):
    4. def distributed_optimizer(self, optimizer, strategy=None):
    5. self._optimizer = TranspilerOptimizer(optimizer, strategy)
    6. return self._optimizer
    7. def _transpile(self, config):
    8. ...
    9. class TranspilerOptimizer(DistributedOptimizer):
    10. def minimize(self,
    11. loss,
    12. scopes=None,
    13. startup_program=None,
    14. parameter_list=None,
    15. no_grad_set=None):
    16. optimize_ops, params_grads = self._optimizer.minimize(
    17. loss, startup_program, parameter_list, no_grad_set)
    18. # 封装了 transpile 中 DistributedTranspiler 的实现。
    19. fleet._transpile(config=self._strategy)
    20. return optimize_ops, params_grads

    也就是最后都会调用 python/paddle/fluid/transpiler/distribute_transpiler.py 里的实现。
    具体 fluid 里 transpile 的实现比较复杂,是实现分布式训练的关键,我们留到下一章做重点讲解。

    分布式 program 的构建

    这一节具体分析 fluid.transpile 的实现,代码路径:python/paddle/fluid/transpiler/distribute_transpiler.py
    transpile 的部分配置信息如下,可以暴露给用户自定义:

    1. class DistributeTranspilerConfig(object):
    2. # 是否要切分变量到不同的 pserver 上? 默认开启
    3. slice_var_up = True
    4. # 变量切分方式,支持 hash,RoundRobin(默认)
    5. split_method = None
    6. # 变量切分的最小参数量,当参数量小于该值时,将不会做切分
    7. min_block_size = 8192
    8. # supported modes: pserver, nccl2, collective
    9. mode = "pserver"
    10. # 是否同步更新变量?True表示对同一个变量,pserver会等待所有的 trainer 上传梯度之后才做更新。
    11. _sync_mode = True

    transpile 的过程会重构整个 program,使之可以满足在分布式环境中运行,具体有以下几点:

  • 将原始的 origin_program 拆开成了 pserver_program 和 trainer_program 两部分。pserver_program 里新增 listen_and_serv OP 和 save OP;trainer_program 里新增 send OP 和 recv OP 用于发送梯度和获取参数。
  • 将本地的OP替换成分布式的OP,比如lookup_table_op换成了distributed_lookup_table_op
  • OP 的合并和优化。
  • 将参数量过大的变量切分成多个变量,分配到不同的 pserver 上,为每个需要通信的变量分配指定的 pserver 地址。

    变量拆分

    当调用 transpile() 的时候会调用self._init_splited_vars() 执行变量拆分。具体的拆分逻辑在 slice_variable() 里。
  1. 如果变量的参数量 < min_block_size,则不做拆分;
  2. 需要拆分的变量,按照 pserver 的个数平均拆分成 N 个block。当然前提是保证每个block的参数量依然大于 min_block_size,也就是当 pserver 个数很多的时候,参数只会分布在一部分pserver上。

    1. def slice_variable(var_list, slice_count, min_block_size):
    2. blocks = []
    3. for var in var_list:
    4. split_count = slice_count
    5. var_numel = reduce(lambda x, y: x * y, var.shape)
    6. max_pserver_count = int(math.floor(var_numel / float(min_block_size)))
    7. if max_pserver_count == 0:
    8. max_pserver_count = 1
    9. if max_pserver_count < slice_count:
    10. split_count = max_pserver_count
    11. block_size = int(math.ceil(var_numel / float(split_count)))
    12. if len(var.shape) >= 2:
    13. # align by dim1(width)
    14. dim1 = reduce(lambda x, y: x * y, var.shape[1:])
    15. remains = block_size % dim1
    16. if remains != 0:
    17. block_size += dim1 - remains
    18. # update split_count after aligning
    19. split_count = int(math.ceil(var_numel / float(block_size)))
    20. for block_id in range(split_count):
    21. curr_block_size = min(block_size, var_numel - (
    22. (block_id) * block_size))
    23. block = VarBlock(var.name, block_id, curr_block_size)
    24. blocks.append(str(block))
    25. return blocks

    变量分为2种:一种是需要永久性存储的模型参数,一种是临时计算出来的梯度。这2类变量的拆分逻辑都一样。
    梯度变量拆分之后会做一个 shuffle 操作,然后给每一个变量分配一个pserver的地址,分配算法有2种:

  3. hash 的方式

    1. # 每个参数 hash 的分布在各个 pserver 上,返回一个 pserve 的列表,代表每个参数分布的地址
    2. class HashName(PSDispatcher):
    3. def _hash_block(self, block_str, total):
    4. return hash(block_str) % total
    5. def dispatch(self, varlist):
    6. eplist = []
    7. for var in varlist:
    8. server_id = self._hash_block(var.name(), len(self._eps))
    9. server_for_param = self._eps[server_id]
    10. eplist.append(server_for_param)
    11. return eplist
  4. 轮询的方式

    1. # 每个参数均匀的分布在各个 pserver 上,返回一个 pserve 的列表,代表每个参数分布的地址
    2. class RoundRobin(PSDispatcher):
    3. def dispatch(self, varlist):
    4. eplist = []
    5. for var in varlist:
    6. server_for_param = self._eps[self._step]
    7. eplist.append(server_for_param)
    8. self._step += 1
    9. if self._step >= len(self._eps):
    10. self._step = 0
    11. return eplist

    program 拆分

    原始的 origin_program 拆开成了 pserver_program 和 trainer_program 两部分。同时插入了 send OP,recv OP 等操作,还有 OP 的合并优化等处理,逻辑很复杂,于是直接打印出前后的 program。可以看到具体的修改差异,对比如下:

  • origin_program
    绿色部分是 embedding 相关的OP以及对应的梯度OP;
    红色部分是 FC 相关的OP和对应的梯度 OP;
    蓝色部分是 不参与反向传播的一些指标性的OP。

Paddle 源码分析(三)——分布式训练 - 图4

  • trainer_program
    如下图,前向计算的OP主要是 lookup_table_op 换成了 distributed_lookup_table_op
    原来的 program 中有 232 个 lookup_table_op + sequence_pool_op,现在变成了2个 distributed_lookup_table_op 和 232 个 sequence_pool_op。也就是将原来 232 个 lookup_table_op 变成了 2 个 distributed_lookup_table_op
    第一个 distributed_lookup_table_op 是查 embedding 表的,该OP将一次性查所有的 feature_id 对应的 embedding 参数,也就是将 231 个查 embedding 的 lookup_table_op 合并成了一个,注意这里在合并之前每个 slot 是对应一个 lookup_table_op 的,合并之后在input里必须要保留 slot 的信息才能做 pooling。
    第二个是查 lr 的参数,因为 lr 的实现也是用的 embedding。所以这里的 OP 一样。
    distributed_lookup_table_op 最终会从 pserver 上 fetch embedding 的参数,而不是从本地。除此之外,反向传播计算梯度的OP后面加了一个send OP,用于将计算出来的梯度发送到 pserver 上去。当整个反向传播也结束的时候,会有一些 recv 的操作,此时会从 pserver 上获取最新的参数。
    Paddle 源码分析(三)——分布式训练 - 图5
  • pserver_program
    pserver_program 就比较简单了,开头新增了一个 listen_and_serv 的 OP,用于提供 GRPC 服务,处理 trainer 发起的参数更新,参数获取等请求。剩下的就是优化器 adgrad 的 OP,用于根据梯度更新参数,最后是 save OP,用于将参数保存起来。
    Paddle 源码分析(三)——分布式训练 - 图6

在创建 embedding layer 的时候提供了2个参数,is_sparse 和 is_distributed,is_sparse 决定了 embedding 表的存储方式是否为稀疏存储,is_sparse=True 将使用 SelectedRows 存储,否则使用 LodTensor 存储。
当 is_sparse=True 且 is_distributed=False 的时候,embedding 参数将保存在 pserver 上,lookup_table_op 在 transpiler 时就会被换成 distributed_lookup_table_op ,每次会从 pserver 上 prefetch embedding 参数,而不是在本地查找 embedding。

  1. # 用户层的调用如下
  2. fluid.layers.embedding(
  3. input=input,
  4. is_sparse=True,
  5. is_distributed=False,
  6. padding_idx=0,
  7. size=[10^9, 32],
  8. param_attr=fluid.ParamAttr(name="SparseFeatFactors_deep"))
  9. # 实际的API如下
  10. def embedding(input...):
  11. ....
  12. helper.append_op(
  13. type='lookup_table',
  14. inputs={'Ids': input,
  15. 'W': w},
  16. outputs={'Out': tmp},
  17. attrs={
  18. 'is_sparse': is_sparse,
  19. 'is_distributed': is_distributed,
  20. 'remote_prefetch': remote_prefetch,
  21. 'padding_idx': padding_idx,
  22. 'is_test': is_test
  23. })
  24. return tmp

可以看到 OP 的 inputs 是 Ids 的名称,也就是用户层定义的 sparse_slot_xxx,另外一个 inputs 是 W ,也叫 SparseFeatFactors_deep,这个 W 就是 embedding 表,Out 就是查表后的结果。
在建立网络结构的时候,这里的 slots 是预先在 yaml 文件里定义好的,所以每个 slot 对应一个 lookup_table_op。而所有的 lookup_table_op 是共用的同一个 embedding 表,其属性名为 SparseFeatFactors_deep。
由于输入的样本文件是 feature:slot,所以同一个 slot 的feature 会被分配到同一个 lookup_table_op 上去运行,最终同一个lookup_table_op 的查找结果可以做一个pooling 的操作。
下面是一个 lookup_table_op 的详情:

  1. inputs {
  2. parameter: "Ids"
  3. arguments: "sparse_slot_705"
  4. }
  5. inputs {
  6. parameter: "W"
  7. arguments: "SparseFeatFactors_deep"
  8. }
  9. outputs {
  10. parameter: "Out"
  11. arguments: "embedding_230.tmp_0"
  12. }
  13. type: "lookup_table"
  14. attrs {
  15. name: "op_role_var"
  16. type: STRINGS
  17. }
  18. attrs {
  19. name: "padding_idx"
  20. type: LONG
  21. l: 0
  22. }
  23. attrs {
  24. name: "epmap"
  25. type: STRINGS
  26. }
  27. attrs {
  28. name: "is_distributed"
  29. type: BOOLEAN
  30. b: false
  31. }
  32. attrs {
  33. name: "op_namescope"
  34. type: STRING
  35. s: "/"
  36. }
  37. attrs {
  38. name: "op_callstack"
  39. type: STRINGS
  40. strings: "..."
  41. }
  42. attrs {
  43. name: "is_sparse"
  44. type: BOOLEAN
  45. b: true
  46. }
  47. attrs {
  48. name: "op_role"
  49. type: INT
  50. i: 0
  51. }
  52. attrs {
  53. name: "remote_prefetch"
  54. type: BOOLEAN
  55. b: true
  56. }
  57. attrs {
  58. name: "is_test"
  59. type: BOOLEAN
  60. b: false
  61. }
  62. attrs {
  63. name: "grad_inplace"
  64. type: BOOLEAN
  65. b: false
  66. }
  67. attrs {
  68. name: "table_names"
  69. type: STRINGS
  70. }
  71. attrs {
  72. name: "height_sections"
  73. type: LONGS
  74. }
  75. attrs {
  76. name: "trainer_id"
  77. type: INT
  78. i: 0
  79. }

通信相关 OP

分布式训练中通信相关的 OP主要有 listen_and_serv_opdistributed_lookup_table_opsend_oprecv_op。涉及参数的获取和更新。

listen_and_serv_op

listen_and_serv_op 是在 pserver_program 中的。主要是启动 gRPC 服务器,监听 trainer 的请求并进行参数更新等操作。
源文件:listen_and_serv_op.cc
ListenAndServOp 继承了 framework::OperatorBase,并重写了父类的 RunImpl() 函数,在OP运行的时候,就会调用 RunImpl()。
看看 RunImpl() 的整体流程:

  • RegisterRPC(): 注册请求的handler
  • 初始化一个 Executor,做program相关的预处理,准备好请求的context
  • 绑定每个handler的context,FillRequestCtx()
  • 新建一个线程启动服务,StartServer();
  • 保存监听的端口到文件中
  • 初始化参数更新的 recorder SparseParamUpdateRecorder::Init()

在 listen_and_serv_op.cc 中注册需要监听的 RPC 实现:

  1. constexpr char kRequestSend[] = "RequestSend";
  2. constexpr char kRequestGet[] = "RequestGet";
  3. constexpr char kRequestPrefetch[] = "RequestPrefetch";
  4. rpc_service_->RegisterRPC(distributed::kRequestSend,
  5. request_send_handler_.get(),
  6. FLAGS_rpc_send_thread_num);
  7. rpc_service_->RegisterRPC(distributed::kRequestGet,
  8. request_get_handler_.get(),
  9. FLAGS_rpc_get_thread_num);
  10. rpc_service_->RegisterRPC(distributed::kRequestPrefetch,
  11. request_prefetch_handler_.get(),
  12. FLAGS_rpc_prefetch_thread_num);

request_handler.cc
用于实现不同的请求的处理的 handler类
父类:class RequestHandler()
子类:
class RequestSendHandler
class RequestGetHandler
class RequestGetNoBarrierHandler
class RequestPrefetchHandler
class RequestCheckpointHandler
class RequestNotifyHandler
然后 StartServer():
grpc_server.cc 里有 RequestBase 和 继承了 RPCServer 的 AsyncGRPCServer 两个类:

  1. RequestBase 持有 RequestHandler* 对象,代表一个 Request
    父类:class RequestBase
    子类:
    class RequestSend final : public RequestBase
    class RequestGet final : public RequestBase
    class RequestPrefetch final : public RequestBase
  2. AsyncGRPCServer 是 RPCServer 的子类,
    父类:class RPCServer 定义在 rpcserve.cc 中:
    定义了RegisterRPC、StartServer、Stop等实现。
    RegisterRPC() 操作将RPC的名字和handler 保存在 rpc_call_map
    中。
    AsyncGRPCServer 重载了 StartServer() 的实现,会将 rpccall_map 中的rpc请求再重新注册一遍,而对于的注册对象变成了 RequestBase 的子类,也就是 RequestSend、RequestGet、RequestPrefetch 等。
    ```cpp for (auto& t : rpccall_map) { auto& rpc_name = t.first; for (int i = 0; i < kRequestBufSize; i++) { VLOG(6) << “TryToRegisterNewOne on RPC NAME: “ << rpc_name << “ I: “ << i; TryToRegisterNewOne(rpc_name, i); } }

void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpcname, int req_id) { if (rpc_name == kRequestSend) { b = new RequestSend(&service, cq.get(), handler, reqid); } else if (rpc_name == kRequestGet) { b = new RequestGet(&service, cq.get(), handler, reqid); } else if (rpc_name == kRequestPrefetch) { b = new RequestPrefetch(&service, cq.get(), handler, req_id); } }

<a name="yOfUc"></a>
### distributed_lookup_table_op
Trainer 端运行 DistributedLookupTable OP 发起 PrefetchRequest。
```cpp
template <typename T>
class DistributedLookupTableKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext &context) const override {
    // 获取各个属性
    ....
    // 执行 prefetch
    operators::distributed::prefetchs(
        id_names, out_names, embedding_name, false, lookup_tables, endpoints,
        height_sections, context, context.scope());
  }
};

Input是所有的Ids, output 是所有的embedding 参数,前提是知道哪些参数分布在哪些 Pserver 上。
先看看其中的2个属性 table_names 和 endpoints。这2个是一一对应的关系,显示了同一个embedding参数被分成多个block之后,存储在哪个pserver 上。height_sections就是每个block的大小。

attrs {
  name: "table_names"
  type: STRINGS
  strings: "SparseFeatFactors_deep.block0"
  strings: "SparseFeatFactors_deep.block1"
}
attrs {
  name: "height_sections"
  type: LONGS
  longs: 50000000
  longs: 50000000
}
attrs {
  name: "endpoints"
  type: STRINGS
  strings: "127.0.0.1:6170"
  strings: "127.0.0.1:6171"
}

parameter_prefetch.cc

void prefetch_core(
    const std::vector<int64_t>& ids, const TableAndEndpoints& tables,
    const std::vector<int64_t>& height_sections,
    const framework::ExecutionContext& context, const framework::Scope& scope,
    std::unordered_map<int64_t, std::vector<float>>* recved_vec_map) {
  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
  auto& actual_ctx = *pool.Get(context.GetPlace());
  // 创建一个临时的 local_scope 来保存 input_var 和 output_var 的数据。
  std::unique_ptr<framework::Scope> local_scope = scope.NewTmpScope();

  // 给每个table 新建两个变量 
  // in_var_names=[prefetch_send@127.0.0.1:6170, prefetch_send@127.0.0.1:6171]
  // out_var_names=[prefetch_recv@127.0.0.1:6170, prefetch_recv@127.0.0.1:6171]
  std::vector<std::string> in_var_names;
  std::vector<std::string> out_var_names;
  for (size_t i = 0; i < tables.size(); ++i) {
    in_var_names.push_back("prefetch_send@" + tables[i].second);
    out_var_names.push_back("prefetch_recv@" + tables[i].second);
  }

  // 如果 ids 数组太大,就分割,并将 ids 的值存在 local_scope 中
  auto splited_ids = SplitIds(ids, height_sections);
  SplitIdsIntoMultipleVarsBySection(in_var_names, height_sections, splited_ids,
                                    local_scope.get());
  // 在 local_scope 中创建 LoDTensor 类型的 output_var
  for (auto& name : out_var_names) {
    local_scope->Var(name)->GetMutable<framework::LoDTensor>();
  }
  distributed::RPCClient* rpc_client =
      distributed::RPCClient::GetInstance<RPCCLIENT_T>(
          context.Attr<int>("trainer_id"));
  // 异步发送请求
  std::vector<distributed::VarHandlePtr> rets;
  for (size_t i = 0; i < in_var_names.size(); i++) {
    if (NeedSend(*local_scope.get(), in_var_names[i])) {
      VLOG(3) << "sending " << in_var_names[i] << " to " << tables[i].second
              << " to get " << out_var_names[i] << " back";
      // AsyncPrefetchVar 入参:
      // endpoint,连接上下文,local_scope,输入的变量名,输出的变量名,表名
      rets.push_back(rpc_client->AsyncPrefetchVar(
          tables[i].second, actual_ctx, *local_scope.get(), in_var_names[i],
          out_var_names[i], tables[i].first));
    } else {
      VLOG(3) << "don't send no-initialied variable: " << out_var_names[i];
    }
  }
  // 等待回包
  for (size_t i = 0; i < rets.size(); i++) {
    PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient");
  }
  // 处理收到的数据
  for (size_t section_idx = 0; section_idx < out_var_names.size();
       ++section_idx) {
    .....
  }
}

由于输入是一个 int64 数组,现在需要切分成多个section,大小和 endpoints 相同,从而去相应的 pserver 上去查。采用 id%pservers 取余的方式去切分的。
首先这里的 ids 已经是取余之后的 ids,也就是在 ctr_reader.py 里的输出的已经是对 dict_size 取余之后的。
切分是根据 embedding 的 shape 做的切分。也就是根据[dict_size, embedding_size] 切分。

  1. SplitIds()
    先将 ids 去重,然后根据 height_sections,用取余的方式切分成多组 SplitIds。
  2. SplitIdsIntoMultipleVarsBySection()
    将 SplitIds 变成 Var 类型放进 in_var_names 里。
    也就是变量
    vars {
    "name": prefetch_send@127.0.0.1:6170
    "value": SplitIds[0]
    }
    vars {
    "name": prefetch_send@127.0.0.1:6171
    "value": SplitIds[1]
    }
    
    所以 transpile 的时候用 RoundRobin/Hash 将 embedding 切分在多个 pserver 上,而在 lookup_table 的时候通过 id%pserver 的方式去查询,这样保证同一个参数是分布在同一个的
    AsyncPrefetchVar() 细节:
    VarHandlePtr GRPCClient::AsyncPrefetchVar(const std::string& ep...) {
    const platform::DeviceContext* p_ctx = &ctx;
    const std::string ep_val = ep;
    const std::string in_var_name_val = in_var_name;
    const std::string out_var_name_val = out_var_name;
    const std::string table_name_val = table_name;
    const framework::Scope* p_scope = &scope;
    // 根据 ep 地址获取一个 channel
    const auto ch = GetChannel(ep_val);
    const std::string method = kPrefetchRPC;
    int retry_times_ = 0;
    // 具体的通信发包
    while (true) {
     GetProcessor* s = new GetProcessor(ch);
     VarHandlePtr h(new VarHandle(ep, method, out_var_name_val, p_ctx, p_scope));
     s->Prepare(h, time_out);
     framework::AsyncIO([in_var_name_val, out_var_name_val, ep_val, p_scope,
                         p_ctx, s, method, h, table_name_val, this] {
       auto* var = p_scope->FindVar(in_var_name_val);
       ::grpc::ByteBuffer req;
       SerializeToByteBuffer(in_var_name_val, var, *p_ctx, &req,
                             out_var_name_val, 0, table_name_val);
       VLOG(3) << s->GetVarHandlePtr()->String() << " begin";
       // stub context
       s->response_call_back_ = ProcGetResponse;
       platform::RecordRPCEvent record_event(method);
       auto call = s->stub_g_.PrepareUnaryCall(
           s->context_.get(), "/sendrecv.SendRecvService/PrefetchVariable", req,
           &cq_);
       call->StartCall();
       call->Finish(&s->reply_, &s->status_, static_cast<void*>(s));
       if (UNLIKELY(platform::IsProfileEnabled())) {
         h->Wait();
       }
     });
     req_count_++;
     // 重试逻辑
     return h;
    }
    }
    
    trainer Request 过程的 log 如下:
    I0108 09:09:58.494256  9344 hogwild_worker.cc:135] Going to run op distributed_lookup_table
    I0108 09:09:58.497203  9344 operator.cc:165] CPUPlace Op(distributed_lookup_table), inputs:{Ids[sparse_slot_1:int64_t[32, 1]({{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}}), sparse_slot_2:...}
    I0108 09:09:58.497671  9344 operator.cc:988] expected_kernel_key:data_type[float]:data_layout[ANY_LAYOUT]:place[CPUPlace]:library_type[PLAIN]
    I0108 09:09:58.503204  9344 scope.cc:164] Create variable prefetch_send@127.0.0.1:6170
    I0108 09:09:58.503458  9344 scope.cc:164] Create variable prefetch_recv@127.0.0.1:6170
    I0108 09:09:58.503598  9344 parameter_prefetch.cc:119] sending prefetch_send@127.0.0.1:6170 to 127.0.0.1:6170 to get prefetch_recv@127.0.0.1:6170 back
    I0108 09:09:58.503856  9148 grpc_client.cc:273] PrefetchRPC name:[prefetch_recv@127.0.0.1:6170], ep:[127.0.0.1:6170], status:[-1] begin
    I0108 09:09:58.568178  9139 grpc_client.cc:457] PrefetchRPC name:[prefetch_recv@127.0.0.1:6170], ep:[127.0.0.1:6170], status:[-1] process
    I0108 09:09:58.568346  9139 grpc_client.cc:127] ProcGetResponse
    I0108 09:09:58.579715  9344 operator.cc:185] CPUPlace Op(distributed_lookup_table), inputs:{Ids[sparse_slot_1:int64_t[32, 1]({{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32}}), sparse_slot_2:...}
    I0108 09:09:58.580394  9344 hogwild_worker.cc:139] Op distributed_lookup_table Finished
    
    接着是 pserver 的 response 过程:
    由 RequestPrefetchHandler() 处理上述请求,最终是新建一个 LookupSparseTableOp 去做查找的。由于 embedding Table 使用 SelectedRow 实现,最后的查找在 selected_rows.cc 里实现。
    LookupSparseTableOp::RunImpl() 实现细节:
    class LookupSparseTableOp : public framework::OperatorBase {
    public:
    using framework::OperatorBase::OperatorBase;
    private:
    void RunImpl(const framework::Scope &scope,
                const platform::Place &dev_place) const override {
     auto out_var = scope.FindVar(Output("Out"));
     auto w_var = scope.FindVar(Input("W"));
     auto ids_var = scope.FindVar(Input("Ids"));
     auto is_test = Attr<bool>("is_test");
     auto &ids_t = ids_var->Get<framework::LoDTensor>();
     auto out_t = out_var->GetMutable<framework::LoDTensor>();
     // w_t 是 framework::SelectedRows 实例
     auto w_t = w_var->GetMutable<framework::SelectedRows>();
     platform::CPUPlace cpu;
     auto out_shape = w_t->value().dims();
     out_shape[0] = ids_t.numel();
     out_t->Resize(out_shape);
     out_t->mutable_data(cpu, w_t->value().type());
     // w_t->Get() 即调用 SelectedRows::Get(),具体的查找过程在 SelectedRows 里实现的
     w_t->Get(ids_t, out_t, true, is_test, true, true);
     out_t->set_lod(ids_t.lod());
    }
    };
    
    日志:
    I0108 09:09:58.505403  9012 scope.cc:164] Create variable prefetch_send@127.0.0.1:6170
    I0108 09:09:58.505606  9012 grpc_server.cc:556] HandleRequest RequestPrefetch, req_id:0 get next
    I0108 09:09:58.505641  9012 grpc_server.cc:567] RequestPrefetch name:[prefetch_send@127.0.0.1:6170], ep:[ipv4:127.0.0.1:10724] Process using req_id:0
    I0108 09:09:58.505651  9012 grpc_server.cc:337] RequestPrefetch, in_var_name: prefetch_send@127.0.0.1:6170 out_var_name: prefetch_recv@127.0.0.1:6170
    I0108 09:09:58.505666  9012 scope.cc:164] Create variable prefetch_recv@127.0.0.1:6170
    I0108 09:09:58.505677  9012 request_handler_impl.cc:211] RequestPrefetchHandler prefetch_send@127.0.0.1:6170
    I0108 09:09:58.505686  9012 request_handler_impl.cc:222] RequestPrefetchHandler: table name is SparseFeatFactors_deep
    I0108 09:09:58.505705  9012 op_registry.cc:49] CreateOp directly from OpDesc is deprecated. It should only beused in unit tests. Use CreateOp(const OpDesc& op_desc) instead.
    I0108 09:09:58.505780  9012 operator.cc:165] CPUPlace Op(lookup_sparse_table), inputs:{Ids[prefetch_send@127.0.0.1:6170:int64_t[4025, 1]({})], W[SparseFeatFactors_deep[row_size=0]:float[100000000, 8]({{}})]}, outputs:{Out[prefetch_recv@127.0.0.1:6170:[0]({{}})]}.
    I0108 09:09:58.567589  9012 operator.cc:185] CPUPlace Op(lookup_sparse_table), inputs:{Ids[prefetch_send@127.0.0.1:6170:int64_t[4025, 1]({})], W[SparseFeatFactors_deep[row_size=0]:float[100000000, 8]({{}})]}, outputs:{Out[prefetch_recv@127.0.0.1:6170:float[4025, 8]({})]}.
    I0108 09:09:58.567870  9012 grpc_server.cc:549] HandleRequest RequestPrefetch wait next
    I0108 09:09:58.567914  9017 grpc_server.cc:556] HandleRequest RequestPrefetch, req_id:0 get next
    I0108 09:09:58.567983  9017 grpc_server.cc:567] RequestPrefetch name:[prefetch_send@127.0.0.1:6170], ep:[ipv4:127.0.0.1:10724] Finish using req_id:0
    

    send_op

    send op 是发送梯度信息到
    send op log:
    I0108 09:09:58.968538  9344 hogwild_worker.cc:135] Going to run op send
    I0108 09:09:58.968704  9344 operator.cc:165] CPUPlace Op(send), inputs:{X[fc_3.b_0@GRAD:float[2]({})]}, outputs:{Out[__control_var@0.436815957363:[0]({{}})]}.
    I0108 09:09:58.968796  9344 communicator.cc:253] communicator send fc_3.b_0@GRAD
    I0108 09:09:58.968989  9344 tensor_util.cc:28] TensorCopy 2 from CPUPlace to CPUPlace
    I0108 09:09:58.969172  9344 communicator.cc:268] send fc_3.b_0@GRAD queue size 0
    I0108 09:09:58.969363  9344 operator.cc:185] CPUPlace Op(send), inputs:{X[fc_3.b_0@GRAD:float[2]({})]}, outputs:{Out[__control_var@0.436815957363:[0]({{}})]}.
    I0108 09:09:58.969411  9344 hogwild_worker.cc:139] Op send Finished
    

    recv_op

    参数保存

    save_op