准备
- 首先了解调度相关的配置
SlotSharingGroup
CoLocationGroup
要了解一下Flink官网的Jobs and Scheduling 。熟悉JobGraph和ExecutionGraph及其附属的数据结构。 此外我们基于默认的配置进行源码解析。
public static final ConfigOption<SchedulerType> SCHEDULER =
key("jobmanager.scheduler")
.enumType(SchedulerType.class)
.defaultValue(SchedulerType.Ng)
//org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory#fromConfiguration public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(
Configuration configuration, JobType jobType) {
··· switch (schedulerType) {
case Ng:
schedulerNGFactory = new DefaultSchedulerFactory();
slotPoolServiceFactory =
new DeclarativeSlotPoolBridgeServiceFactory(
SystemClock.getInstance(),
rpcTimeout,
slotIdleTimeout,
batchSlotTimeout,
getRequestSlotMatchingStrategy(configuration, jobType));
break;
··· } ``` 有jobmanager.scheduler默认值为Ng,可以得到。
SchedulerNG:DefaultScheduler
SchedulingStrategy:PipelinedRegionSchedulingStrategy,具体调度策略。
优化点:1.防止deadlock。2.统一流批逻辑。
按region分配资源,整个shuffled streaming task 为一个region。批处理根据shuffle 分为多个region。一个region的slot都满足才会执行。Declarative Resource management
任务调度
在上一文Flink Application模式任务提交中分析了JobGraph提交到Dispatch的过程。
本文分析一下Dispatch调度到TaskManager启动任务的过程。
在Dispatch部分,submitJob过程主要是创建并启动jobManagerRunner。
jobManagerRunner lead选举,回调创建JobMasterServiceProcess和JobMaster,此过程和Dispatch启动类似,在Flink任务提交一文中已经介绍过了。在此从JobMaster startJobExecution()任务调度开始分析。
JobMaster初始化
- SlotPoolService初始化,JobMaster本地持有一个slotpool,优先从此pool申请slot。
- ResourceManagerLeaderRetriever初始化,监测获取resource manager的地址,初始化ResourceManagerConnection并向RM注册JobMaster,如果注册成功回调slotPoolService.connectToResourceManager(resourceManagerGateway),slotPoolService连接RM,以便后续向RM申请资源。
调度部署
JobMaster通过DefaultScheduler#startScheduling开始执行任务调度。
- transitionToRunning把JobStatus改为Running。
- schedulingStrategy.startScheduling()。此处是PipelinedRegionSchedulingStrategy,主要实现基于regions的调度策略。
- 计算获取region。
- schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
schedulerOperations是Scheduler接口,把deploy信息传给scheduler申请资源和部署。
DefaultScheduler#allocateSlotsAndDeploy。资源的申请是异步的,发请求-》wait-》收到资源回调-》deploy
- transitionToScheduled把ExecutionState改为SCHEDULED状态
- allocateSlots(executionVertexDeploymentOptions)申请资源,此处调用逻辑比较复杂,简单罗列一下调用链,不具体分析:
SlotSharingExecutionSlotAllocator#allocateSlotsFor-》SlotSharingExecutionSlotAllocator#getOrAllocateSharedSlot-》PhysicalSlotProviderImpl#allocatePhysicalSlot-》PhysicalSlotProviderImpl#requestNewSlot-》DeclarativeSlotPoolBridge#requestNewAllocatedSlot-》DeclarativeSlotPoolBridge#internalRequestNewSlot-》DeclarativeSlotPoolBridge#internalRequestNewAllocatedSlot-》DefaultDeclarativeSlotPool#increaseResourceRequirementsBy-》DefaultDeclarativeSlotPool#declareResourceRequirements
最终会通过DeclarativeSlotPoolService#declareResourceRequirements调用resourceManagerGateway.declareRequiredResources向resource manager申请资源。
ResourceManager资源申请看ResourceManager这篇文章。 上面DeclarativeSlotPoolBridge#internalRequestNewAllocatedSlot这一步,会记录下来pendingRequest,里面包含slot申请的future slotFuture,后续offerSlots(见下面备注)时会回调slotFuture.complete。
资源申请失败由ResourceManager回调JobMaster。
申请成功task manger回调JobMaster#offerSlots通知JobMaster。JM继续调用DefaultDeclarativeSlotPool#offerSlots-》DefaultDeclarativeSlotPool#internalOfferSlots,在此处回调NewSlotsListener,也就是DeclarativeSlotPoolBridge#newSlotsAreAvailable方法。
此处主要逻辑是1. 匹配slot满足的pendingRequest
1. 调用pendingRequest.fulfill(slot)去slotFuture.complete(slot);
waitForAllSlotsAndDeploy等待3.b里的资源申请完成并部署
资源申请wait的直接future是CompletableFuturelogicalSlotFuture,logicalSlotFuture最终是由pendingRequest里的CompletableFuture slotFuture转化来的。
Deploy逻辑ExecutionVertex#deploy
- ExecutionVertex#deploy
taskManagerGateway.submitTask(deployment, rpcTimeout)提交任务到TM。