Worker 作为 Endpoint 的具体实例,下面我们介绍一下 Worker 启动以及 OnStart 指令后的额外工作。

6.5.1 脚本概览

下面是一个举例:

  1. /opt/module/jdk1.8.0_144 \
  2. -cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars

6.5.2 启动流程

Worker 的启动流程如下:

image.png

详解如下:

  1. SparkConf:加载 key 以 spark. 开头的系统属性 (Utils.getSystemProperties)。
  2. WorkerArguments
    1. 解析 Master 启动的参数:
      • —ip -i
      • —host -h
      • —port -p
      • —cores -c
      • —memory -m
      • —work-dir
      • —webui-port
      • —properties-file
    2. 将 —properties-file (没有配置默认为 conf/spark-defaults.conf) 中以 spark. 开头的配置存入 SparkConf。
    3. 在没有配置情况下,cores 默认为服务器 CPU 核数。
    4. 在没有配置情况下,memory 默认为服务器内存减 1G,如果低于 1G 取 1G。
    5. webUiPort 默认为 8081。
  3. NettyRpcEnv 中的内部处理遵循 RpcEndpoint 统一处理,这里不再赘述。
  4. 最终守护进程会一直存在等待结束信 awaitTermination

6.5.3 OnStart 监听事件

Worker 的启动完成后异步执行工作如下:

image.png

详解如下:

  1. dispatcher-event-loop】线程扫描到 OnStart 指令后会启动相关 WorkerWebUI (默认端口 8081)。
  2. WorkerMaster 发起一次 RegisterWorker 指令。
  3. 另起【master-forward-message-thread】线程定期执行 ReregisterWithMaster 任务,如果注册成功 (RegisteredWorker) 则跳过,否则再次向 Master 发起 RegisterWorker 指令,直到超过最大次数报错 (默认16次)。
  4. Master 如果可以注册,则维护对应的 WorkerInfo 对象并持久化,完成后向 Worker 发起一条 RegisteredWorker 指令,如果 Master 为 standby 状态,则向 Worker 发起一条 MasterInStandby 指令。
  5. Worker 接受 RegisteredWorker 后,提交【master-forward-message-thread】线程定期执行 SendHeartbeat 任务,完成后向 Worker 发起一条 WorkerLatestState 指令。
  6. Worker 发心跳检测,会触发更新 Master 对应 WorkerInfo 对象,如果 Master 检测到异常,则发起 ReconnectWorker 指令至 Worker,Worker 则再次执行 ReregisterWithMaster 工作。

6.5.4 RpcMessage 处理 (receiveAndReply)

**
image.png

6.5.5 OneWayMessage 处理 (receive)

**
image.png