Worker 作为 Endpoint 的具体实例,下面我们介绍一下 Worker 启动以及 OnStart 指令后的额外工作。
6.5.1 脚本概览
下面是一个举例:
/opt/module/jdk1.8.0_144 \
-cp /opt/module/spark-2.1.1-bin-hadoop2.7/conf/:/opt/module/spark-2.1.1-bin-hadoop2.7/jars
6.5.2 启动流程
Worker 的启动流程如下:
详解如下:
- SparkConf:加载 key 以 spark. 开头的系统属性 (
Utils.getSystemProperties
)。 - WorkerArguments:
- 解析 Master 启动的参数:
- —ip -i
- —host -h
- —port -p
- —cores -c
- —memory -m
- —work-dir
- —webui-port
- —properties-file
- 将 —properties-file (没有配置默认为 conf/spark-defaults.conf) 中以 spark. 开头的配置存入 SparkConf。
- 在没有配置情况下,cores 默认为服务器 CPU 核数。
- 在没有配置情况下,memory 默认为服务器内存减 1G,如果低于 1G 取 1G。
- webUiPort 默认为 8081。
- 解析 Master 启动的参数:
- NettyRpcEnv 中的内部处理遵循 RpcEndpoint 统一处理,这里不再赘述。
- 最终守护进程会一直存在等待结束信 awaitTermination。
6.5.3 OnStart 监听事件
Worker 的启动完成后异步执行工作如下:
详解如下:
- 【dispatcher-event-loop】线程扫描到 OnStart 指令后会启动相关 WorkerWebUI (默认端口 8081)。
- Worker 向 Master 发起一次 RegisterWorker 指令。
- 另起【master-forward-message-thread】线程定期执行 ReregisterWithMaster 任务,如果注册成功 (RegisteredWorker) 则跳过,否则再次向 Master 发起 RegisterWorker 指令,直到超过最大次数报错 (默认16次)。
- Master 如果可以注册,则维护对应的 WorkerInfo 对象并持久化,完成后向 Worker 发起一条 RegisteredWorker 指令,如果 Master 为 standby 状态,则向 Worker 发起一条 MasterInStandby 指令。
- Worker 接受 RegisteredWorker 后,提交【master-forward-message-thread】线程定期执行 SendHeartbeat 任务,完成后向 Worker 发起一条 WorkerLatestState 指令。
- Worker 发心跳检测,会触发更新 Master 对应 WorkerInfo 对象,如果 Master 检测到异常,则发起 ReconnectWorker 指令至 Worker,Worker 则再次执行 ReregisterWithMaster 工作。
6.5.4 RpcMessage 处理 (receiveAndReply)
**
6.5.5 OneWayMessage 处理 (receive)
**