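When a job is submitted in per-job mode, the getYarnJobClusterEntrypoint() argument that org.apache.flink.yarn.YarnClusterDescriptor#deployJobCluster passes along supplies the ApplicationMaster (AM) entry class.
    Upstream caller that launches the AM: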

    @Override
    public ClusterClientProvider<ApplicationId> deployJobCluster(
            ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
            throws ClusterDeploymentException {
        try {
            return deployInternal(
                    clusterSpecification,
                    "Flink per-job cluster",
                    // AM entry class
                    getYarnJobClusterEntrypoint(),
                    jobGraph,
                    detached);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
        }
    }

    The entry class it returns is org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.
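The entry-point value is a fully qualified class name that ends up as the main class in the command YARN uses to start the AM container. A minimal sketch of that idea, assuming a hypothetical AmLaunchCommand helper (not a Flink class) and a deliberately simplified command line; the real command Flink builds carries more JVM options, classpath settings and arguments:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Flink's API: turns an entry-point class name into
// a (heavily simplified) JVM launch command for the AM container.
final class AmLaunchCommand {
    private AmLaunchCommand() {}

    // entrypointClass: e.g. the value returned by getYarnJobClusterEntrypoint()
    // heapMb: assumed JVM heap size for the AM, in megabytes
    static List<String> build(String entrypointClass, int heapMb) {
        if (entrypointClass == null || entrypointClass.isEmpty()) {
            throw new IllegalArgumentException("entry-point class must be set");
        }
        List<String> cmd = new ArrayList<>();
        cmd.add("java");
        cmd.add("-Xmx" + heapMb + "m");
        cmd.add(entrypointClass); // the class whose main() runs inside the AM container
        return cmd;
    }
}
```

Swapping the entry-point class (for example, a session entry point instead of the per-job one) changes which cluster runs inside the same kind of container, which is why deployJobCluster only needs to pass a different class name.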

    Creating and starting the Dispatcher: org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayServiceFactory#create

    public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            JobGraphWriter jobGraphWriter) {
        final Dispatcher dispatcher;
        try {
            // Create the Dispatcher RPC endpoint
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    new NoOpDispatcherBootstrap(),
                            PartialDispatcherServicesWithJobGraphStore.from(
                                    partialDispatcherServices, jobGraphWriter));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }
        // Start the endpoint; this eventually triggers Dispatcher#onStart
        dispatcher.start();
        return DefaultDispatcherGatewayService.from(dispatcher);
    }

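    The Dispatcher is an Akka-based RPC endpoint; starting it eventually calls org.apache.flink.runtime.dispatcher.Dispatcher#onStart. Its responsibilities:

    • Receives submitted jobs
    • Starts the JobMaster for each job via org.apache.flink.runtime.dispatcher.Dispatcher#createJobManagerRunner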
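The Dispatcher's job-handling role (accept a job, create a runner for it, start the JobMaster) can be modeled in a few lines. ToyDispatcher and ToyJobMasterRunner are hypothetical stand-ins, not Flink classes, and the model ignores leader election, HA storage and RPC:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a JobManagerRunner: one per job, it starts that job's JobMaster.
final class ToyJobMasterRunner {
    final String jobId;
    boolean started;

    ToyJobMasterRunner(String jobId) {
        this.jobId = jobId;
    }

    void start() {
        // In Flink, starting the runner is (roughly) what leads to JobMaster#startJobExecution.
        started = true;
    }
}

// Hypothetical stand-in for the Dispatcher: keeps one runner per job ID.
final class ToyDispatcher {
    private final Map<String, ToyJobMasterRunner> runners = new HashMap<>();

    // Accept a job, create and start its runner; this model rejects duplicate job IDs.
    ToyJobMasterRunner submitJob(String jobId) {
        if (runners.containsKey(jobId)) {
            throw new IllegalStateException("Job " + jobId + " already submitted");
        }
        ToyJobMasterRunner runner = new ToyJobMasterRunner(jobId); // cf. createJobManagerRunner
        runners.put(jobId, runner);
        runner.start();
        return runner;
    }

    int runningJobs() {
        return runners.size();
    }
}
```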

    JobMaster startup flow: org.apache.flink.runtime.jobmaster.JobMaster#startJobExecution

    The AM process is the JobManager. It hosts three components: the JobMaster, the Dispatcher and the ResourceManager. The JobMaster is created by the Dispatcher.

    ResourceManager startup:
    org.apache.flink.runtime.resourcemanager.ResourceManager#onStart

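    • Starts the heartbeat services with the TaskManagers and the JobManagers
    • The JM registers with the RM via org.apache.flink.runtime.registration.RegisteredRpcConnection#start; once registration succeeds, the call chain continues: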
      • org.apache.flink.runtime.jobmaster.JobMaster#establishResourceManagerConnection
        • org.apache.flink.runtime.jobmaster.slotpool.SlotPool#connectToResourceManager
          • org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#requestSlotFromResourceManager
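The chain above implies that slot requests made before the JobMaster knows its ResourceManager cannot be sent yet. A simplified sketch of that buffering, assuming hypothetical ToySlotPool and ToyResourceManagerGateway types (not Flink's SlotPool API): requests are parked until connectToResourceManager is called, then flushed to the RM in arrival order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the RM gateway: a single method to request a slot.
interface ToyResourceManagerGateway {
    void requestSlot(String allocationId);
}

// Simplified model of SlotPool buffering: requests made before the JobMaster is
// connected to a ResourceManager are parked, then forwarded once it is connected.
final class ToySlotPool {
    private final List<String> waitingForResourceManager = new ArrayList<>();
    private ToyResourceManagerGateway resourceManager; // null until connected

    void requestNewSlot(String allocationId) {
        if (resourceManager == null) {
            waitingForResourceManager.add(allocationId); // park until the RM is known
        } else {
            resourceManager.requestSlot(allocationId); // cf. requestSlotFromResourceManager
        }
    }

    // Called once the JM has registered with the RM (cf. establishResourceManagerConnection).
    void connectToResourceManager(ToyResourceManagerGateway rm) {
        this.resourceManager = rm;
        for (String allocationId : waitingForResourceManager) {
            rm.requestSlot(allocationId); // flush parked requests in arrival order
        }
        waitingForResourceManager.clear();
    }

    int parkedRequests() {
        return waitingForResourceManager.size();
    }
}
```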

    Ultimately, the SlotManager inside Flink's ResourceManager requests the resources (containers) from YARN's ResourceManager.

    The SlotManager is the component that actually manages the RM's resources (slots).
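The division of labor can be illustrated with a toy SlotManager that serves a request from a registered free slot when it can and otherwise asks the resource provider (YARN, in a per-job deployment) for a new worker. ToySlotManager and ToyResourceProvider are hypothetical; Flink's SlotManager also matches resource profiles and handles timeouts and failures, none of which is modeled here:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the layer that talks to YARN's ResourceManager.
interface ToyResourceProvider {
    void requestNewWorker(); // e.g. ask YARN for one more TaskManager container
}

// Simplified model: free slots first, otherwise request a new worker and wait.
final class ToySlotManager {
    private final Deque<String> freeSlots = new ArrayDeque<>();
    private final ToyResourceProvider provider;
    private int pendingRequests;

    ToySlotManager(ToyResourceProvider provider) {
        this.provider = provider;
    }

    void registerFreeSlot(String slotId) {
        freeSlots.add(slotId);
    }

    // Returns the assigned slot ID, or null if the request must wait for a new worker.
    String requestSlot() {
        String slot = freeSlots.poll();
        if (slot != null) {
            return slot;
        }
        pendingRequests++;
        provider.requestNewWorker();
        return null;
    }

    int pendingRequests() {
        return pendingRequests;
    }
}
```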

    TaskManager entry points

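    • YarnTaskExecutorRunner (the TaskManager process name on YARN): the YARN entry point, org.apache.flink.yarn.YarnTaskExecutorRunner#main
    • TaskManagerRunner: the standalone entry point, org.apache.flink.runtime.taskexecutor.TaskManagerRunner#main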

    In both cases the TaskManager eventually starts the TaskExecutor RPC endpoint, which leads to TaskExecutor#onStart():

    • Registers with the ResourceManager
    • After registration succeeds: org.apache.flink.runtime.taskexecutor.TaskExecutor.ResourceManagerRegistrationListener#onRegistrationSuccess
    • The SlotManager registers the TaskManager's slots: org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#registerTaskManager
      • org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#registerSlot

    In other words, when a TM registers its slots with the RM, the SlotManager does the actual work.
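Registration boils down to the TaskExecutor sending a report of its slots, which the SlotManager walks through slot by slot. A hypothetical sketch (ToySlotRegistry and ToySlotStatus are illustrative names, not Flink's API), assuming each reported slot is either free or already bound to a job:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical slot status entry in a TaskExecutor's slot report.
final class ToySlotStatus {
    final String slotId;
    final String allocatedJobId; // null when the slot is free

    ToySlotStatus(String slotId, String allocatedJobId) {
        this.slotId = slotId;
        this.allocatedJobId = allocatedJobId;
    }
}

// Simplified model of registerTaskManager -> registerSlot.
final class ToySlotRegistry {
    private final Map<String, List<ToySlotStatus>> slotsByTaskManager = new HashMap<>();
    private final Map<String, ToySlotStatus> slots = new HashMap<>();

    // Returns false if this TaskManager is already registered; the duplicate is ignored.
    boolean registerTaskManager(String taskManagerId, List<ToySlotStatus> slotReport) {
        if (slotsByTaskManager.containsKey(taskManagerId)) {
            return false;
        }
        slotsByTaskManager.put(taskManagerId, slotReport);
        for (ToySlotStatus status : slotReport) {
            registerSlot(status); // one entry per reported slot
        }
        return true;
    }

    private void registerSlot(ToySlotStatus status) {
        slots.put(status.slotId, status);
    }

    long freeSlotCount() {
        return slots.values().stream().filter(s -> s.allocatedJobId == null).count();
    }
}
```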

    How the SlotManager allocates a slot:
    org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl#allocateSlot

    • On the TaskExecutor, the slot is reserved in its local slot table: org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable#allocateSlot(int, org.apache.flink.api.common.JobID, org.apache.flink.runtime.clusterframework.types.AllocationID, org.apache.flink.runtime.clusterframework.types.ResourceProfile, org.apache.flink.api.common.time.Time)
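A slot request ends in the TaskExecutor's local slot table. Below is a simplified, hypothetical model of that table (ToyTaskSlotTable is not Flink's TaskSlotTable; resource profiles and timeouts are left out) that reserves a slot index for a (job ID, allocation ID) pair:

```java
// Hypothetical, simplified slot table: a fixed number of slot indices, each
// either free or bound to a (jobId, allocationId) pair.
final class ToyTaskSlotTable {
    private final String[] jobIds;
    private final String[] allocationIds;

    ToyTaskSlotTable(int numberOfSlots) {
        jobIds = new String[numberOfSlots];
        allocationIds = new String[numberOfSlots];
    }

    // Reserve slot `index` for the job. Returns false if the index is out of range
    // or the slot already holds a different allocation.
    boolean allocateSlot(int index, String jobId, String allocationId) {
        if (index < 0 || index >= jobIds.length) {
            return false;
        }
        if (allocationIds[index] != null) {
            // The same request arriving again counts as success (idempotent).
            return allocationIds[index].equals(allocationId) && jobIds[index].equals(jobId);
        }
        jobIds[index] = jobId;
        allocationIds[index] = allocationId;
        return true;
    }

    boolean isFree(int index) {
        return allocationIds[index] == null;
    }
}
```

Treating a repeated identical request as success is a design choice of this toy: it keeps a retried RPC from being mistaken for a conflict.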

    Offering slots to the JobMaster:

    • org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager
    • org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl#offerSlots
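Once the TaskExecutor holds slots for a job, it offers them to that job's JobMaster, and the slot pool replies with the offers it accepts; slots that are not accepted can then be freed on the TaskExecutor. A simplified, hypothetical sketch (ToyOfferingSlotPool and ToySlotOffer are not Flink classes), assuming the only acceptance rule is that the offering TaskManager is registered with the pool:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical slot offer: which allocation it fulfills and which slot index backs it.
final class ToySlotOffer {
    final String allocationId;
    final int slotIndex;

    ToySlotOffer(String allocationId, int slotIndex) {
        this.allocationId = allocationId;
        this.slotIndex = slotIndex;
    }
}

// Simplified model of offerSlotsToJobManager -> offerSlots.
final class ToyOfferingSlotPool {
    private final Set<String> registeredTaskManagers = new HashSet<>();
    private final Map<String, ToySlotOffer> slotsByAllocationId = new HashMap<>();

    void registerTaskManager(String taskManagerId) {
        registeredTaskManagers.add(taskManagerId);
    }

    // Returns the accepted offers; every offer from an unknown TaskManager is rejected.
    // Re-offering an allocation ID the pool already holds is accepted without duplicating it.
    Collection<ToySlotOffer> offerSlots(String taskManagerId, Collection<ToySlotOffer> offers) {
        List<ToySlotOffer> accepted = new ArrayList<>();
        if (!registeredTaskManagers.contains(taskManagerId)) {
            return accepted;
        }
        for (ToySlotOffer offer : offers) {
            slotsByAllocationId.putIfAbsent(offer.allocationId, offer);
            accepted.add(offer);
        }
        return accepted;
    }

    int heldSlots() {
        return slotsByAllocationId.size();
    }
}
```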