1. Overall ApplicationMaster Workflow

2. Source Code Analysis of the ApplicationMaster Startup Process

2.1 Application Submission

Whatever the type of application, it enters YARN through the YarnClient API. The method that performs the submission is submitApplication().

    // Location: org/apache/hadoop/yarn/client/api/YarnClient.java
    public abstract ApplicationId submitApplication(
        ApplicationSubmissionContext appContext) throws YarnException,
        IOException;
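To make this single-entry-point contract concrete without a cluster, here is a minimal stdlib-only sketch. `ToyYarnClient`, `SubmissionContext`, and `InMemoryYarnClient` are hypothetical stand-ins, not Hadoop classes; the real `ApplicationSubmissionContext` carries much more (queue, resources, the AM's launch context). The sketch only shows that different frameworks build a context and go through one method that returns an application ID. The `application_<timestamp>_<sequence>` string is an assumed format chosen to resemble YARN application IDs.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the YarnClient submission contract. Every name here is a
// hypothetical stand-in, not a Hadoop class.
public class SubmitSketch {

    // Stand-in for ApplicationSubmissionContext: only a name and a type.
    static final class SubmissionContext {
        final String appName;
        final String appType;

        SubmissionContext(String appName, String appType) {
            this.appName = appName;
            this.appType = appType;
        }
    }

    // Stand-in for YarnClient: one abstract entry point for every framework.
    abstract static class ToyYarnClient {
        abstract String submitApplication(SubmissionContext appContext);
    }

    // In-memory client that assigns IDs in an application_<timestamp>_<seq> style.
    static final class InMemoryYarnClient extends ToyYarnClient {
        private final long clusterTimestamp;
        private final AtomicInteger sequence = new AtomicInteger();

        InMemoryYarnClient(long clusterTimestamp) {
            this.clusterTimestamp = clusterTimestamp;
        }

        @Override
        String submitApplication(SubmissionContext appContext) {
            int id = sequence.incrementAndGet();
            return String.format("application_%d_%04d", clusterTimestamp, id);
        }
    }

    public static void main(String[] args) {
        ToyYarnClient client = new InMemoryYarnClient(1700000000000L);
        // A MapReduce job and a Spark job use the same entry point.
        System.out.println(client.submitApplication(new SubmissionContext("wordcount", "MAPREDUCE")));
        System.out.println(client.submitApplication(new SubmissionContext("etl", "SPARK")));
    }
}
```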

2.3 Launching the AM

The relevant code is in the ApplicationMasterLauncher class, which handles the AM launch events:

    public class ApplicationMasterLauncher extends AbstractService implements
        EventHandler<AMLauncherEvent> {
      // ...
      // handle() takes the LAUNCH branch to process the AM LAUNCH event
      public synchronized void handle(AMLauncherEvent appEvent) {
        AMLauncherEventType event = appEvent.getType();
        RMAppAttempt application = appEvent.getAppAttempt();
        switch (event) {
        case LAUNCH:
          launch(application);
          break;
        case CLEANUP:
          cleanup(application);
          break;
        default:
          break;
        }
      }
      // ...
    }
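The handle() method only routes each event by its type and does no launching work itself. Below is a minimal stdlib-only sketch of that routing. `RoutingSketch`, `LauncherEvent`, and `Router` are hypothetical names, and each branch records a string instead of calling `launch()` or `cleanup()`.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of type-based event routing in the style of
// ApplicationMasterLauncher.handle(). All names are hypothetical stand-ins.
public class RoutingSketch {

    enum LauncherEventType { LAUNCH, CLEANUP }

    static final class LauncherEvent {
        final LauncherEventType type;
        final String attemptId;

        LauncherEvent(LauncherEventType type, String attemptId) {
            this.type = type;
            this.attemptId = attemptId;
        }
    }

    static final class Router {
        // Records which branch was taken, in order, instead of doing real work.
        final List<String> actions = new ArrayList<>();

        // synchronized like the original: events are routed one at a time.
        synchronized void handle(LauncherEvent event) {
            switch (event.type) {
                case LAUNCH:
                    actions.add("launch " + event.attemptId);
                    break;
                case CLEANUP:
                    actions.add("cleanup " + event.attemptId);
                    break;
                default:
                    break;
            }
        }
    }

    public static void main(String[] args) {
        Router router = new Router();
        router.handle(new LauncherEvent(LauncherEventType.LAUNCH, "appattempt_1"));
        router.handle(new LauncherEvent(LauncherEventType.CLEANUP, "appattempt_1"));
        System.out.println(router.actions); // [launch appattempt_1, cleanup appattempt_1]
    }
}
```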

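The launch() method does not start the AM directly. It creates a Runnable launcher task for the application attempt and adds it to the masterEvents queue: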

    private void launch(RMAppAttempt application) {
      Runnable launcher = createRunnableLauncher(application,
          AMLauncherEventType.LAUNCH);
      masterEvents.add(launcher);
    }
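One plausible reason for queueing is that `handle()` is `synchronized`. If it made the NodeManager RPC inline, every other launcher event would wait behind that call. The minimal sketch below shows only the producer side. It assumes a hypothetical `Launcher` class whose tasks just record their attempt ID. `launch()` returns as soon as the task is queued, and no task runs until something drains the queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the producer side: launch() only enqueues a Runnable. Names are hypothetical.
public class EnqueueSketch {

    static final class Launcher {
        // Stand-in for masterEvents: FIFO of launch tasks waiting for a worker.
        final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
        // Attempt IDs whose task has actually run, in execution order.
        final List<String> launched = Collections.synchronizedList(new ArrayList<>());

        // Stand-in for createRunnableLauncher(): builds the task without running it.
        Runnable createRunnableLauncher(String attemptId) {
            return () -> launched.add(attemptId);
        }

        // Stand-in for launch(): returns as soon as the task is queued.
        void launch(String attemptId) {
            masterEvents.add(createRunnableLauncher(attemptId));
        }
    }

    public static void main(String[] args) {
        Launcher launcher = new Launcher();
        launcher.launch("appattempt_1");
        launcher.launch("appattempt_2");
        // Prints "2 queued, 0 run": nothing has executed yet.
        System.out.println(launcher.masterEvents.size() + " queued, " + launcher.launched.size() + " run");
        // Draining on this thread runs the tasks in FIFO order.
        Runnable task;
        while ((task = launcher.masterEvents.poll()) != null) {
            task.run();
        }
        System.out.println(launcher.masterEvents.size() + " queued, " + launcher.launched.size() + " run");
    }
}
```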

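The LauncherThread class then consumes these tasks in its run() method. It takes each task off the queue and hands it to the launcherPool thread pool: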

    private class LauncherThread extends Thread {
      public LauncherThread() {
        super("ApplicationMaster Launcher");
      }

      @Override
      public void run() {
        while (!this.isInterrupted()) {
          Runnable toLaunch;
          try {
            toLaunch = masterEvents.take();
            launcherPool.execute(toLaunch);
          } catch (InterruptedException e) {
            LOG.warn(this.getClass().getName() + " interrupted. Returning.");
            return;
          }
        }
      }
    }
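The following runnable sketch of the consumer side uses only `java.util.concurrent`. A dedicated thread blocks on `take()`, hands each task to a fixed-size pool, and exits when it is interrupted. The pool size of 4, the `CountDownLatch` used to observe completion, and the `runPipeline()` helper are assumptions for the demo, not values taken from Hadoop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the consumer side: one thread drains the queue and a pool runs the tasks.
public class LauncherThreadSketch {

    static final class LauncherThread extends Thread {
        private final BlockingQueue<Runnable> masterEvents;
        private final ExecutorService launcherPool;

        LauncherThread(BlockingQueue<Runnable> masterEvents, ExecutorService launcherPool) {
            super("ApplicationMaster Launcher");
            this.masterEvents = masterEvents;
            this.launcherPool = launcherPool;
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                try {
                    // Blocks until a task is queued, then hands it to the pool.
                    Runnable toLaunch = masterEvents.take();
                    launcherPool.execute(toLaunch);
                } catch (InterruptedException e) {
                    // take() clears the interrupt flag, so leave the loop explicitly.
                    return;
                }
            }
        }
    }

    // Pushes n tasks through queue -> LauncherThread -> pool; returns how many completed.
    static long runPipeline(int n) {
        BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
        ExecutorService launcherPool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(n);
        LauncherThread launcher = new LauncherThread(masterEvents, launcherPool);
        launcher.start();
        for (int i = 0; i < n; i++) {
            masterEvents.add(done::countDown);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            launcher.interrupt();      // stops the consumer loop
            launcherPool.shutdown();   // lets pool threads exit once idle
        }
        return n - done.getCount();
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(10) + " tasks completed");
    }
}
```

This split keeps the thread that dequeues tasks cheap, while the slow work (in the real code, the RPC to the NodeManager) runs on pool threads.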

After a task is taken off the queue, the actual launch logic belongs to the AMLauncher class and runs in its run() method:

    // Location: org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
    public void run() {
      switch (eventType) {
      case LAUNCH:
        try {
          LOG.info("Launching master" + application.getAppAttemptId());
          // Call launch() to start the AM container on the NodeManager
          launch();
          // Send an RMAppAttemptEventType.LAUNCHED event
          handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
              RMAppAttemptEventType.LAUNCHED));
        } catch (Exception ie) {
          String message = "Error launching " + application.getAppAttemptId()
              + ". Got exception: " + StringUtils.stringifyException(ie);
          LOG.info(message);
          handler.handle(new RMAppAttemptLaunchFailedEvent(application
              .getAppAttemptId(), message));
        }
        break;
      case CLEANUP:
        // ... (CLEANUP handling omitted)
        break;
      default:
        LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
        break;
      }
    }
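The LAUNCH branch of run() follows a simple contract. If launch() returns normally, it reports LAUNCHED. If launch() throws anything, it reports a launch failure that carries the exception text. The minimal sketch below illustrates that contract. A hypothetical `LaunchStep` interface stands in for `launch()`, and a `Consumer<String>` stands in for the RM's event handler.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of AMLauncher.run()'s outcome reporting. Names are hypothetical stand-ins.
public class LaunchOutcomeSketch {

    // A launch step that may fail, standing in for AMLauncher.launch().
    interface LaunchStep {
        void launch() throws Exception;
    }

    // Success -> "LAUNCHED <id>"; any exception -> "LAUNCH_FAILED <id>: <message>".
    static void runLaunch(String attemptId, LaunchStep step, Consumer<String> handler) {
        try {
            step.launch();
            handler.accept("LAUNCHED " + attemptId);
        } catch (Exception e) {
            handler.accept("LAUNCH_FAILED " + attemptId + ": " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        runLaunch("appattempt_1", () -> { }, events::add);
        runLaunch("appattempt_2", () -> { throw new IOException("NM unreachable"); }, events::add);
        System.out.println(events);
    }
}
```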

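The launch() method of AMLauncher makes an RPC call to the NodeManager to start the AM container. This interaction between the RM and the NM goes through the ContainerManagementProtocol RPC protocol. After launch() returns successfully, run() sends an RMAppAttemptEventType.LAUNCHED event through the RM's event dispatcher, which moves the application attempt from the ALLOCATED state to the LAUNCHED state.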
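The move from ALLOCATED to LAUNCHED is a table-driven state transition. Hadoop builds the attempt's transitions with its own StateMachineFactory. The sketch below swaps in a plain `EnumMap` table and registers only the single transition described above. Any unregistered (state, event) pair is treated as an error. The enum and method names are hypothetical.

```java
import java.util.EnumMap;
import java.util.Map;

// Sketch of a table-driven transition ALLOCATED --LAUNCHED--> LAUNCHED.
// Names are hypothetical; Hadoop's real state machine has many more states and events.
public class AttemptStateSketch {

    enum State { ALLOCATED, LAUNCHED }

    enum EventType { LAUNCHED }

    // Transition table: (current state, event) -> next state.
    static final Map<State, Map<EventType, State>> TRANSITIONS = new EnumMap<>(State.class);

    static {
        Map<EventType, State> fromAllocated = new EnumMap<>(EventType.class);
        fromAllocated.put(EventType.LAUNCHED, State.LAUNCHED);
        TRANSITIONS.put(State.ALLOCATED, fromAllocated);
    }

    // Looks up the next state; unregistered pairs are rejected.
    static State transition(State current, EventType event) {
        Map<EventType, State> row = TRANSITIONS.get(current);
        State next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + current);
        }
        return next;
    }

    public static void main(String[] args) {
        System.out.println(transition(State.ALLOCATED, EventType.LAUNCHED)); // LAUNCHED
        try {
            transition(State.LAUNCHED, EventType.LAUNCHED);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Invalid event LAUNCHED in state LAUNCHED
        }
    }
}
```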

    // Location: org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
    private void launch() throws IOException, YarnException {
      // ...
      // Build the container start request
      StartContainerRequest scRequest =
          StartContainerRequest.newInstance(launchContext,
              masterContainer.getContainerToken());
      List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
      list.add(scRequest);
      StartContainersRequest allRequests =
          StartContainersRequest.newInstance(list);
      // Key step: RPC call that starts the container on the NodeManager
      StartContainersResponse response =
          containerMgrProxy.startContainers(allRequests);
      if (response.getFailedRequests() != null
          && response.getFailedRequests().containsKey(masterContainerID)) {
        Throwable t =
            response.getFailedRequests().get(masterContainerID).deSerialize();
        parseAndThrowException(t);
      } else {
        LOG.info("Done launching container " + masterContainer + " for AM "
            + application.getAppAttemptId());
      }
    }
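The request is built as a batch (StartContainersRequest wraps a list), even though it holds only the AM container. The response therefore reports failures per container, and the caller has to look up its own container ID. Below is a minimal sketch of that pattern. It uses hypothetical string container IDs and a `broken` set that simulates which starts fail on the NodeManager side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of a batched start call with per-container failures. Names are hypothetical.
public class BatchStartSketch {

    // Hypothetical response: IDs that started, plus a failure reason per failed ID.
    static final class StartResponse {
        final List<String> succeeded = new ArrayList<>();
        final Map<String, String> failed = new HashMap<>();
    }

    // Hypothetical NM side: each request is handled independently,
    // so one bad container does not fail the whole call.
    static StartResponse startContainers(List<String> containerIds, Set<String> broken) {
        StartResponse response = new StartResponse();
        for (String id : containerIds) {
            if (broken.contains(id)) {
                response.failed.put(id, "cannot start " + id);
            } else {
                response.succeeded.add(id);
            }
        }
        return response;
    }

    // Caller side: wrap the single AM container in a one-element batch,
    // then check whether its own ID appears in the failure map.
    static void launchMaster(String masterContainerId, Set<String> broken) {
        List<String> batch = Collections.singletonList(masterContainerId);
        StartResponse response = startContainers(batch, broken);
        if (response.failed.containsKey(masterContainerId)) {
            throw new IllegalStateException(response.failed.get(masterContainerId));
        }
    }

    public static void main(String[] args) {
        Set<String> broken = Collections.singleton("container_02");
        launchMaster("container_01", broken); // succeeds silently
        try {
            launchMaster("container_02", broken);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // cannot start container_02
        }
        StartResponse mixed = startContainers(Arrays.asList("container_01", "container_02"), broken);
        System.out.println(mixed.succeeded + " " + mixed.failed.keySet());
    }
}
```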

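At this point, the container that will run the ApplicationMaster has been started. Once the AM container is up on its NodeManager, the container starts the ApplicationMaster process according to its launch context. This completes the first step of the ApplicationMaster life cycle: starting the ApplicationMaster.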


Open question: [figure not preserved]