1. Overall ApplicationMaster Execution Flow
2. Source Code Analysis of the ApplicationMaster Startup Flow
2.1 Application Submission
Whatever the type of application, it is submitted to YARN through the YarnClient API, specifically its submitApplication() method.
// Location: org/apache/hadoop/yarn/client/api/YarnClient.java
public abstract ApplicationId submitApplication(
    ApplicationSubmissionContext appContext) throws YarnException, IOException;
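To illustrate the single entry point, here is a minimal sketch of a client whose only submission method hands back an application id. `SimpleYarnClient`, `InMemoryYarnClient` and `SubmissionContext` are hypothetical stand-ins, not Hadoop classes. No RPC to a ResourceManager happens, and the id string only imitates YARN's `application_<clusterTimestamp>_<sequence>` format.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ApplicationSubmissionContext: only what this sketch needs.
final class SubmissionContext {
    final String appName;
    final String amCommand; // command that would start the AM process (assumption)

    SubmissionContext(String appName, String amCommand) {
        this.appName = appName;
        this.amCommand = amCommand;
    }
}

// Same shape as YarnClient: one abstract entry point shared by every application type.
abstract class SimpleYarnClient {
    abstract String submitApplication(SubmissionContext ctx);
}

// Hypothetical in-memory "RM" that validates the context and hands out sequential ids.
final class InMemoryYarnClient extends SimpleYarnClient {
    private final long clusterTimestamp;
    private final AtomicInteger nextId = new AtomicInteger(1);

    InMemoryYarnClient(long clusterTimestamp) {
        this.clusterTimestamp = clusterTimestamp;
    }

    @Override
    String submitApplication(SubmissionContext ctx) {
        if (ctx.amCommand == null || ctx.amCommand.isEmpty()) {
            throw new IllegalArgumentException("AM launch command is required");
        }
        // Imitates application_<clusterTimestamp>_<4-digit sequence>
        return String.format("application_%d_%04d", clusterTimestamp, nextId.getAndIncrement());
    }
}
```

Callers depend only on the abstract method, so any framework can submit through the same call regardless of what its AM does.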
2.3 Launching the AM
The relevant code lives in the ApplicationMasterLauncher class, which handles the AM launch events:
public class ApplicationMasterLauncher extends AbstractService implements
    EventHandler<AMLauncherEvent> {
  // ...

  // handle() takes the LAUNCH branch to process the AM LAUNCH event
  public synchronized void handle(AMLauncherEvent appEvent) {
    AMLauncherEventType event = appEvent.getType();
    RMAppAttempt application = appEvent.getAppAttempt();
    switch (event) {
    case LAUNCH:
      launch(application);
      break;
    case CLEANUP:
      cleanup(application);
      break;
    default:
      break;
    }
  }
  // ...
}
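The dispatch in handle() is a plain switch on the event type. A minimal sketch of that routing, assuming hypothetical `LauncherEvent` / `MiniAMLauncherService` classes that carry only an attempt id instead of a real RMAppAttempt, records which branch each event takes:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of AMLauncherEventType: only the two cases handled above.
enum LauncherEventType { LAUNCH, CLEANUP }

// Hypothetical event carrying just an attempt id instead of a full RMAppAttempt.
final class LauncherEvent {
    final LauncherEventType type;
    final String attemptId;

    LauncherEvent(LauncherEventType type, String attemptId) {
        this.type = type;
        this.attemptId = attemptId;
    }
}

final class MiniAMLauncherService {
    // Records which branch was taken so the dispatch can be inspected.
    final List<String> actions = new ArrayList<>();

    // synchronized like ApplicationMasterLauncher.handle(): one event at a time.
    synchronized void handle(LauncherEvent event) {
        switch (event.type) {
            case LAUNCH:
                actions.add("launch " + event.attemptId);
                break;
            case CLEANUP:
                actions.add("cleanup " + event.attemptId);
                break;
            default:
                break; // other types are ignored, as in the excerpt
        }
    }
}
```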
The launch() method creates a Runnable launcher for the attempt (via createRunnableLauncher) and adds it to the masterEvents queue:
private void launch(RMAppAttempt application) {
  Runnable launcher = createRunnableLauncher(application,
      AMLauncherEventType.LAUNCH);
  masterEvents.add(launcher);
}
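The run() method of the LauncherThread class then consumes the queue: it blocks on masterEvents.take() and hands each Runnable to the launcherPool thread pool: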
private class LauncherThread extends Thread {
  public LauncherThread() {
    super("ApplicationMaster Launcher");
  }

  @Override
  public void run() {
    while (!this.isInterrupted()) {
      Runnable toLaunch;
      try {
        toLaunch = masterEvents.take();
        launcherPool.execute(toLaunch);
      } catch (InterruptedException e) {
        LOG.warn(this.getClass().getName() + " interrupted. Returning.");
        return;
      }
    }
  }
}
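Together, launch() and LauncherThread form a producer/consumer pair: handle() only enqueues, and a single consumer thread drains the queue into a pool, so slow NodeManager RPCs never block event dispatch. A minimal sketch of that pattern, with a hypothetical `MiniLauncher` class and an assumed pool size of 2 (the real pool is configured elsewhere):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of masterEvents + LauncherThread + launcherPool.
final class MiniLauncher {
    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(2); // size is an assumption
    private final Thread launcherThread = new Thread(this::drain, "ApplicationMaster Launcher");

    void start() {
        launcherThread.start();
    }

    // Producer side, like launch(): enqueue the work and return immediately.
    void launch(Runnable task) {
        masterEvents.add(task);
    }

    // Consumer side, like LauncherThread.run(): block on take(), hand off to the pool.
    private void drain() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Runnable toLaunch = masterEvents.take();
                launcherPool.execute(toLaunch);
            } catch (InterruptedException e) {
                return; // interrupted while waiting: stop consuming
            }
        }
    }

    // Stop the consumer first, then let already-submitted launches finish.
    void stop() {
        launcherThread.interrupt();
        try {
            launcherThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            launcherPool.shutdown();
        }
    }
}
```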
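Once a task is taken off the queue, the actual work is done by the AMLauncher class (the Runnable created by createRunnableLauncher), in its run() method: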
// Location: org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
public void run() {
  switch (eventType) {
  case LAUNCH:
    try {
      LOG.info("Launching master" + application.getAppAttemptId());
      // Call launch()
      launch();
      // Send the RMAppAttemptEventType.LAUNCHED event
      handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
          RMAppAttemptEventType.LAUNCHED));
    } catch (Exception ie) {
      String message = "Error launching " + application.getAppAttemptId()
          + ". Got exception: " + StringUtils.stringifyException(ie);
      LOG.info(message);
      handler.handle(new RMAppAttemptLaunchFailedEvent(
          application.getAppAttemptId(), message));
    }
    break;
  case CLEANUP:
    // ... (omitted)
  default:
    LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
    break;
  }
}
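The key property of run() is that both outcomes end in an event, so the attempt's state machine always hears back. A minimal sketch of that shape, where `LaunchStep`, `AttemptEvent` and `MiniAMLauncherRun` are hypothetical and the event names merely follow RMAppAttemptEventType's LAUNCHED and LAUNCH_FAILED:

```java
import java.util.function.Consumer;

// Event types named after RMAppAttemptEventType.LAUNCHED / LAUNCH_FAILED.
enum AttemptEventType { LAUNCHED, LAUNCH_FAILED }

// Hypothetical event: attempt id, type, and diagnostics (set only on failure).
final class AttemptEvent {
    final String attemptId;
    final AttemptEventType type;
    final String diagnostics;

    AttemptEvent(String attemptId, AttemptEventType type, String diagnostics) {
        this.attemptId = attemptId;
        this.type = type;
        this.diagnostics = diagnostics;
    }
}

// Hypothetical stand-in for the RPC-heavy AMLauncher.launch() step.
interface LaunchStep {
    void launch() throws Exception;
}

// Mirrors the LAUNCH branch of AMLauncher.run().
final class MiniAMLauncherRun implements Runnable {
    private final String attemptId;
    private final LaunchStep step;
    private final Consumer<AttemptEvent> handler;

    MiniAMLauncherRun(String attemptId, LaunchStep step, Consumer<AttemptEvent> handler) {
        this.attemptId = attemptId;
        this.step = step;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            step.launch();
            handler.accept(new AttemptEvent(attemptId, AttemptEventType.LAUNCHED, null));
        } catch (Exception e) {
            // Any failure becomes a LAUNCH_FAILED event carrying the diagnostics.
            String message = "Error launching " + attemptId + ". Got exception: " + e;
            handler.accept(new AttemptEvent(attemptId, AttemptEventType.LAUNCH_FAILED, message));
        }
    }
}
```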
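AMLauncher.launch() makes an RPC call to the NodeManager to start the AM Container. This interaction is between the RM (through AMLauncher) and the NM, not the AM, which does not exist yet, and it uses the ContainerManagementProtocol RPC protocol. After launch() returns, run() sends an RMAppAttemptEventType.LAUNCHED event through the RM's event handler, which moves the AppAttempt from the ALLOCATED state to LAUNCHED.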
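The ALLOCATED to LAUNCHED step is a transition in the attempt's state machine. A minimal table-driven sketch of just that corner, assuming hypothetical `AttemptState` / `AttemptEvt` enums; the real RMAppAttemptImpl has many more states, and its failure path is more involved than the single FAILED target modelled here:

```java
import java.util.EnumMap;
import java.util.Map;

enum AttemptState { ALLOCATED, LAUNCHED, FAILED }

enum AttemptEvt { LAUNCHED, LAUNCH_FAILED }

final class MiniAttemptStateMachine {
    // Transition table: current state -> (event -> next state).
    private static final Map<AttemptState, Map<AttemptEvt, AttemptState>> TABLE =
            new EnumMap<>(AttemptState.class);

    static {
        Map<AttemptEvt, AttemptState> fromAllocated = new EnumMap<>(AttemptEvt.class);
        fromAllocated.put(AttemptEvt.LAUNCHED, AttemptState.LAUNCHED);
        fromAllocated.put(AttemptEvt.LAUNCH_FAILED, AttemptState.FAILED); // simplified
        TABLE.put(AttemptState.ALLOCATED, fromAllocated);
    }

    private AttemptState state = AttemptState.ALLOCATED;

    AttemptState getState() {
        return state;
    }

    // Applies an event; events with no entry for the current state are rejected.
    void handle(AttemptEvt event) {
        Map<AttemptEvt, AttemptState> row = TABLE.get(state);
        AttemptState next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " at " + state);
        }
        state = next;
    }
}
```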
// Location: org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
private void launch() throws IOException, YarnException {
  // ...
  // Build the Container start request
  StartContainerRequest scRequest =
      StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
  List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
  list.add(scRequest);
  StartContainersRequest allRequests =
      StartContainersRequest.newInstance(list);

  // Key step: call the RPC to start the Container
  StartContainersResponse response =
      containerMgrProxy.startContainers(allRequests);
  if (response.getFailedRequests() != null
      && response.getFailedRequests().containsKey(masterContainerID)) {
    Throwable t =
        response.getFailedRequests().get(masterContainerID).deSerialize();
    parseAndThrowException(t);
  } else {
    LOG.info("Done launching container " + masterContainer + " for AM "
        + application.getAppAttemptId());
  }
}
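startContainers() is a batch call: the NM reports failures per container in a map rather than failing the whole call, which is why launch() wraps one request in a list and then looks up its own container id in the failed-requests map. A minimal sketch of that check, with hypothetical `StartRequest`, `StartResponse`, `FakeNodeManager` and `AmContainerStarter` classes; a local method call stands in for the ContainerManagementProtocol RPC:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for StartContainerRequest.
final class StartRequest {
    final String containerId;

    StartRequest(String containerId) {
        this.containerId = containerId;
    }
}

// Hypothetical stand-in for StartContainersResponse.
final class StartResponse {
    final List<String> succeeded = new ArrayList<>();
    final Map<String, String> failed = new HashMap<>(); // containerId -> error message
}

// Hypothetical NM: starts each container independently, failing those listed as broken.
final class FakeNodeManager {
    private final Set<String> broken;

    FakeNodeManager(Set<String> broken) {
        this.broken = broken;
    }

    StartResponse startContainers(List<StartRequest> requests) {
        StartResponse resp = new StartResponse();
        for (StartRequest r : requests) {
            if (broken.contains(r.containerId)) {
                resp.failed.put(r.containerId, "cannot start " + r.containerId);
            } else {
                resp.succeeded.add(r.containerId);
            }
        }
        return resp;
    }
}

final class AmContainerStarter {
    // Mirrors AMLauncher.launch(): a one-element batch, then check the failed map.
    static void launchAm(FakeNodeManager nm, String masterContainerId) {
        List<StartRequest> list = new ArrayList<>();
        list.add(new StartRequest(masterContainerId));
        StartResponse response = nm.startContainers(list);
        if (response.failed.containsKey(masterContainerId)) {
            throw new IllegalStateException(response.failed.get(masterContainerId));
        }
    }
}
```

Because the exception propagates out of launchAm, the caller (run() in the real code) turns it into a launch-failed event.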
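At this point, the Container that will run the ApplicationMaster has been started. Once the AM Container is up on its NodeManager, it starts the ApplicationMaster process according to its launch context. The first stage of the ApplicationMaster lifecycle, AM startup, is now complete.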
Open questions:
