Slot 概念
Flink中的Slot是一组资源的集合,包含CPU核心数,task堆内存,task对外内存,管理内存和网络内存。同时slot也是Flink的资源分配单位。
一个TaskManager中包含一个或者多个Slot。根据slot共享配置,一个slot中可同时运行多个task。这些task以工作线程的形式存在于slot中。
TaskManager,Slot,Task和并行度parallelism的关系如下图所示(引用官网的图):
Slot 相关的一些类
SchedulerImpl
SchedulerImpl负责为Execution节点的任务执行分配slot。
在后面的分析中涉及到的SchedulerImpl两个最重要的方法为allocateSlot和allocateBatchSlot。这两个方法的逻辑基本相同,只是前一个方法参数中多了分配slot超时时间。
具体分配slot的流程较为复杂,在后面分析slot申请流程的时候再讲解。
SlotSharingManager
SlotSharingManager负责Slot共享。Slot共享指的是不同的task在同一个slot中运行。
SlotSharingManager维护了一个slot层级结构:其中根节点和层级结构的中间节点为MultiTaskSlot。MultiTaskSlot可从属于另一个MultiTaskSlot,同时它又包含多个MultiTaskSlot或SingleTaskSlot,这样就形成了层级结构。SingleTaskSlot是slot层级结构中的最底层节点,只能拥有一个parent作为它的父节点。
Slot共享正是通过这种层级结构体现出来的。一个Slot被多个task共享,以Slot层级结构表示就是一个MultiTaskSlot包含多个SingleTaskSlot。
下面我们分析下几个重要的方法。
createRootSlot
创建一个根节点slot,该Slot的类型为MultiTaskSlot。
@Nonnull
MultiTaskSlot createRootSlot(
SlotRequestId slotRequestId,
CompletableFuture<? extends SlotContext> slotContextFuture,
SlotRequestId allocatedSlotRequestId) {
LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);
final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
// 创建一个根节点
// 这个方法同时将创建出的MultiTaskSlot存入到allTaskSlots和unresolvedRootSlots集合中
final MultiTaskSlot rootMultiTaskSlot = createAndRegisterRootSlot(
slotRequestId,
allocatedSlotRequestId,
slotContextFutureAfterRootSlotResolution);
// 当slotContextFuture完成后执行
// slotContextFuture是向SlotPool申请slot的过程
// 这个future在SlotPoolImpl的tryFulfillSlotRequestOrMakeAvailable方法中complete
FutureUtils.forward(
slotContextFuture.thenApply(
(SlotContext slotContext) -> {
// add the root node to the set of resolved root nodes once the SlotContext future has
// been completed and we know the slot's TaskManagerLocation
// 此时slot已经分配完毕,将该slot从unresolvedRootSlots集合移除
// 存入到resolvedRootSlots集合中
tryMarkSlotAsResolved(slotRequestId, slotContext);
return slotContext;
}),
slotContextFutureAfterRootSlotResolution);
return rootMultiTaskSlot;
}
SlotPool
SlotPool用于缓存slot。它接收ExecutionGraph发起的slot申请,为其分配执行任务所需的slot。如果SlotPool无法处理slot请求,他会尝试去连接ResourceManager获取新的slot。如果ResourceManager目前状态不可用,被ResourceManager拒绝或者是请求超时,则slot申请失败。SlotPool缓存了一部分slot,在ResourceManager不可用的时候,SlotPool仍然可以提供已注册的空闲slot。这些Slot只会在它们不再被使用的时候释放掉。比如说作业在运行但仍有空闲slot这种情况。
启动方法
SlotPool在JobMaster的startJobMasterServices中启动。该方法中注册了两个周期任务:检测空闲的slot和批量检测超时的slot
public void start(
@Nonnull JobMasterId jobMasterId,
@Nonnull String newJobManagerAddress,
@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
this.jobMasterId = jobMasterId;
this.jobManagerAddress = newJobManagerAddress;
this.componentMainThreadExecutor = componentMainThreadExecutor;
scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
if (log.isDebugEnabled()) {
scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
}