Overview

  1. This article walks through the implementation of the Flink JobManager, analyzing its responsibilities, core internal components, and the task scheduling and execution flow from the source code. It approaches the topic from the following angles:
    • A brief introduction to the JobManager
    • The internal SchedulerNG and the task scheduling and execution flow
    • The internal Scheduler and SlotPool and the slot allocation flow
  2. Finer-grained details such as the ShuffleMaster, checkpointing, and coordinators are not covered here.

A Brief Introduction to the JobManager

As is well known in Flink, the Dispatcher is responsible for accepting, managing, and launching all jobs, while the JobManager (JM) owns the lifecycle of a single job: building the ExecutionGraph from the JobGraph, requesting resources, scheduling and executing tasks, checkpointing, and so on. In the implementation, JobMaster is the concrete class; internally it is built around several core components: SlotPool, Scheduler, and SchedulerNG. The class relationships are as follows:

(Figure: JobMaster structure class diagram, JobMaster结构类图.png)

  • SlotPool: connects directly to the ResourceManager; responsible for requesting and holding slots.
  • Scheduler: drives the SlotPool to allocate slots according to different strategies.
  • SchedulerNG: implemented by DefaultScheduler; initializes the ExecutionGraph and, according to the configured scheduling strategy, requests slots from the Scheduler and submits tasks.

When the JM is constructed, it creates the SchedulerNG, which triggers the construction of the ExecutionGraph. After the JM starts, the SlotPool connects to the ResourceManager; the SchedulerNG schedules the job and requests slots from the Scheduler; the Scheduler in turn requests resources from the SlotPool according to its strategy; once a slot is obtained, the ExecutionVertex is deployed. A simplified call flow:

(Figure: JobManager scheduling flow, JobManager调度流程.png)
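As a rough illustration of this flow, the three components and their interaction can be modeled with toy classes. Everything below is a simplified sketch: the class and method names mirror Flink's concepts but are not the real Flink classes, and the "slot" is just a string.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the JM scheduling flow; names mirror but do not reproduce Flink's classes.
public class SchedulingFlowDemo {

    // SlotPool: holds slots obtained from the ResourceManager.
    public static class SlotPool {
        private final Queue<String> slots = new ArrayDeque<>();
        public void offerSlot(String slotId) { slots.add(slotId); } // filled by the RM
        public String requestSlot() { return slots.poll(); }        // null when empty
    }

    // Scheduler: obtains a slot from the pool (strategy logic elided here).
    public static class Scheduler {
        private final SlotPool pool;
        public Scheduler(SlotPool pool) { this.pool = pool; }
        public String allocateSlot() { return pool.requestSlot(); }
    }

    // SchedulerNG: drives vertex scheduling and deployment.
    public static class SchedulerNG {
        private final Scheduler scheduler;
        public SchedulerNG(Scheduler scheduler) { this.scheduler = scheduler; }
        public boolean deployVertex(String vertexId) {
            String slot = scheduler.allocateSlot();
            if (slot == null) {
                return false; // no resources available yet
            }
            System.out.println("deploy " + vertexId + " on " + slot);
            return true;
        }
    }

    public static void main(String[] args) {
        SlotPool pool = new SlotPool();
        pool.offerSlot("slot-1"); // pretend the ResourceManager offered one slot
        SchedulerNG ng = new SchedulerNG(new Scheduler(pool));
        System.out.println(ng.deployVertex("vertex-0")); // true
        System.out.println(ng.deployVertex("vertex-1")); // false: pool exhausted
    }
}
```

The point of the model is only the direction of the calls: SchedulerNG asks the Scheduler, which asks the SlotPool, and deployment happens only once a slot is in hand.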

SchedulerNG and the Task Scheduling Flow

SchedulerNG manages tasks at a finer granularity, including building the ExecutionGraph, scheduling individual tasks, triggering checkpoints, and managing task state, recovery, and restarts.

Many of the JobMaster's external APIs are delegated directly to SchedulerNG. Let us focus on how it allocates resources and executes tasks.

  • The entry point is startScheduling(). The parent class registers metrics and starts the OperatorCoordinators; the concrete scheduling is implemented by the subclass.

OperatorCoordinatorHolder, the holder of an OperatorCoordinator, is invoked by the job manager and provides a context responsible for checkpoint coordination and exactly-once semantics. We will not go deeper into it here.

```java
public final void startScheduling() {
    mainThreadExecutor.assertRunningInMainThread();
    registerJobMetrics();
    startAllOperatorCoordinators();
    startSchedulingInternal();
}

// the scheduling implementation
protected void startSchedulingInternal() {
    log.info("Starting scheduling with scheduling strategy [{}]",
            schedulingStrategy.getClass().getName());
    prepareExecutionGraphForNgScheduling(); // state transition
    schedulingStrategy.startScheduling();   // delegate to the configured strategy
}
```
  • Scheduling strategies
    • PipelinedRegionSchedulingStrategy: schedules pipelined regions
    • EagerSchedulingStrategy: schedules all tasks at once; the default for streaming jobs
    • LazyFromSourcesSchedulingStrategy: schedules tasks once their source data is available; used for batch jobs
  • Let us look at the Eager strategy
    Since all tasks are scheduled at once, the vertex topology is converted directly into deploymentOptions and handed to the SchedulerNG to allocate slots and deploy.

```java
private void allocateSlotsAndDeploy(final Set<ExecutionVertexID> verticesToDeploy) {
    final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions =
        SchedulingStrategyUtils.createExecutionVertexDeploymentOptionsInTopologicalOrder(
            schedulingTopology,
            verticesToDeploy,
            id -> deploymentOption); // convert vertices to deployment options
    // schedulerOperations is the SchedulerNG
    schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
}
```
  • SchedulerNG slot allocation
    A scheduling requirement is first built from each ExecutionVertex, then executionSlotAllocator is called to allocate the slots.
    ExecutionJobVertex embodies parallelism: it holds every parallel ExecutionVertex together with the resources it needs.

```java
private List<SlotExecutionVertexAssignment> allocateSlots(
        final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    return executionSlotAllocator.allocateSlotsFor(executionVertexDeploymentOptions
        .stream()
        .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
        .map(this::getExecutionVertex)
        .map(ExecutionVertexSchedulingRequirementsMapper::from)
        .collect(Collectors.toList()));
}
```
  • ExecutionSlotAllocator allocating slots. The code is rather long, so part of it is omitted. Roughly: the allocator first collects all prior allocation IDs, then iterates over each request, computes its preferred locations, wraps the request into a ScheduledUnit, and requests a slot.

```java
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
    // ... omitted
    Set<AllocationID> allPreviousAllocationIds =
        computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
    for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
        final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
        final SlotRequestId slotRequestId = new SlotRequestId();
        final SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId();
        LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
        CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations(
            executionVertexId,
            schedulingRequirements.getPreferredLocations(),
            inputsLocationsRetriever).thenCompose(
                (Collection<TaskManagerLocation> preferredLocations) ->
                    slotProviderStrategy.allocateSlot(
                        slotRequestId,
                        new ScheduledUnit(
                            executionVertexId,
                            slotSharingGroupId,
                            schedulingRequirements.getCoLocationConstraint()),
                        SlotProfile.priorAllocation(
                            schedulingRequirements.getTaskResourceProfile(),
                            schedulingRequirements.getPhysicalSlotResourceProfile(),
                            preferredLocations,
                            Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
                            allPreviousAllocationIds)));
        // ... omitted
    }
```
  • How the preferred locations are computed for each ExecutionVertex (skipping prior allocations)
    They are derived from the locations of the vertex's inputs; if a source has not started yet, an empty collection is returned.

```java
static CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(
        ExecutionVertexID executionVertexId,
        InputsLocationsRetriever inputsLocationsRetriever) {
    CompletableFuture<Collection<TaskManagerLocation>> preferredLocations =
        CompletableFuture.completedFuture(Collections.emptyList());
    Collection<CompletableFuture<TaskManagerLocation>> locationsFutures = new ArrayList<>();
    // collect the producers of the consumed result partitions
    Collection<Collection<ExecutionVertexID>> allProducers =
        inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexId);
    for (Collection<ExecutionVertexID> producers : allProducers) {
        for (ExecutionVertexID producer : producers) {
            Optional<CompletableFuture<TaskManagerLocation>> optionalLocationFuture =
                inputsLocationsRetriever.getTaskManagerLocation(producer);
            optionalLocationFuture.ifPresent(locationsFutures::add);
            // If the parallelism is large, wait for all futures coming back may cost a long time.
            // if an input has more than 8 distinct producers, skip the computation
            if (locationsFutures.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
                locationsFutures.clear();
                break;
            }
        }
        // if there are no inputs, an empty collection is returned
        CompletableFuture<Collection<TaskManagerLocation>> uniqueLocationsFuture =
            FutureUtils.combineAll(locationsFutures).thenApply(HashSet::new);
        preferredLocations = preferredLocations.thenCombine(
            uniqueLocationsFuture,
            (locationsOnOneEdge, locationsOnAnotherEdge) -> {
                if ((!locationsOnOneEdge.isEmpty() && locationsOnAnotherEdge.size() > locationsOnOneEdge.size())
                        || locationsOnAnotherEdge.isEmpty()) {
                    return locationsOnOneEdge;
                } else {
                    return locationsOnAnotherEdge;
                }
            });
        locationsFutures.clear();
    }
    return preferredLocations;
}
```
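The selection rule inside thenCombine above (keep the smaller location set across input edges, but never prefer an empty one over a non-empty one) can be isolated into a small runnable sketch. The class and method names here are mine, not Flink's, and plain strings stand in for TaskManagerLocation:

```java
import java.util.Collections;
import java.util.List;

public class PreferredLocationRule {

    // Mirrors the thenCombine lambda: keep the smaller of the two location sets,
    // treating an empty set as "no constraint yet", so it never wins over a
    // non-empty set.
    public static List<String> pickSmaller(List<String> oneEdge, List<String> anotherEdge) {
        if ((!oneEdge.isEmpty() && anotherEdge.size() > oneEdge.size())
                || anotherEdge.isEmpty()) {
            return oneEdge;
        } else {
            return anotherEdge;
        }
    }

    public static void main(String[] args) {
        List<String> tmA = List.of("tm-1");
        List<String> tmAB = List.of("tm-1", "tm-2");
        // the smaller non-empty set wins: fewer candidate hosts means a stronger
        // locality preference
        System.out.println(pickSmaller(tmA, tmAB));                     // [tm-1]
        // an empty set loses to any non-empty set
        System.out.println(pickSmaller(Collections.emptyList(), tmAB)); // [tm-1, tm-2]
    }
}
```

The intuition behind preferring the smaller set: an edge whose producers sit on fewer TaskManagers constrains placement more, so it is the more informative locality hint.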
  • With the preferred locations computed, let us look at the actual allocation. It is performed by one of two SlotProviderStrategy implementations:

    • BatchSlotProviderStrategy: used only when ScheduleMode is set to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST.
    • NormalSlotProviderStrategy: the default; both streaming and batch jobs go through this strategy.

NormalSlotProviderStrategy performs the normal allocation: depending on whether the request carries a SlotSharingGroupId, a different allocation path is taken (the sharing-group path is more involved and is not discussed here).

```java
private void internalAllocateSlot(
        CompletableFuture<LogicalSlot> allocationResultFuture,
        SlotRequestId slotRequestId,
        ScheduledUnit scheduledUnit,
        SlotProfile slotProfile,
        Time allocationTimeout) {
    CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
        allocateSingleSlot(slotRequestId, slotProfile, allocationTimeout) :
        allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);
}
```
  • Once the slots are allocated, deployment is comparatively simple. The allocated slot and the deployment options are combined into a DeploymentHandle; the LogicalSlot is assigned first, then a TaskDeploymentDescriptor (TDD) is built and deployed.

```java
private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
    FutureUtils.assertNoException(
        assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
}

// assign (part of the code omitted)
public boolean tryAssignResource(final LogicalSlot logicalSlot) {
    // only allow to set the assigned resource in state SCHEDULED or CREATED
    // note: we also accept resource assignment when being in state CREATED for testing purposes
    if (state == SCHEDULED || state == CREATED) {
        if (assignedResource == null) {
            assignedResource = logicalSlot;
            if (logicalSlot.tryAssignPayload(this)) {
                // check for concurrent modification (e.g. cancelling call)
                if ((state == SCHEDULED || state == CREATED) && !taskManagerLocationFuture.isDone()) {
                    taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
                    assignedAllocationID = logicalSlot.getAllocationId();
                    return true;
                }
            }
        }
    }
    // remainder omitted
}

// deploy
private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
    try {
        final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
        executionVertexOperations.deploy(executionVertex);
    } catch (Throwable e) {
        handleTaskDeploymentFailure(executionVertexId, e);
    }
}

// by default this calls executionVertex::deploy directly
class DefaultExecutionVertexOperations implements ExecutionVertexOperations {
    @Override
    public void deploy(final ExecutionVertex executionVertex) throws JobException {
        executionVertex.deploy();
    }
}
```
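The state guard in tryAssignResource can be reduced to a small standalone model. The enum and class below are simplified stand-ins for Flink's Execution, and the slot is just a string; only the guard logic (assign only once, and only in CREATED or SCHEDULED) is reproduced:

```java
public class ExecutionStateDemo {

    enum State { CREATED, SCHEDULED, DEPLOYING, CANCELED }

    public static class Execution {
        State state = State.CREATED;
        String assignedResource;

        // A slot may only be attached while the execution is CREATED or SCHEDULED
        // and no slot has been attached yet; mirrors the checks in tryAssignResource.
        public boolean tryAssignResource(String slot) {
            if ((state == State.CREATED || state == State.SCHEDULED)
                    && assignedResource == null) {
                assignedResource = slot;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        Execution e = new Execution();
        System.out.println(e.tryAssignResource("slot-1")); // true: first assignment
        System.out.println(e.tryAssignResource("slot-2")); // false: already assigned

        Execution canceled = new Execution();
        canceled.state = State.CANCELED;
        System.out.println(canceled.tryAssignResource("slot-3")); // false: wrong state
    }
}
```

The double state check in the real code (before and after tryAssignPayload) exists because a concurrent cancellation can change the state while the payload is being attached; the toy model above collapses that into a single check.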

<a name="Scheduler"></a>

Scheduler

The Scheduler is above all the implementation of SlotProvider. It is responsible for slot allocation, including the slot-sharing mode, and for requesting slots according to different strategies. The SlotProvider interface:

```java
// parameter lists omitted
CompletableFuture<LogicalSlot> allocateSlot();
default CompletableFuture<LogicalSlot> allocateBatchSlot();
default CompletableFuture<LogicalSlot> allocateSlot();
void cancelSlotRequest();
```
  • As noted above, regardless of which SlotProviderStrategy is used, the Scheduler ultimately performs the actual allocation. Below is the corresponding single-slot path.

When no sharing group is enabled, the Scheduler first tries to serve the request from the slots it already holds locally; only if none is available does it request a new slot.

```java
private CompletableFuture<LogicalSlot> allocateSingleSlot(
        SlotRequestId slotRequestId,
        SlotProfile slotProfile,
        @Nullable Time allocationTimeout) {
    Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile);
    if (slotAndLocality.isPresent()) {
        // already successful from available
        try {
            return CompletableFuture.completedFuture(
                completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get()));
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    } else {
        // we allocate by requesting a new slot
        return requestNewAllocatedSlot(slotRequestId, slotProfile, allocationTimeout)
            .thenApply((PhysicalSlot allocatedSlot) -> {
                try {
                    return completeAllocationByAssigningPayload(
                        slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
                } catch (FlinkException e) {
                    throw new CompletionException(e);
                }
            });
    }
}
```
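The "try the local pool first, otherwise request a new slot asynchronously" shape of allocateSingleSlot can be sketched without any Flink dependency. The class below is my own stand-in: the pool is a plain queue of strings, and requestNew() substitutes for the RPC to the ResourceManager:

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class LocalFirstAllocation {

    private final Queue<String> availableSlots = new ArrayDeque<>();

    public void addAvailable(String slot) { availableSlots.add(slot); }

    // Stand-in for the asynchronous new-slot request to the ResourceManager.
    CompletableFuture<String> requestNew() {
        return CompletableFuture.completedFuture("new-slot");
    }

    // Mirrors allocateSingleSlot: complete immediately from the local pool if a
    // slot is available, otherwise fall back to the asynchronous request.
    public CompletableFuture<String> allocate() {
        Optional<String> local = Optional.ofNullable(availableSlots.poll());
        return local.map(CompletableFuture::completedFuture)
                    .orElseGet(this::requestNew);
    }

    public static void main(String[] args) {
        LocalFirstAllocation alloc = new LocalFirstAllocation();
        alloc.addAvailable("cached-slot");
        System.out.println(alloc.allocate().join()); // cached-slot: served locally
        System.out.println(alloc.allocate().join()); // new-slot: pool empty, requested
    }
}
```

The design point carried over from the real code: the local hit completes the future synchronously, so callers see a uniform CompletableFuture API whether or not a round trip to the ResourceManager was needed.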
  • requestNewAllocatedSlot chooses the streaming or the batch request path depending on whether an allocation timeout is present

```java
private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
        SlotRequestId slotRequestId,
        SlotProfile slotProfile,
        @Nullable Time allocationTimeout) {
    if (allocationTimeout == null) {
        return slotPool.requestNewAllocatedBatchSlot(
            slotRequestId, slotProfile.getPhysicalSlotResourceProfile());
    } else {
        return slotPool.requestNewAllocatedSlot(
            slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), allocationTimeout);
    }
}
```
  • Inside the SlotPool, the request ultimately goes to the ResourceManager via RPC. This involves the TaskManagers as well: in yarn per-job mode, for example, requesting a slot here launches a TaskManager, that is, a container on YARN.

```java
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
    jobMasterId,
    new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
    rpcTimeout);
```
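Because the RPC above is only acknowledged (the slot arrives later, once a TaskManager registers and offers it), the SlotPool has to track the request until then. A hedged sketch of that bookkeeping, with all names illustrative rather than Flink's, keyed by a string request id:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class PendingSlotRequests {

    // Pending requests keyed by request id, each completed once a slot is offered back.
    private final Map<String, CompletableFuture<String>> pending = new HashMap<>();

    // Issue a request; in the real SlotPool this is where the RPC to the
    // ResourceManager (resourceManagerGateway.requestSlot) goes out.
    public CompletableFuture<String> requestSlot(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called when a TaskManager registers and offers a slot for the request.
    public boolean offerSlot(String requestId, String slotId) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(slotId);
    }

    public static void main(String[] args) {
        PendingSlotRequests pool = new PendingSlotRequests();
        CompletableFuture<String> f = pool.requestSlot("req-1");
        System.out.println(f.isDone());      // false: still waiting on the offer
        pool.offerSlot("req-1", "container-7");
        System.out.println(f.join());        // container-7
    }
}
```

This is the asynchronous handshake that makes yarn per-job startup work: the acknowledgement only confirms the request was accepted, and the future completes later when the newly started container's TaskManager offers the slot.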