1. Overall ApplicationMaster Execution Flow
2. Source Code Analysis of the ApplicationMaster Startup Flow
2.1 Application Submission
Regardless of application type, every application is submitted to YARN through the YarnClient API; the method that performs the submission is submitApplication().
// Location: org/apache/hadoop/yarn/client/api/YarnClient.java
public abstract ApplicationId submitApplication(
    ApplicationSubmissionContext appContext) throws YarnException,
    IOException;
2.3 Launching the AM
The relevant code is shown below.
The ApplicationMasterLauncher class handles the AM launch events:
public class ApplicationMasterLauncher extends AbstractService implements
    EventHandler<AMLauncherEvent> {
  // ...
  // handle() takes the LAUNCH branch to process the AM LAUNCH event
  public synchronized void handle(AMLauncherEvent appEvent) {
    AMLauncherEventType event = appEvent.getType();
    RMAppAttempt application = appEvent.getAppAttempt();
    switch (event) {
    case LAUNCH:
      launch(application);
      break;
    case CLEANUP:
      cleanup(application);
      break;
    default:
      break;
    }
  }
The launch function creates a Runnable launcher task and adds it to the masterEvents queue:
  private void launch(RMAppAttempt application) {
    Runnable launcher = createRunnableLauncher(application,
        AMLauncherEventType.LAUNCH);
    masterEvents.add(launcher);
  }
The run method of the LauncherThread class then consumes the queued tasks:
  private class LauncherThread extends Thread {
    public LauncherThread() {
      super("ApplicationMaster Launcher");
    }
    @Override
    public void run() {
      while (!this.isInterrupted()) {
        Runnable toLaunch;
        try {
          toLaunch = masterEvents.take();
          launcherPool.execute(toLaunch);
        } catch (InterruptedException e) {
          LOG.warn(this.getClass().getName() + " interrupted. Returning.");
          return;
        }
      }
    }
  }
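The hand-off between launch() and LauncherThread is a producer/consumer pattern: one side enqueues Runnable tasks, a single dispatcher thread takes them off the queue and passes them to a worker pool. The following is a minimal sketch of that pattern using only the JDK. The class LauncherQueueSketch, the String attempt IDs, the pool size of 2, and the CompletableFuture return value are all hypothetical simplifications for illustration and are not part of Hadoop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Simplified, hypothetical model of the queue hand-off; not the Hadoop class. */
public class LauncherQueueSketch {
    // Pending tasks (plays the role of masterEvents).
    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    // Worker pool that runs the tasks (plays the role of launcherPool); size 2 is arbitrary.
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(2);
    // Single dispatcher thread (plays the role of LauncherThread).
    private final Thread launcherThread = new Thread(this::dispatchLoop, "Sketch Launcher");

    /** Consumer side: block on the queue and hand each task to the pool. */
    private void dispatchLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Runnable toLaunch = masterEvents.take();
                launcherPool.execute(toLaunch);
            } catch (InterruptedException e) {
                return; // interrupted by stop()
            }
        }
    }

    public void start() {
        launcherThread.start();
    }

    /** Producer side: wrap the attempt in a Runnable and enqueue it. */
    public CompletableFuture<String> launch(String attemptId) {
        CompletableFuture<String> done = new CompletableFuture<>();
        masterEvents.add(() -> done.complete("LAUNCHED " + attemptId));
        return done;
    }

    /** Stops the dispatcher first, then drains the pool; returns true on clean shutdown. */
    public boolean stop() {
        launcherThread.interrupt();
        try {
            launcherThread.join(5000);
            launcherPool.shutdown();
            return launcherPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            launcherPool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LauncherQueueSketch sketch = new LauncherQueueSketch();
        sketch.start();
        System.out.println(sketch.launch("attempt_1").join());
        System.out.println(sketch.launch("attempt_2").join());
        System.out.println("clean shutdown: " + sketch.stop());
    }
}
```

Because the caller only enqueues a task, the event-handling path returns immediately, and the slower work of contacting a remote node runs on pool threads instead.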
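Once a task is taken from the queue, the actual execution logic is handed to the AMLauncher class and handled in its run method:
// Location: org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java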
  public void run() {
    switch (eventType) {
    case LAUNCH:
      try {
        LOG.info("Launching master" + application.getAppAttemptId());
        // Call launch() to start the AM Container
        launch();
        // Send the RMAppAttemptEventType.LAUNCHED event
        handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
            RMAppAttemptEventType.LAUNCHED));
      } catch(Exception ie) {
        String message = "Error launching " + application.getAppAttemptId()
            + ". Got exception: " + StringUtils.stringifyException(ie);
        LOG.info(message);
        handler.handle(new RMAppAttemptLaunchFailedEvent(application
            .getAppAttemptId(), message));
      }
      break;
    case CLEANUP: // omitted
    default:
      LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
      break;
    }
  }
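The launch() method of the AMLauncher class issues an RPC call to the NodeManager to start the AM Container. Note that the caller here is the ResourceManager (AMLauncher runs inside the RM), which talks to the NM over the ContainerManagementProtocol RPC protocol. After launch() completes, run() sends an RMAppAttemptEventType.LAUNCHED event through the event handler, which moves the AppAttempt state from ALLOCATED to LAUNCHED.
// Location: org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java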
  private void launch() throws IOException, YarnException {
    // ...
    // Build the Container start request
    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(scRequest);
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);
    // Key step: invoke the RPC call that starts the Container
    StartContainersResponse response =
        containerMgrProxy.startContainers(allRequests);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(masterContainerID)) {
      Throwable t =
          response.getFailedRequests().get(masterContainerID).deSerialize();
      parseAndThrowException(t);
    } else {
      LOG.info("Done launching container " + masterContainer + " for AM "
          + application.getAppAttemptId());
    }
  }
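The launch-then-notify flow of run() and launch() can be modeled in a few lines. The sketch below is a simplified illustration using only the JDK, not the Hadoop implementation: AmLaunchFlowSketch, the ContainerStarter interface (a stand-in for the RPC call to the NodeManager), and the reduced state set are hypothetical. In particular, mapping a launch failure directly to FAILED is an assumption made to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the launch-then-notify flow; not Hadoop code. */
public class AmLaunchFlowSketch {
    /** Reduced state set; the FAILED state here is a simplification. */
    public enum AttemptState { ALLOCATED, LAUNCHED, FAILED }

    /** Events emitted after the launch attempt. */
    public enum AttemptEvent { LAUNCHED, LAUNCH_FAILED }

    /** Hypothetical stand-in for the RPC call that starts the container on a node. */
    public interface ContainerStarter {
        void startContainer(String containerId) throws Exception;
    }

    private AttemptState state = AttemptState.ALLOCATED;
    public final List<AttemptEvent> events = new ArrayList<>();

    /** Records the event and applies the transition out of ALLOCATED. */
    private void handle(AttemptEvent event) {
        events.add(event);
        if (state == AttemptState.ALLOCATED) {
            state = (event == AttemptEvent.LAUNCHED)
                ? AttemptState.LAUNCHED
                : AttemptState.FAILED;
        }
    }

    /** Mirrors the shape of run(): try to launch, then report success or failure. */
    public AttemptState run(String containerId, ContainerStarter starter) {
        try {
            starter.startContainer(containerId);
            handle(AttemptEvent.LAUNCHED);
        } catch (Exception e) {
            handle(AttemptEvent.LAUNCH_FAILED);
        }
        return state;
    }

    public static void main(String[] args) {
        // Successful launch: the starter returns normally.
        AmLaunchFlowSketch ok = new AmLaunchFlowSketch();
        System.out.println(ok.run("container_1", id -> { }));

        // Failed launch: the starter throws, as a failed RPC would.
        AmLaunchFlowSketch bad = new AmLaunchFlowSketch();
        System.out.println(bad.run("container_1", id -> {
            throw new java.io.IOException("node unreachable");
        }));
    }
}
```

The point of the shape is that the state change is driven by the event, not by the launch code itself: the launcher only reports what happened, and the attempt's state machine decides the transition.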
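At this point the Container that will run the ApplicationMaster has been started. Once the AM Container is up on its NodeManager, it starts the ApplicationMaster process according to the launch context. This completes the first step of the ApplicationMaster lifecycle, ApplicationMaster startup.
Open questions: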