Slot 概念

Flink中的Slot是一组资源的集合,包含CPU核心数task堆内存task对外内存管理内存网络内存。同时slot也是Flink的资源分配单位
一个TaskManager中包含一个或者多个Slot。根据slot共享配置,一个slot中可同时运行多个task。这些task以工作线程的形式存在于slot中
TaskManager,Slot,Task和并行度parallelism的关系如下图所示(引用官网的图):
09-Flink 源码之 Slot - 图1

Slot 相关的一些类

SchedulerImpl

SchedulerImpl负责为Execution节点的任务执行分配slot
在后面的分析中涉及到的SchedulerImpl两个最重要的方法为allocateSlotallocateBatchSlot。这两个方法的逻辑基本相同,只是前一个方法参数中多了分配slot超时时间。
具体分配slot的流程较为复杂,在后面分析slot申请流程的时候再讲解。

SlotSharingManager

SlotSharingManager负责Slot共享。Slot共享指的是不同的task在同一个slot中运行。
SlotSharingManager维护了一个slot层级结构:其中根节点层级结构的中间节点为MultiTaskSlot。MultiTaskSlot可从属于另一个MultiTaskSlot,同时它又包含多个MultiTaskSlot或SingleTaskSlot,这样就形成了层级结构。SingleTaskSlot是slot层级结构中的最底层节点只能拥有一个parent作为它的父节点
Slot共享正是通过这种层级结构体现出来的。一个Slot被多个task共享,以Slot层级结构表示就是一个MultiTaskSlot包含多个SingleTaskSlot。
下面我们分析下几个重要的方法。

createRootSlot

创建一个根节点slot,该Slot的类型为MultiTaskSlot

  1. @Nonnull
  2. MultiTaskSlot createRootSlot(
  3. SlotRequestId slotRequestId,
  4. CompletableFuture<? extends SlotContext> slotContextFuture,
  5. SlotRequestId allocatedSlotRequestId) {
  6. LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);
  7. final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
  8. // 创建一个根节点
  9. // 这个方法同时将创建出的MultiTaskSlot存入到allTaskSlots和unresolvedRootSlots集合中
  10. final MultiTaskSlot rootMultiTaskSlot = createAndRegisterRootSlot(
  11. slotRequestId,
  12. allocatedSlotRequestId,
  13. slotContextFutureAfterRootSlotResolution);
  14. // 当slotContextFuture完成后执行
  15. // slotContextFuture是向SlotPool申请slot的过程
  16. // 这个future在SlotPoolImpl的tryFulfillSlotRequestOrMakeAvailable方法中complete
  17. FutureUtils.forward(
  18. slotContextFuture.thenApply(
  19. (SlotContext slotContext) -> {
  20. // add the root node to the set of resolved root nodes once the SlotContext future has
  21. // been completed and we know the slot's TaskManagerLocation
  22. // 此时slot已经分配完毕,将该slot从unresolvedRootSlots集合移除
  23. // 存入到resolvedRootSlots集合中
  24. tryMarkSlotAsResolved(slotRequestId, slotContext);
  25. return slotContext;
  26. }),
  27. slotContextFutureAfterRootSlotResolution);
  28. return rootMultiTaskSlot;
  29. }

SlotPool

SlotPool用于缓存slot。它接收ExecutionGraph发起的slot申请,为其分配执行任务所需的slot。如果SlotPool无法处理slot请求,他会尝试去连接ResourceManager获取新的slot。如果ResourceManager目前状态不可用,被ResourceManager拒绝或者是请求超时,则slot申请失败。SlotPool缓存了一部分slot,在ResourceManager不可用的时候,SlotPool仍然可以提供已注册的空闲slot。这些Slot只会在它们不再被使用的时候释放掉。比如说作业在运行但仍有空闲slot这种情况。

启动方法

SlotPoolJobMasterstartJobMasterServices中启动。该方法中注册了两个周期任务:检测空闲的slot和批量检测超时的slot

  1. public void start(
  2. @Nonnull JobMasterId jobMasterId,
  3. @Nonnull String newJobManagerAddress,
  4. @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
  5. this.jobMasterId = jobMasterId;
  6. this.jobManagerAddress = newJobManagerAddress;
  7. this.componentMainThreadExecutor = componentMainThreadExecutor;
  8. scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
  9. scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
  10. if (log.isDebugEnabled()) {
  11. scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
  12. }
  13. }