准备

  1. 首先了解调度相关的配置
    SlotSharingGroup
    CoLocationGroup
    要了解一下Flink官网的Jobs and Scheduling 。熟悉JobGraphExecutionGraph及其附属的数据结构。
  2. 此外我们基于默认的配置进行源码解析。

    1. public static final ConfigOption<SchedulerType> SCHEDULER =
    2. key("jobmanager.scheduler")
    3. .enumType(SchedulerType.class)
    4. .defaultValue(SchedulerType.Ng)

    //org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory#fromConfiguration public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(

    1. Configuration configuration, JobType jobType) {

    ··· switch (schedulerType) {

    1. case Ng:
    2. schedulerNGFactory = new DefaultSchedulerFactory();
    3. slotPoolServiceFactory =
    4. new DeclarativeSlotPoolBridgeServiceFactory(
    5. SystemClock.getInstance(),
    6. rpcTimeout,
    7. slotIdleTimeout,
    8. batchSlotTimeout,
    9. getRequestSlotMatchingStrategy(configuration, jobType));
    10. break;

    ··· } ``` 有jobmanager.scheduler默认值为Ng,可以得到。
    SchedulerNG:DefaultScheduler
    SchedulingStrategy:PipelinedRegionSchedulingStrategy,具体调度策略。
    优化点:1.防止deadlock。2.统一流批逻辑。
    按region分配资源,整个shuffled streaming task 为一个region。批处理根据shuffle 分为多个region。一个region的slot都满足才会执行。

  3. Declarative Resource management

    任务调度

    在上一文Flink Application模式任务提交中分析了JobGraph提交到Dispatch的过程。
    本文分析一下Dispatch调度到TaskManager启动任务的过程。

在Dispatch部分,submitJob过程主要是创建并启动jobManagerRunner。
jobManagerRunner lead选举,回调创建JobMasterServiceProcess和JobMaster,此过程和Dispatch启动类似,在Flink任务提交一文中已经介绍过了。在此从JobMaster startJobExecution()任务调度开始分析。

JobMaster初始化

  1. SlotPoolService初始化,JobMaster本地持有一个slotpool,优先从此pool申请slot。
  2. ResourceManagerLeaderRetriever初始化,监测获取resource manager的地址,初始化ResourceManagerConnection并向RM注册JobMaster,如果注册成功回调slotPoolService.connectToResourceManager(resourceManagerGateway),slotPoolService连接RM,以便后续向RM申请资源。

调度部署

JobMaster通过DefaultScheduler#startScheduling开始执行任务调度。

  1. transitionToRunning把JobStatus改为Running。
  2. schedulingStrategy.startScheduling()。此处是PipelinedRegionSchedulingStrategy,主要实现基于regions的调度策略。
    1. 计算获取region。
    2. schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
      schedulerOperations是Scheduler接口,把deploy信息传给scheduler申请资源和部署。
  3. DefaultScheduler#allocateSlotsAndDeploy。资源的申请是异步的,发请求-》wait-》收到资源回调-》deploy

    1. transitionToScheduled把ExecutionState改为SCHEDULED状态
    2. allocateSlots(executionVertexDeploymentOptions)申请资源,此处调用逻辑比较复杂,简单罗列一下调用链,不具体分析:
      SlotSharingExecutionSlotAllocator#allocateSlotsFor-》SlotSharingExecutionSlotAllocator#getOrAllocateSharedSlot-》PhysicalSlotProviderImpl#allocatePhysicalSlot-》PhysicalSlotProviderImpl#requestNewSlot-》DeclarativeSlotPoolBridge#requestNewAllocatedSlot-》DeclarativeSlotPoolBridge#internalRequestNewSlot-》DeclarativeSlotPoolBridge#internalRequestNewAllocatedSlot-》DefaultDeclarativeSlotPool#increaseResourceRequirementsBy-》DefaultDeclarativeSlotPool#declareResourceRequirements
      最终会通过DeclarativeSlotPoolService#declareResourceRequirements调用resourceManagerGateway.declareRequiredResources向resource manager申请资源。
      ResourceManager资源申请看ResourceManager这篇文章。
    3. 上面DeclarativeSlotPoolBridge#internalRequestNewAllocatedSlot这一步,会记录下来pendingRequest,里面包含slot申请的future slotFuture,后续offerSlots(见下面备注)时会回调slotFuture.complete。

      资源申请失败由ResourceManager回调JobMaster。
      申请成功task manger回调JobMaster#offerSlots通知JobMaster。JM继续调用DefaultDeclarativeSlotPool#offerSlots-》DefaultDeclarativeSlotPool#internalOfferSlots,在此处回调NewSlotsListener,也就是DeclarativeSlotPoolBridge#newSlotsAreAvailable方法。
      此处主要逻辑是

      1. 1. 匹配slot满足的pendingRequest
      2. 1. 调用pendingRequest.fulfill(slot)去slotFuture.complete(slot);
    4. waitForAllSlotsAndDeploy等待3.b里的资源申请完成并部署
      资源申请wait的直接future是CompletableFuture logicalSlotFuture,logicalSlotFuture最终是由pendingRequest里的CompletableFuture slotFuture转化来的。
      Deploy逻辑ExecutionVertex#deploy

  4. ExecutionVertex#deploy
    taskManagerGateway.submitTask(deployment, rpcTimeout)提交任务到TM。