In per-job mode, the job is submitted through org.apache.flink.yarn.YarnClusterDescriptor#deployJobCluster. The getYarnJobClusterEntrypoint() argument that it passes to deployInternal returns the name of the ApplicationMaster (AM) entry class.
Upstream call that launches the AM:

    @Override
    public ClusterClientProvider<ApplicationId> deployJobCluster(
            ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
            throws ClusterDeploymentException {
        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink per-job cluster",
                    // AM entry point
                    getYarnJobClusterEntrypoint(),
                    jobGraph,
                    detached);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
        }
    }

AM entry class: org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
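To make the role of the entry class name concrete, here is a minimal sketch (a toy model, not Flink code) of how a deployer might turn that name into the command run inside the AM container. `ToyClusterDescriptor`, the two-argument `deployInternal`, and the simplified command string are assumptions made for illustration; a real launch command also carries JVM options, logging settings, and output redirects.

```java
/** Toy model, not Flink code: shows how the entry class name ends up in the AM launch command. */
class ToyClusterDescriptor {

    /** Class name taken from the notes above; it is only used as a string here. */
    static String getYarnJobClusterEntrypoint() {
        return "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint";
    }

    /** Per-job deployment delegates to the shared deploy routine and passes the AM entry class. */
    static String deployJobCluster() {
        return deployInternal("Flink per-job cluster", getYarnJobClusterEntrypoint());
    }

    /**
     * Hypothetical and heavily simplified: returns the command the AM container would run.
     * The application name is only a display name, so it does not appear in the command.
     */
    static String deployInternal(String applicationName, String yarnClusterEntrypoint) {
        return "$JAVA_HOME/bin/java " + yarnClusterEntrypoint;
    }

    public static void main(String[] args) {
        System.out.println(deployJobCluster());
    }
}
```

The point of the sketch is that the per-job and session paths can share one deploy routine and differ only in the entry class they hand over.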
Dispatcher creation and startup: org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayServiceFactory#create

    public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            JobGraphWriter jobGraphWriter) {
        final Dispatcher dispatcher;
        try {
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    new NoOpDispatcherBootstrap(),
                            PartialDispatcherServicesWithJobGraphStore.from(
                                    partialDispatcherServices, jobGraphWriter));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }
        dispatcher.start();
        return DefaultDispatcherGatewayService.from(dispatcher);
    }

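The Dispatcher is an Akka-backed RPC endpoint, so dispatcher.start() eventually invokes org.apache.flink.runtime.dispatcher.Dispatcher#onStart, which:
- receives the jobs
- starts the JobMaster: org.apache.flink.runtime.dispatcher.Dispatcher#createJobManagerRunner
JobMaster startup: org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution
The AM process is the JobManager. It hosts three components: JobMaster, Dispatcher, and ResourceManager. Of these, the JobMaster is created by the Dispatcher.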
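The ownership relation described above (the Dispatcher receives jobs and creates one JobMaster per job) can be sketched with a toy model. None of the classes below are Flink classes: `ToyDispatcher`, `ToyJobMaster`, and the event list are assumptions for illustration, and `start()` calls `onStart()` directly instead of going through an RPC layer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class DispatcherFlowDemo {
    /** Starts a toy dispatcher with a single job and returns the recorded events in order. */
    static List<String> run() {
        ToyDispatcher dispatcher = new ToyDispatcher(Collections.singletonList("wordcount"));
        dispatcher.start();
        return dispatcher.events();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

/** Toy stand-in for the Dispatcher: receives jobs and creates one JobMaster per job. */
class ToyDispatcher {
    private final List<String> recoveredJobs;
    private final List<String> events = new ArrayList<>();

    ToyDispatcher(List<String> recoveredJobs) {
        this.recoveredJobs = recoveredJobs;
    }

    /** Simplification: a direct call, where the real endpoint is started via RPC. */
    void start() {
        onStart();
    }

    private void onStart() {
        events.add("Dispatcher.onStart");
        for (String job : recoveredJobs) {
            createJobManagerRunner(job).startJobExecution();
        }
    }

    private ToyJobMaster createJobManagerRunner(String job) {
        events.add("createJobManagerRunner(" + job + ")");
        return new ToyJobMaster(job, events);
    }

    List<String> events() {
        return events;
    }
}

/** Toy stand-in for a JobMaster: one instance per job. */
class ToyJobMaster {
    private final String jobName;
    private final List<String> events;

    ToyJobMaster(String jobName, List<String> events) {
        this.jobName = jobName;
        this.events = events;
    }

    void startJobExecution() {
        events.add("JobMaster started for " + jobName);
    }
}
```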
ResourceManager startup
org.apache.flink.runtime.resourcemanager.ResourceManager#onStart
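- Sets up the heartbeat services (the RM exchanges heartbeats with the TaskManagers and the JobMasters)
- org.apache.flink.runtime.registration.RegisteredRpcConnection#start: the JobMaster (JM) registers with the RM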
- org.apache.flink.runtime.jobmaster.JobMaster#establishResourceManagerConnection
- org.apache.flink.runtime.jobmaster.slotpool.SlotPool#connectToResourceManager
- org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#requestSlotFromResourceManager
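In the end, it is the SlotManager inside Flink's ResourceManager that triggers the resource request to YARN's ResourceManager.
The SlotManager is the component that actually manages the RM's resources (the slots).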
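As a rough illustration of that division of labor, the sketch below models a slot manager that serves a request from a free slot when it can and otherwise queues the request and asks for a new worker. This is a toy model, not Flink's SlotManagerImpl: `ToySlotManager`, its nested `ResourceActions` callback, and the string ids are all hypothetical, and the callback merely counts how many containers would be requested from YARN.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class SlotRequestDemo {
    /** Returns {containers requested, pending requests} after one request with no free slot. */
    static int[] run() {
        int[] containersRequested = {0};
        ToySlotManager slotManager = new ToySlotManager(() -> containersRequested[0]++);
        slotManager.registerSlotRequest("request-1");
        return new int[] {containersRequested[0], slotManager.pendingRequestCount()};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("containers requested: " + result[0]);
        System.out.println("pending requests: " + result[1]);
    }
}

/** Toy model, not Flink code: asks for a new worker when no slot is free. */
class ToySlotManager {

    /** Hypothetical callback standing in for the call that asks YARN for a container. */
    interface ResourceActions {
        void allocateResource();
    }

    private final Deque<String> freeSlots = new ArrayDeque<>();
    private final Deque<String> pendingRequests = new ArrayDeque<>();
    private final ResourceActions resourceActions;

    ToySlotManager(ResourceActions resourceActions) {
        this.resourceActions = resourceActions;
    }

    /** Returns the id of a free slot, or null after queuing the request and asking for a worker. */
    String registerSlotRequest(String requestId) {
        String slot = freeSlots.poll();
        if (slot != null) {
            return slot;
        }
        pendingRequests.add(requestId);
        resourceActions.allocateResource();
        return null;
    }

    void addFreeSlot(String slotId) {
        freeSlots.add(slotId);
    }

    int pendingRequestCount() {
        return pendingRequests.size();
    }
}
```

Keeping the container request behind a callback is what lets the same slot bookkeeping work for YARN and for other resource providers.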
TaskManager entry points
- On YARN: org.apache.flink.yarn.YarnTaskExecutorRunner#main (YarnTaskExecutorRunner is also the process name)
- Standalone: TaskManagerRunner
TaskManager startup eventually reaches TaskExecutor#onStart through the RPC framework, which:
- registers with the ResourceManager
- on successful registration: org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerRegistrationListener#onRegistrationSuccess
- org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#registerTaskManager: the SlotManager registers the reported slots
- org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#registerSlot
The TM registers its slots with the RM; the actual bookkeeping is done by the SlotManager.
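The registration step can be pictured with the following toy model, in which each slot reported by a TaskManager is registered one by one and immediately handed to a waiting request if there is one. `ToySlotRegistry`, its methods, and the `taskManagerId_index` slot naming are hypothetical and only mirror the registerTaskManager and registerSlot split named above; this is not the Flink implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SlotRegistrationDemo {
    /** One request is waiting; then a TaskManager with two slots registers. */
    static ToySlotRegistry run() {
        ToySlotRegistry registry = new ToySlotRegistry();
        registry.addPendingRequest("request-1");
        registry.registerTaskManager("tm-1", 2);
        return registry;
    }

    public static void main(String[] args) {
        ToySlotRegistry registry = run();
        System.out.println("tm-1_0 serves: " + registry.requestServedBy("tm-1_0"));
        System.out.println("free slots: " + registry.freeSlots());
    }
}

/** Toy model, not Flink code: tracks which slot serves which request. */
class ToySlotRegistry {
    // slot id -> request it serves, or null while the slot is free
    private final Map<String, String> slots = new LinkedHashMap<>();
    private final Deque<String> pendingRequests = new ArrayDeque<>();

    void addPendingRequest(String requestId) {
        pendingRequests.add(requestId);
    }

    /** Registers every slot the TaskManager reports. */
    void registerTaskManager(String taskManagerId, int numberOfSlots) {
        for (int i = 0; i < numberOfSlots; i++) {
            registerSlot(taskManagerId + "_" + i);
        }
    }

    /** A new slot goes to the oldest waiting request, or stays free if nothing is waiting. */
    private void registerSlot(String slotId) {
        slots.put(slotId, pendingRequests.poll());
    }

    String requestServedBy(String slotId) {
        return slots.get(slotId);
    }

    List<String> freeSlots() {
        List<String> free = new ArrayList<>();
        for (Map.Entry<String, String> entry : slots.entrySet()) {
            if (entry.getValue() == null) {
                free.add(entry.getKey());
            }
        }
        return free;
    }
}
```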
How the SlotManager allocates slots
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#allocateSlot
- org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable#allocateSlot(int, org.apache.flink.api.common.JobID, org.apache.flink.runtime.clusterframework.types.AllocationID, org.apache.flink.runtime.clusterframework.types.ResourceProfile, org.apache.flink.api.common.time.Time)
Offering slots to the JobMaster
- org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager
- org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#offerSlots
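Taken together, the last two steps form a short hand-off: the slot is first marked as allocated in the TaskExecutor's slot table and then offered to the JobMaster's slot pool. The sketch below is a toy model of that hand-off under simplifying assumptions: `ToyTaskExecutor`, `ToySlotPool`, and their method signatures are hypothetical, the RPC calls are replaced by direct method calls, and the pool accepts every offer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SlotOfferDemo {
    public static void main(String[] args) {
        ToySlotPool pool = new ToySlotPool();
        ToyTaskExecutor taskExecutor = new ToyTaskExecutor(pool);
        System.out.println("first request accepted: " + taskExecutor.requestSlot(0, "job-a"));
        System.out.println("duplicate request accepted: " + taskExecutor.requestSlot(0, "job-b"));
        System.out.println("slots in pool: " + pool.availableSlots());
    }
}

/** Toy stand-in for the JobMaster-side slot pool: collects the slots offered to it. */
class ToySlotPool {
    private final List<String> availableSlots = new ArrayList<>();

    /** Simplification: every offer is accepted. */
    boolean offerSlot(String slotId) {
        availableSlots.add(slotId);
        return true;
    }

    List<String> availableSlots() {
        return availableSlots;
    }
}

/** Toy stand-in for the TaskExecutor: allocates a slot locally, then offers it to the JobMaster. */
class ToyTaskExecutor {
    // slot index -> job that owns the slot
    private final Map<Integer, String> slotTable = new HashMap<>();
    private final ToySlotPool jobMasterSlotPool;

    ToyTaskExecutor(ToySlotPool jobMasterSlotPool) {
        this.jobMasterSlotPool = jobMasterSlotPool;
    }

    /** Returns false if the slot is already allocated; otherwise allocates and offers it. */
    boolean requestSlot(int slotIndex, String jobId) {
        if (slotTable.putIfAbsent(slotIndex, jobId) != null) {
            return false;
        }
        return jobMasterSlotPool.offerSlot("slot-" + slotIndex);
    }
}
```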
