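In per-job mode, job submission goes through org.apache.flink.yarn.YarnClusterDescriptor#deployJobCluster. The getYarnJobClusterEntrypoint() argument it passes to deployInternal supplies the AM (ApplicationMaster) entry class.
The calling code: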
```java
@Override
public ClusterClientProvider<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
        throws ClusterDeploymentException {
    try {
        return deployInternal(
                clusterSpecification,
                "Flink per-job cluster",
                // AM entry class
                getYarnJobClusterEntrypoint(),
                jobGraph,
                detached);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    }
}
```
The entry class it returns is org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.
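Why the entry class matters: its name ends up as the main class in the command YARN runs inside the AM container, so its main() is what starts the JobManager. The sketch below is a hypothetical, heavily simplified model of that step (AmLaunchCommand and buildCommand are illustrative names, not Flink's API; the real launch command also carries classpath, logging and redirect settings).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model: the AM entry class name becomes the main class
// of the command YARN runs in the AM container. Not Flink's actual API.
final class AmLaunchCommand {
    // Same class name that getYarnJobClusterEntrypoint() hands to deployInternal in per-job mode.
    static final String PER_JOB_ENTRYPOINT =
            "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint";

    // Builds "java <jvm options...> <entry class>"; the entry class's main() starts the JobManager.
    static String buildCommand(String entrypointClass, List<String> jvmOptions) {
        List<String> parts = new ArrayList<>();
        parts.add("java");
        parts.addAll(jvmOptions);
        parts.add(entrypointClass);
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        String cmd = buildCommand(PER_JOB_ENTRYPOINT, List.of("-Xmx1024m"));
        // prints: java -Xmx1024m org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
        System.out.println(cmd);
    }
}
```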
The Dispatcher is created and started in org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayServiceFactory#create:
```java
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {
    final Dispatcher dispatcher;
    try {
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new NoOpDispatcherBootstrap(),
                        PartialDispatcherServicesWithJobGraphStore.from(
                                partialDispatcherServices, jobGraphWriter));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }
    dispatcher.start();
    return DefaultDispatcherGatewayService.from(dispatcher);
}
```
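Note that dispatcher.start() does not run the startup logic inline: in Flink's RPC layer, starting an endpoint lets it begin processing messages, and onStart() then runs on the endpoint's single main thread, ahead of any RPC that arrives later. The sketch below models that idea with a plain single-threaded executor instead of Akka; MiniEndpoint and MiniDispatcher are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of an RPC endpoint: lifecycle callbacks and messages all
// run on one "main thread", so the endpoint's state needs no extra locking.
abstract class MiniEndpoint {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // start() only enqueues onStart(); it may return before onStart() has run.
    final void start() {
        mainThread.execute(this::onStart);
    }

    // Later RPC calls are queued behind onStart() on the same thread.
    final void runAsync(Runnable message) {
        mainThread.execute(message);
    }

    // Drains the queue and stops the main thread.
    final void stopAndWait() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected abstract void onStart();
}

final class MiniDispatcher extends MiniEndpoint {
    // Written on the main thread; synchronized so main() can read it afterwards.
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    @Override
    protected void onStart() {
        events.add("onStart");
    }

    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher();
        dispatcher.start();
        dispatcher.runAsync(() -> dispatcher.events.add("submitJob"));
        dispatcher.stopAndWait();
        System.out.println(dispatcher.events); // [onStart, submitJob]
    }
}
```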
The Dispatcher is an Akka-based RPC endpoint; starting it eventually calls org.apache.flink.runtime.dispatcher.Dispatcher#onStart, which:
- receives the job
- starts the JobMaster via org.apache.flink.runtime.dispatcher.Dispatcher#createJobManagerRunner
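The Dispatcher's job handling can be pictured as a map from job ID to runner: for each job it receives, it creates one JobManagerRunner (which hosts that job's JobMaster) and refuses a second submission of the same job ID. This is a simplified sketch; SimpleDispatcher, JobRunnerStub and submit are hypothetical names, and the real Dispatcher creates runners asynchronously and also deals with recovery and failures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a JobManagerRunner: one per submitted job.
final class JobRunnerStub {
    final String jobId;
    boolean started;

    JobRunnerStub(String jobId) {
        this.jobId = jobId;
    }

    void start() {
        started = true; // in Flink, starting the runner eventually starts the job's JobMaster
    }
}

final class SimpleDispatcher {
    private final Map<String, JobRunnerStub> runners = new HashMap<>();

    // Creates and starts a runner for a new job; rejects a job ID that is already known.
    JobRunnerStub submit(String jobId) {
        if (runners.containsKey(jobId)) {
            throw new IllegalStateException("Duplicate job: " + jobId);
        }
        JobRunnerStub runner = new JobRunnerStub(jobId);
        runners.put(jobId, runner);
        runner.start();
        return runner;
    }

    int runningJobs() {
        return runners.size();
    }

    public static void main(String[] args) {
        SimpleDispatcher dispatcher = new SimpleDispatcher();
        dispatcher.submit("job-1");
        try {
            dispatcher.submit("job-1");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Duplicate job: job-1
        }
        System.out.println(dispatcher.runningJobs()); // 1
    }
}
```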
JobMaster startup flow: org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution
The AM process is the JobManager. It hosts three components, the JobMaster, the Dispatcher and the ResourceManager; the JobMaster is created by the Dispatcher.
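The ownership described above can be sketched as one process object holding a single ResourceManager and a single Dispatcher, with the Dispatcher creating a JobMaster per job. The sketch also assumes, as a simplification of JobMaster#startJobExecution, that a JobMaster first starts its own services (slot pool, ResourceManager connection) and only then starts scheduling. All class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the three components in one JobManager (AM) process.
final class JobManagerProcess {
    final List<String> log = new ArrayList<>();
    final ResourceManagerStub resourceManager = new ResourceManagerStub();
    final DispatcherStub dispatcher = new DispatcherStub(log);

    // Placeholder: one ResourceManager per process.
    static final class ResourceManagerStub {}

    // The Dispatcher creates one JobMaster per job.
    static final class DispatcherStub {
        private final List<String> log;
        final List<JobMasterStub> jobMasters = new ArrayList<>();

        DispatcherStub(List<String> log) {
            this.log = log;
        }

        JobMasterStub runJob(String jobId) {
            JobMasterStub jobMaster = new JobMasterStub(jobId, log);
            jobMasters.add(jobMaster);
            jobMaster.startJobExecution();
            return jobMaster;
        }
    }

    static final class JobMasterStub {
        final String jobId;
        private final List<String> log;

        JobMasterStub(String jobId, List<String> log) {
            this.jobId = jobId;
            this.log = log;
        }

        // Assumed, simplified order: own services, RM connection, then scheduling.
        void startJobExecution() {
            log.add(jobId + ": start slot pool");
            log.add(jobId + ": connect to ResourceManager");
            log.add(jobId + ": start scheduling");
        }
    }

    public static void main(String[] args) {
        JobManagerProcess am = new JobManagerProcess();
        am.dispatcher.runJob("job-1");
        am.log.forEach(System.out::println);
    }
}
```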
ResourceManager startup:
org.apache.flink.runtime.resourcemanager.ResourceManager#onStart
- Starts the heartbeat services with TaskManagers and JobManagers
- org.apache.flink.runtime.registration.RegisteredRpcConnection#start: the JM registers with the RM; on success:
  - org.apache.flink.runtime.jobmaster.JobMaster#establishResourceManagerConnection
    - org.apache.flink.runtime.jobmaster.slotpool.SlotPool#connectToResourceManager
      - org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#requestSlotFromResourceManager
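The call chain above boils down to this: slot requests the JobMaster's slot pool issues before it is connected to the RM are parked, and once the RM connection is established they are forwarded to the RM. A minimal sketch of that behaviour with hypothetical names (PendingSlotPool, RmGateway); the real SlotPoolImpl also tracks allocation IDs, timeouts and resource profiles.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical view of the RM as seen from the JobMaster.
interface RmGateway {
    void requestSlot(String jobId, String requestId);
}

final class PendingSlotPool {
    private final String jobId;
    private final List<String> waitingForResourceManager = new ArrayList<>();
    private RmGateway resourceManager; // null until the JM -> RM registration succeeds

    PendingSlotPool(String jobId) {
        this.jobId = jobId;
    }

    // Without an RM connection the request is parked; otherwise it goes straight to the RM.
    void requestNewSlot(String requestId) {
        if (resourceManager == null) {
            waitingForResourceManager.add(requestId);
        } else {
            resourceManager.requestSlot(jobId, requestId);
        }
    }

    // Called once the JobMaster has established its ResourceManager connection.
    void connectToResourceManager(RmGateway gateway) {
        this.resourceManager = gateway;
        for (String requestId : waitingForResourceManager) {
            gateway.requestSlot(jobId, requestId);
        }
        waitingForResourceManager.clear();
    }

    int waitingCount() {
        return waitingForResourceManager.size();
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        PendingSlotPool pool = new PendingSlotPool("job-1");
        pool.requestNewSlot("r1");
        pool.requestNewSlot("r2");
        pool.connectToResourceManager((job, req) -> received.add(job + "/" + req));
        pool.requestNewSlot("r3");
        System.out.println(received); // [job-1/r1, job-1/r2, job-1/r3]
    }
}
```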
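In the end, the SlotManager inside Flink's ResourceManager requests resources from YARN's ResourceManager.
The SlotManager is the component that actually manages the RM's resources.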
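A rough model of the SlotManager's decision: a slot request is served from a free registered slot if one exists; otherwise it is matched against slots of workers already requested but not yet registered, and only when none are left does the SlotManager ask for a new worker (on YARN, a container request to YARN's ResourceManager). MiniSlotManager and WorkerAllocator are hypothetical names; the sketch ignores resource profiles and assumes every new worker brings a fixed number of slots.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical hook through which the SlotManager asks for a new TaskManager;
// on YARN this would end in a container request to YARN's ResourceManager.
interface WorkerAllocator {
    void allocateWorker();
}

final class MiniSlotManager {
    private final Deque<String> freeSlots = new ArrayDeque<>();
    private final WorkerAllocator allocator;
    private final int slotsPerWorker;
    private int pendingSlots; // slots of workers that were requested but have not registered yet

    MiniSlotManager(WorkerAllocator allocator, int slotsPerWorker) {
        this.allocator = allocator;
        this.slotsPerWorker = slotsPerWorker;
    }

    void addFreeSlot(String slotId) {
        freeSlots.add(slotId);
    }

    // Returns a free slot if one exists; otherwise reserves capacity on a pending or
    // newly requested worker and returns null (the request is fulfilled later).
    String requestSlot() {
        if (!freeSlots.isEmpty()) {
            return freeSlots.poll();
        }
        if (pendingSlots == 0) {
            allocator.allocateWorker();
            pendingSlots += slotsPerWorker;
        }
        pendingSlots--;
        return null;
    }

    public static void main(String[] args) {
        int[] workersRequested = {0};
        MiniSlotManager slotManager = new MiniSlotManager(() -> workersRequested[0]++, 2);
        slotManager.addFreeSlot("tm-1/0");
        System.out.println(slotManager.requestSlot()); // tm-1/0
        slotManager.requestSlot(); // no free slot: requests a worker with 2 slots
        slotManager.requestSlot(); // covered by that pending worker
        System.out.println(workersRequested[0]); // 1
    }
}
```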
TaskManager entry points:
- YARN: org.apache.flink.yarn.YarnTaskExecutorRunner#main (the process shows up as YarnTaskExecutorRunner)
- Standalone: TaskManagerRunner
TaskManager startup eventually reaches TaskExecutor#onStart() through RPC. From there:
- the TaskExecutor registers with the ResourceManager
- after successful registration: org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerRegistrationListener#onRegistrationSuccess
- org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#registerTaskManager: the SlotManager registers the TM's slots
- org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#registerSlot
In short, the TM registers its slots with the RM, and the SlotManager carries out the registration.
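On the RM side, the registration above amounts to bookkeeping: the TaskManager reports the state of every slot it owns, and the SlotManager records each slot as free or already allocated to a job. A simplified sketch under that assumption; SlotStatus, SlotRegistry and their methods are hypothetical and only loosely follow SlotManagerImpl#registerTaskManager and #registerSlot.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical entry of a TaskManager's slot report.
final class SlotStatus {
    final String slotId;
    final String allocatedJob; // null means the slot is free

    SlotStatus(String slotId, String allocatedJob) {
        this.slotId = slotId;
        this.allocatedJob = allocatedJob;
    }
}

final class SlotRegistry {
    private final Map<String, String> slotOwner = new HashMap<>();   // slotId -> TaskManager ID
    private final Map<String, String> allocations = new HashMap<>(); // slotId -> job ID (free slots absent)

    // Registers a TaskManager together with the slot report it sends after registering.
    void registerTaskManager(String taskManagerId, List<SlotStatus> slotReport) {
        for (SlotStatus status : slotReport) {
            registerSlot(taskManagerId, status);
        }
    }

    private void registerSlot(String taskManagerId, SlotStatus status) {
        slotOwner.put(status.slotId, taskManagerId);
        if (status.allocatedJob != null) {
            allocations.put(status.slotId, status.allocatedJob);
        }
    }

    int registeredSlotCount() {
        return slotOwner.size();
    }

    long freeSlotCount() {
        return slotOwner.keySet().stream().filter(id -> !allocations.containsKey(id)).count();
    }

    public static void main(String[] args) {
        SlotRegistry registry = new SlotRegistry();
        registry.registerTaskManager("tm-1",
                List.of(new SlotStatus("tm-1/0", null), new SlotStatus("tm-1/1", "job-1")));
        // prints: 2 slots, 1 free
        System.out.println(registry.registeredSlotCount() + " slots, " + registry.freeSlotCount() + " free");
    }
}
```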
How the SlotManager allocates a slot:
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#allocateSlot
- org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable#allocateSlot(int, org.apache.flink.api.common.JobID, org.apache.flink.runtime.clusterframework.types.AllocationID, org.apache.flink.runtime.clusterframework.types.ResourceProfile, org.apache.flink.api.common.time.Time), on the TaskExecutor side, reached through the TaskExecutor#requestSlot RPC
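On the TaskExecutor, the slot table can be thought of as a fixed-size array of slots indexed by slot number; allocating binds a free index to a job and an allocation ID. The sketch below models that with simplifying assumptions (string IDs, no resource profile, no timeout) and a simplified rule: it returns false if the index is taken or the allocation ID is already in use. MiniTaskSlotTable is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

final class MiniTaskSlotTable {
    // Hypothetical slot entry: which job and allocation own the slot.
    static final class Slot {
        final String jobId;
        final String allocationId;

        Slot(String jobId, String allocationId) {
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    private final Slot[] slots;
    private final Map<String, Integer> indexByAllocation = new HashMap<>();

    MiniTaskSlotTable(int numberOfSlots) {
        slots = new Slot[numberOfSlots];
    }

    // Binds slot 'index' to the job; false if the slot is taken or the allocation ID is reused.
    boolean allocateSlot(int index, String jobId, String allocationId) {
        if (index < 0 || index >= slots.length) {
            throw new IllegalArgumentException("No slot with index " + index);
        }
        if (slots[index] != null || indexByAllocation.containsKey(allocationId)) {
            return false;
        }
        slots[index] = new Slot(jobId, allocationId);
        indexByAllocation.put(allocationId, index);
        return true;
    }

    boolean isFree(int index) {
        return slots[index] == null;
    }

    String jobOf(int index) {
        return slots[index] == null ? null : slots[index].jobId;
    }

    public static void main(String[] args) {
        MiniTaskSlotTable table = new MiniTaskSlotTable(2);
        System.out.println(table.allocateSlot(0, "job-1", "alloc-a")); // true
        System.out.println(table.allocateSlot(0, "job-2", "alloc-b")); // false: slot 0 is taken
        System.out.println(table.isFree(1)); // true
    }
}
```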
Offering slots to the JobMaster:
- org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager
- org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#offerSlots
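In the offer step, the TaskExecutor sends the slots it has allocated for a job to that job's JobMaster, and the slot pool answers with the subset it accepts. The sketch assumes a simplified acceptance rule: offers from TaskManagers the JobMaster has not registered are rejected, a new allocation ID is accepted, and a repeated offer of the same allocation is accepted only from the same TaskManager. OfferingSlotPool and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class OfferingSlotPool {
    private final Set<String> registeredTaskManagers = new HashSet<>();
    private final Map<String, String> acceptedSlots = new HashMap<>(); // allocationId -> TaskManager ID

    void registerTaskManager(String taskManagerId) {
        registeredTaskManagers.add(taskManagerId);
    }

    // Returns the allocation IDs the JobMaster accepts; the rest stay with the TaskExecutor.
    List<String> offerSlots(String taskManagerId, Collection<String> allocationIds) {
        List<String> accepted = new ArrayList<>();
        if (!registeredTaskManagers.contains(taskManagerId)) {
            return accepted; // unknown TaskManager: reject every offer
        }
        for (String allocationId : allocationIds) {
            String owner = acceptedSlots.putIfAbsent(allocationId, taskManagerId);
            // New allocation, or an idempotent re-offer from the same TaskManager.
            if (owner == null || owner.equals(taskManagerId)) {
                accepted.add(allocationId);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        OfferingSlotPool pool = new OfferingSlotPool();
        pool.registerTaskManager("tm-1");
        System.out.println(pool.offerSlots("tm-1", List.of("a1", "a2"))); // [a1, a2]
        System.out.println(pool.offerSlots("tm-9", List.of("a3"))); // []
    }
}
```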