Overview
- This article walks through the implementation of the Flink JobManager, analyzing its responsibilities, core internal components, and the task scheduling and execution flow from the source code's perspective. It approaches the topic from the following angles:
- An introduction to the JobManager
- The internal SchedulerNG and the task scheduling/execution flow
- The internal Scheduler and SlotPool, and the slot allocation flow
- Finer-grained pieces such as the ShuffleMaster, checkpointing, and OperatorCoordinator are out of scope here.
JobManager overview
In Flink, the Dispatcher is responsible for accepting, managing, and launching all jobs, while the JobManager (JM) owns the lifecycle of a single job: translating the JobGraph into an ExecutionGraph, requesting resources, scheduling and executing tasks, triggering checkpoints, and so on. The implementation class is JobMaster, which is built around three core components: SlotPool, Scheduler, and SchedulerNG. The class relationships are as follows:

- SlotPool: connects directly to the ResourceManager; requests slots and caches them.
- Scheduler: decides, according to its strategy, how to obtain slots from the SlotPool.
- SchedulerNG: implemented by DefaultScheduler; builds the ExecutionGraph and, following the configured scheduling strategy, requests slots via the Scheduler and submits tasks.
When the JM is constructed, it creates the SchedulerNG, which triggers the build of the ExecutionGraph. After the JM starts, the SlotPool connects to the ResourceManager; the SchedulerNG kicks off the job and asks the Scheduler for slots; the Scheduler, according to its strategy, requests them from the SlotPool; once a slot is obtained, the corresponding ExecutionVertex is deployed. A simplified call flow:
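The startup order described above can be sketched with toy stand-ins. Note: all class and method names below are simplified placeholders for illustration, not the real Flink types.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the JobMaster startup order; all classes here are simplified
// stand-ins for the real Flink components.
public class JobMasterStartupSketch {

    static class SlotPool {
        void connectToResourceManager(List<String> log) {
            log.add("slotPool: connect to ResourceManager");
        }
    }

    static class Scheduler {
        void allocateSlot(List<String> log) {
            log.add("scheduler: request slot from SlotPool");
        }
    }

    static class SchedulerNG {
        final Scheduler scheduler;
        SchedulerNG(Scheduler scheduler, List<String> log) {
            this.scheduler = scheduler;
            log.add("schedulerNG: build ExecutionGraph"); // happens at JM construction
        }
        void startScheduling(List<String> log) {
            scheduler.allocateSlot(log);        // ask the Scheduler for slots
            log.add("executionVertex: deploy"); // deploy once a slot arrives
        }
    }

    static List<String> startupOrder() {
        List<String> log = new ArrayList<>();
        SlotPool slotPool = new SlotPool();
        Scheduler scheduler = new Scheduler();
        SchedulerNG ng = new SchedulerNG(scheduler, log); // JM construction
        slotPool.connectToResourceManager(log);           // JM start
        ng.startScheduling(log);                          // scheduling kicks off
        return log;
    }

    public static void main(String[] args) {
        startupOrder().forEach(System.out::println);
    }
}
```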

SchedulerNG: the task scheduling flow
SchedulerNG works at the granularity of individual tasks: it builds the ExecutionGraph, schedules tasks one by one, triggers checkpoints, and manages task state, failover, and restarts.
Many of JobMaster's external APIs delegate directly to SchedulerNG. Here we focus on how it allocates resources and executes tasks.
- The entry point is startScheduling(). The base class registers metrics and starts the OperatorCoordinators; the actual scheduling is left to the subclass.
OperatorCoordinatorHolder, the holder of an OperatorCoordinator, is invoked by the job manager and provides a context for checkpoint coordination and exactly-once semantics. We will not go deeper into it here.
```java
public final void startScheduling() {
    mainThreadExecutor.assertRunningInMainThread();
    registerJobMetrics();
    startAllOperatorCoordinators();
    startSchedulingInternal();
}

// the scheduling implementation
protected void startSchedulingInternal() {
    log.info("Starting scheduling with scheduling strategy [{}]",
            schedulingStrategy.getClass().getName());
    prepareExecutionGraphForNgScheduling(); // state transition
    schedulingStrategy.startScheduling();   // dispatch according to the strategy
}
```
- Scheduling strategies
  - PipelinedRegionSchedulingStrategy: schedules pipelined regions
  - EagerSchedulingStrategy: schedules all tasks at once; the default for streaming
  - LazyFromSourcesSchedulingStrategy: schedules a task only once its input data is available; used for batch
Let's look at the Eager strategy.
Since all tasks are scheduled at once, the strategy simply converts the vertex topology into deployment options and hands them to the SchedulerNG to allocate slots and deploy.
```java
private void allocateSlotsAndDeploy(final Set<ExecutionVertexID> verticesToDeploy) {
    // convert the vertices, in topological order, into deployment options
    final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions =
            SchedulingStrategyUtils.createExecutionVertexDeploymentOptionsInTopologicalOrder(
                    schedulingTopology, verticesToDeploy, id -> deploymentOption);
    // schedulerOperations is the SchedulerNG
    schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
}
```
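A minimal model of this eager behavior (toy code, not the real Flink classes): every requested vertex is kept in topological order and the whole list is handed over in a single call.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model of EagerSchedulingStrategy: all vertices are scheduled at once,
// in the topological order given by the scheduling topology.
public class EagerSketch {

    // topologicalOrder stands in for the SchedulingTopology's vertex order
    static List<String> toDeploymentOptions(List<String> topologicalOrder,
                                            Set<String> verticesToDeploy) {
        return topologicalOrder.stream()
                .filter(verticesToDeploy::contains) // keep only requested vertices
                .collect(Collectors.toList());      // topological order is preserved
    }

    public static void main(String[] args) {
        List<String> topo = List.of("source", "map", "sink");
        // eager: the whole vertex set goes in at once, regardless of set order
        System.out.println(toDeploymentOptions(topo, Set.of("sink", "source", "map")));
        // prints [source, map, sink]
    }
}
```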
SchedulerNG slot allocation
It first builds a scheduling requirement from each ExecutionVertex, then calls the executionSlotAllocator to request slots.
ExecutionJobVertex embodies parallelism: it holds every parallel ExecutionVertex together with the resources each one needs.
```java
private List<SlotExecutionVertexAssignment> allocateSlots(
        final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    return executionSlotAllocator.allocateSlotsFor(
            executionVertexDeploymentOptions.stream()
                    .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                    .map(this::getExecutionVertex)
                    .map(ExecutionVertexSchedulingRequirementsMapper::from)
                    .collect(Collectors.toList()));
}
```
ExecutionSlotAllocator then requests the slots. The code is long, so part of it is omitted; the gist is: first collect the prior allocations, then for each request compute the preferred locations, wrap everything into a ScheduledUnit, and request a slot.
```java
public List<SlotExecutionVertexAssignment> allocateSlotsFor(
        List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
    // omitted
    Set<AllocationID> allPreviousAllocationIds =
            computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
    for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
        final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
        final SlotRequestId slotRequestId = new SlotRequestId();
        final SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId();
        LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
        CompletableFuture<LogicalSlot> slotFuture =
                calculatePreferredLocations(
                                executionVertexId,
                                schedulingRequirements.getPreferredLocations(),
                                inputsLocationsRetriever)
                        .thenCompose((Collection<TaskManagerLocation> preferredLocations) ->
                                slotProviderStrategy.allocateSlot(
                                        slotRequestId,
                                        new ScheduledUnit(
                                                executionVertexId,
                                                slotSharingGroupId,
                                                schedulingRequirements.getCoLocationConstraint()),
                                        SlotProfile.priorAllocation(
                                                schedulingRequirements.getTaskResourceProfile(),
                                                schedulingRequirements.getPhysicalSlotResourceProfile(),
                                                preferredLocations,
                                                Collections.singletonList(
                                                        schedulingRequirements.getPreviousAllocationId()),
                                                allPreviousAllocationIds)));
        // omitted
    }
}
```
Let's see how the preferred locations are computed for each ExecutionVertex (skipping the prior-allocation case).
They are derived from the locations of the vertex's inputs; if the sources have not started yet, an empty collection is returned.
```java
static CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(
        ExecutionVertexID executionVertexId,
        InputsLocationsRetriever inputsLocationsRetriever) {
    CompletableFuture<Collection<TaskManagerLocation>> preferredLocations =
            CompletableFuture.completedFuture(Collections.emptyList());
    Collection<CompletableFuture<TaskManagerLocation>> locationsFutures = new ArrayList<>();
    // fetch the input producers
    Collection<Collection<ExecutionVertexID>> allProducers =
            inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexId);
    for (Collection<ExecutionVertexID> producers : allProducers) {
        for (ExecutionVertexID producer : producers) {
            Optional<CompletableFuture<TaskManagerLocation>> optionalLocationFuture =
                    inputsLocationsRetriever.getTaskManagerLocation(producer);
            optionalLocationFuture.ifPresent(locationsFutures::add);
            // If the parallelism is large, waiting for all futures may take a long time.
            // With more than 8 distinct input locations, no preference is computed.
            if (locationsFutures.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
                locationsFutures.clear();
                break;
            }
        }
        // with no inputs, an empty collection is returned
        CompletableFuture<Collection<TaskManagerLocation>> uniqueLocationsFuture =
                FutureUtils.combineAll(locationsFutures).thenApply(HashSet::new);
        preferredLocations = preferredLocations.thenCombine(
                uniqueLocationsFuture,
                (locationsOnOneEdge, locationsOnAnotherEdge) -> {
                    if ((!locationsOnOneEdge.isEmpty()
                                    && locationsOnAnotherEdge.size() > locationsOnOneEdge.size())
                            || locationsOnAnotherEdge.isEmpty()) {
                        return locationsOnOneEdge;
                    } else {
                        return locationsOnAnotherEdge;
                    }
                });
        locationsFutures.clear();
    }
    return preferredLocations;
}
```
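The location rule above can be condensed into a small synchronous sketch (simplified stand-in code; the real implementation works with futures and TaskManagerLocation objects): prefer the distinct producer locations, unless there are none or more than the maximum of 8.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified sketch of the input-based preferred-location rule: prefer the
// distinct locations of the input producers; with no known locations, or more
// than MAX_DISTINCT_LOCATIONS_TO_CONSIDER (8) of them, express no preference.
public class PreferredLocationsSketch {
    static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;

    static Set<String> preferredLocations(List<String> producerLocations) {
        Set<String> distinct = new HashSet<>(producerLocations);
        if (distinct.isEmpty() || distinct.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
            return Collections.emptySet(); // no usable preference: any slot will do
        }
        return distinct;
    }

    public static void main(String[] args) {
        // two producers on the same TaskManager collapse to one preferred location
        System.out.println(preferredLocations(List.of("tm-1", "tm-1", "tm-2")));
        // a source task has no inputs, hence no preference
        System.out.println(preferredLocations(List.of()));
    }
}
```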
With the preferred locations computed, let's look at the actual allocation. There are currently two provider strategies:
- BatchSlotProviderStrategy: used only when ScheduleMode is set to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST.
- NormalSlotProviderStrategy: the default for both streaming and batch.
NormalSlotProviderStrategy performs a normal allocation: depending on whether a SlotSharingGroupId is present, a different allocation path is taken (the sharing-group path is more involved and is not covered here).
```java
private void internalAllocateSlot(
        CompletableFuture<LogicalSlot> allocationResultFuture,
        SlotRequestId slotRequestId,
        ScheduledUnit scheduledUnit,
        SlotProfile slotProfile,
        Time allocationTimeout) {
    CompletableFuture<LogicalSlot> allocationFuture =
            scheduledUnit.getSlotSharingGroupId() == null
                    ? allocateSingleSlot(slotRequestId, slotProfile, allocationTimeout)
                    : allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);
}
```
- Once the slots are allocated, the deploy step is comparatively simple. The allocated slot and the deployment option are combined into a DeploymentHandle; the LogicalSlot is first assigned (assignSlot), and then a TaskDeploymentDescriptor (TDD) is built and deployed.
```java
private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
    FutureUtils.assertNoException(
            assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
}

// assign (part of the code omitted)
public boolean tryAssignResource(final LogicalSlot logicalSlot) {
    // only allow to set the assigned resource in state SCHEDULED or CREATED
    // note: we also accept resource assignment when being in state CREATED for testing purposes
    if (state == SCHEDULED || state == CREATED) {
        if (assignedResource == null) {
            assignedResource = logicalSlot;
            if (logicalSlot.tryAssignPayload(this)) {
                // check for concurrent modification (e.g. cancelling call)
                if ((state == SCHEDULED || state == CREATED) && !taskManagerLocationFuture.isDone()) {
                    taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
                    assignedAllocationID = logicalSlot.getAllocationId();
                    return true;
                }
            }
        }
    }
    // remainder omitted
}

// deploy
private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
    try {
        final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
        executionVertexOperations.deploy(executionVertex);
    } catch (Throwable e) {
        handleTaskDeploymentFailure(executionVertexId, e);
    }
}

// by default this delegates directly to executionVertex::deploy
class DefaultExecutionVertexOperations implements ExecutionVertexOperations {
    @Override
    public void deploy(final ExecutionVertex executionVertex) throws JobException {
        executionVertex.deploy();
    }
}
```
<a name="Scheduler"></a>
### Scheduler
The Scheduler is, above all, the implementation of SlotProvider. It is responsible for slot allocation, including the sharing-group mode and requesting slots according to different strategies. The SlotProvider interface:
```java
CompletableFuture<LogicalSlot> allocateSlot();
default CompletableFuture<LogicalSlot> allocateBatchSlot();
default CompletableFuture<LogicalSlot> allocateSlot();
void cancelSlotRequest();
```
- As mentioned above, under either SlotProviderStrategy the Scheduler ultimately performs the allocation. The following corresponds to the single-slot path from above:
When no sharing group is enabled, it first tries an already-available slot from the pool; only if none is available does it request a new one.
```java
private CompletableFuture<LogicalSlot> allocateSingleSlot(
        SlotRequestId slotRequestId,
        SlotProfile slotProfile,
        @Nullable Time allocationTimeout) {
    Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile);
    if (slotAndLocality.isPresent()) {
        // already successful from available
        try {
            return CompletableFuture.completedFuture(
                    completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get()));
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    } else {
        // we allocate by requesting a new slot
        return requestNewAllocatedSlot(slotRequestId, slotProfile, allocationTimeout)
                .thenApply((PhysicalSlot allocatedSlot) -> {
                    try {
                        return completeAllocationByAssigningPayload(
                                slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
                    } catch (FlinkException e) {
                        throw new CompletionException(e);
                    }
                });
    }
}
```
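The try-available-then-request control flow can be reduced to the following sketch (toy names; the real code deals with SlotAndLocality and payload assignment):

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Toy sketch of allocateSingleSlot's control flow: serve the request from an
// already-available slot if possible, otherwise fall back to an async request
// for a new one. A plain Optional stands in for the SlotPool's available slots.
public class SingleSlotSketch {

    static CompletableFuture<String> allocateSingleSlot(Optional<String> availableSlot) {
        if (availableSlot.isPresent()) {
            // already successful from available: complete immediately
            return CompletableFuture.completedFuture(availableSlot.get());
        }
        // otherwise request a brand-new slot (asynchronous in the real code)
        return requestNewAllocatedSlot();
    }

    static CompletableFuture<String> requestNewAllocatedSlot() {
        // stands in for the RPC round-trip to the ResourceManager
        return CompletableFuture.completedFuture("new-slot");
    }

    public static void main(String[] args) {
        System.out.println(allocateSingleSlot(Optional.of("pooled-slot")).join()); // pooled-slot
        System.out.println(allocateSingleSlot(Optional.empty()).join());           // new-slot
    }
}
```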
requestNewAllocatedSlot decides between the streaming and batch request paths based on whether a timeout is set:
```java
private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
        SlotRequestId slotRequestId,
        SlotProfile slotProfile,
        @Nullable Time allocationTimeout) {
    if (allocationTimeout == null) {
        return slotPool.requestNewAllocatedBatchSlot(
                slotRequestId, slotProfile.getPhysicalSlotResourceProfile());
    } else {
        return slotPool.requestNewAllocatedSlot(
                slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), allocationTimeout);
    }
}
```
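As a tiny illustration (stand-in code only), the routing hinges solely on whether a timeout was passed: a null timeout selects the batch slot request, anything else the streaming one.

```java
// Sketch of the timeout-based routing in requestNewAllocatedSlot; the strings
// stand in for the two SlotPool request methods.
public class RequestRoutingSketch {

    static String route(Long allocationTimeoutMillis) {
        if (allocationTimeoutMillis == null) {
            return "batch";     // slotPool.requestNewAllocatedBatchSlot(...)
        }
        return "streaming";     // slotPool.requestNewAllocatedSlot(..., timeout)
    }

    public static void main(String[] args) {
        System.out.println(route(null));     // batch
        System.out.println(route(30_000L));  // streaming
    }
}
```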
Inside the SlotPool the request is ultimately an RPC call to the ResourceManager. This may involve starting TaskManagers: in yarn per-job mode, for example, requesting a slot here launches a TaskManager, i.e. a container on YARN.
```java
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
        jobMasterId,
        new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
        rpcTimeout);
```
