• Task Slot basics
  • Slot management in TaskExecutor
    • TaskSlot
  • Summary
  • References

In Flink, compute resources are allocated in units of slots. This article analyzes how Flink manages compute resources.

## Task Slot basics

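Earlier articles in this series walked through how a Flink cluster starts up. In a Flink cluster, every TaskManager is a separate JVM process (except in MiniCluster mode), and a single TaskManager may run several subtasks, each in its own thread. Task slots were introduced to control how many tasks a TaskManager can run.

Each task slot represents a fixed subset of the TaskManager's compute resources. For example, in a TaskManager with 3 slots, each slot can use 1/3 of the memory. Subtasks running in different slots therefore do not compete for memory. Flink does not yet isolate CPU; only memory is isolated.

Adjusting the number of slots controls how strongly subtasks are isolated from one another. With one slot per TaskManager, each group of subtasks runs in its own JVM process; with several slots per TaskManager, more subtasks share one JVM. Subtasks in the same JVM process can share TCP connections and heartbeat messages, which reduces network traffic, and they can also share some data structures, which lowers the per-subtask overhead to some extent.

By default, Flink lets subtasks share a slot as long as they belong to the same job and are not subtasks of the same operator. As a result, a single slot may run an entire pipeline of the job. Slot sharing has two main benefits: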

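1. To work out how many slots a job needs, Flink only has to determine the job's maximum parallelism; it does not have to consider the parallelism of each individual task.
2. Resources are used more efficiently. Without slot sharing, lightweight subtasks would each occupy a whole slot, just like heavyweight ones; with slot sharing, they can be placed in the same slot.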
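
The first benefit can be made concrete with a little arithmetic. The following is a minimal sketch, not Flink code: `SlotDemandSketch` and the operator names are hypothetical, and it assumes every operator is in the same slot sharing group.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): compares slot demand with and without slot sharing. */
final class SlotDemandSketch {

    /** With sharing, one slot can host one subtask of every operator, so demand is the maximum parallelism. */
    static int withSharing(Map<String, Integer> parallelismPerOperator) {
        return parallelismPerOperator.values().stream().mapToInt(Integer::intValue).max().orElse(0);
    }

    /** Without sharing, every subtask needs its own slot, so demand is the sum of all parallelisms. */
    static int withoutSharing(Map<String, Integer> parallelismPerOperator) {
        return parallelismPerOperator.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("source", 2);
        job.put("map", 4);
        job.put("sink", 1);
        System.out.println("slots with sharing:    " + withSharing(job));    // 4
        System.out.println("slots without sharing: " + withoutSharing(job)); // 7
    }
}
```

For this made-up job, sharing reduces the demand from 7 slots to 4, which is exactly the parallelism of the widest operator.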

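Flink uses SlotSharingGroup and CoLocationGroup to decide how resources are shared when tasks are scheduled. They correspond to two kinds of constraints:

• SlotSharingGroup: subtasks of different JobVertex instances in the same SlotSharingGroup may be placed in the same slot, but this is not guaranteed;
• CoLocationGroup: for different JobVertex instances in the same CoLocationGroup, the n-th subtasks must all be placed in the same slot; this is a hard constraint.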

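## Slot management in TaskExecutor

### TaskSlot

First, let's look at how slots are managed inside the TaskManager, that is, the TaskExecutor.

SlotID uniquely identifies a slot. It has two fields: ResourceID identifies the TaskExecutor that hosts the slot, and slotNumber is the slot's index within that TaskExecutor.

TaskSlot is the TaskExecutor's abstraction of a slot. It can be in one of four states: Free, Releasing, Allocated, or Active. Its main fields are: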

    class TaskSlot {
        /** Index of the task slot. */
        private final int index;

        /** State of this slot. */
        private TaskSlotState state;

        /** Resource characteristics for this slot. */
        private final ResourceProfile resourceProfile;

        /** Tasks running in this slot. A single slot may execute several tasks. */
        private final Map<ExecutionAttemptID, Task> tasks;

        /** Job id to which the slot has been allocated; null if not allocated. */
        private JobID jobId;

        /** Allocation id of this slot; null if not allocated. */
        private AllocationID allocationId;
    }

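`TaskSlot` provides methods that change its state. For example, `allocate(JobID newJobId, AllocationID newAllocationId)` marks the slot as `Allocated`, and `markFree()` marks it as `Free`, although the release only succeeds once all tasks have been removed from the slot. Before switching state, the slot first checks the state it is currently in. In addition, `add(Task task)` adds a task to the slot; all tasks in a slot must belong to the same job.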
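
The state checks described above can be illustrated with a small state machine. This is a simplified sketch rather than Flink's `TaskSlot`: the class name `SlotStateSketch` is hypothetical, ids are plain strings, and the rule that tasks may only be added to an Active slot is an assumption made for the example.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, simplified model of a task slot's state transitions. */
final class SlotStateSketch {
    enum State { FREE, ALLOCATED, ACTIVE, RELEASING }

    private State state = State.FREE;
    private String jobId;        // null while the slot is free
    private String allocationId; // null while the slot is free
    private final Set<String> tasks = new HashSet<>();

    /** Succeeds for a free slot, or idempotently for the same job and allocation. */
    boolean allocate(String newJobId, String newAllocationId) {
        Objects.requireNonNull(newJobId);
        Objects.requireNonNull(newAllocationId);
        if (state == State.FREE) {
            jobId = newJobId;
            allocationId = newAllocationId;
            state = State.ALLOCATED;
            return true;
        }
        if (state == State.ALLOCATED || state == State.ACTIVE) {
            return newJobId.equals(jobId) && newAllocationId.equals(allocationId);
        }
        return false;
    }

    /** Only an allocated (or already active) slot can become active. */
    boolean markActive() {
        if (state == State.ALLOCATED || state == State.ACTIVE) {
            state = State.ACTIVE;
            return true;
        }
        return false;
    }

    /** Assumption for this sketch: tasks are accepted only in ACTIVE state and only from the owning job. */
    boolean add(String taskId, String taskJobId) {
        if (state != State.ACTIVE || !taskJobId.equals(jobId)) {
            return false;
        }
        return tasks.add(taskId);
    }

    boolean remove(String taskId) {
        return tasks.remove(taskId);
    }

    /** The slot can only be freed once no task is left in it. */
    boolean markFree() {
        if (!tasks.isEmpty()) {
            return false;
        }
        state = State.FREE;
        jobId = null;
        allocationId = null;
        return true;
    }

    State state() {
        return state;
    }
}
```

A slot allocated to one job rejects an allocation for a different job and rejects tasks from a different job, and `markFree()` keeps returning `false` until the last task has been removed.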
<a name="taskslottable"></a>

### TaskSlotTable

`TaskExecutor` manages all of its slots mainly through `TaskSlotTable`:

    class TaskSlotTable implements TimeoutListener<AllocationID> {
        /** Timer service used to time out allocated slots. */
        private final TimerService<AllocationID> timerService;

        /** The list of all task slots. */
        private final List<TaskSlot> taskSlots;

        /** Mapping from allocation id to task slot. */
        private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;

        /** Mapping from execution attempt id to task and task slot. */
        private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;

        /** Mapping from job id to allocated slots for a job. */
        private final Map<JobID, Set<AllocationID>> slotsPerJob;

        /** Interface for slot actions, such as freeing them or timing them out. */
        private SlotActions slotActions;
    }

`allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout)` assigns the slot at the given index to the request identified by the `AllocationID`; internally it calls `TaskSlot.allocate(JobID newJobId, AllocationID newAllocationId)`. Note that the last parameter of `allocateSlot` is a timeout. `TaskSlotTable` has a member `TimerService<AllocationID> timerService`, through which timers can be registered. If a timer is not cancelled before the timeout elapses, `SlotActions#timeoutSlot` is called. In other words, if the timer associated with an allocated slot is not cancelled in time, the slot is released again and marked `Free`.

    class TaskSlotTable {
        public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
            checkInit();

            TaskSlot taskSlot = taskSlots.get(index);

            boolean result = taskSlot.allocate(jobId, allocationId);

            if (result) {
                // update the allocation id to task slot map
                allocationIDTaskSlotMap.put(allocationId, taskSlot);

                // register a timeout for this slot since it's in state allocated
                timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());

                // add this slot to the set of job slots
                Set<AllocationID> slots = slotsPerJob.get(jobId);

                if (slots == null) {
                    slots = new HashSet<>(4);
                    slotsPerJob.put(jobId, slots);
                }

                slots.add(allocationId);
            }

            return result;
        }
    }

When a slot is marked Active, the timer that was registered when the slot was allocated is cancelled:

    class TaskSlotTable {
        public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
            checkInit();

            TaskSlot taskSlot = getTaskSlot(allocationId);

            if (taskSlot != null) {
                if (taskSlot.markActive()) {
                    // unregister a potential timeout
                    LOG.info("Activate slot {}.", allocationId);

                    timerService.unregisterTimeout(allocationId);

                    return true;
                } else {
                    return false;
                }
            } else {
                throw new SlotNotFoundException(allocationId);
            }
        }
    }
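
The interplay between allocation, activation, and the timeout can be sketched as follows. This is not Flink's `TimerService`: `AllocationTimeoutSketch` is a hypothetical class that uses an explicit clock value instead of a scheduled executor, purely so that the behaviour is deterministic and easy to follow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical timeout registry keyed by allocation id, driven by a manually advanced clock. */
final class AllocationTimeoutSketch {
    private final Map<String, Long> deadlines = new HashMap<>();
    private final Consumer<String> onTimeout;

    AllocationTimeoutSketch(Consumer<String> onTimeout) {
        this.onTimeout = onTimeout;
    }

    /** Called when a slot is allocated: the allocation must become active before the deadline. */
    void registerTimeout(String allocationId, long nowMillis, long timeoutMillis) {
        deadlines.put(allocationId, nowMillis + timeoutMillis);
    }

    /** Called when the slot is marked active: the pending timeout is dropped. */
    void unregisterTimeout(String allocationId) {
        deadlines.remove(allocationId);
    }

    /** Fires the callback for every allocation whose deadline has passed. */
    void advanceTo(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        expired.forEach(onTimeout);
    }
}
```

If two allocations are registered and only one of them is unregistered (activated) before the deadline, advancing the clock past the deadline invokes the callback for the other one only, which corresponds to that slot being freed again.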

`createSlotReport` returns a `SlotReport` object that contains the state and allocation status of every slot in the current `TaskExecutor`.

<a name="taskexecutor"></a>

### TaskExecutor

`TaskExecutor` has to report the state of all its slots to `ResourceManager` so that `ResourceManager` knows how every slot is allocated. This happens in two situations:

- when `TaskExecutor` first establishes a connection to `ResourceManager`, it sends a `SlotReport`;
- `TaskExecutor` periodically sends heartbeats to `ResourceManager`, and each heartbeat carries a `SlotReport`.

The relevant code is:

    class TaskExecutor {
        private void establishResourceManagerConnection(
                ResourceManagerGateway resourceManagerGateway,
                ResourceID resourceManagerResourceId,
                InstanceID taskExecutorRegistrationId,
                ClusterInformation clusterInformation) {
            // First connection: report the slot status to the ResourceManager
            final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
                getResourceID(),
                taskExecutorRegistrationId,
                taskSlotTable.createSlotReport(getResourceID()),
                taskManagerConfiguration.getTimeout());
            // ………
        }

        private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, SlotReport> {
            // Heartbeat payload
            @Override
            public CompletableFuture<SlotReport> retrievePayload(ResourceID resourceID) {
                return callAsync(
                    () -> taskSlotTable.createSlotReport(getResourceID()),
                    taskManagerConfiguration.getTimeout());
            }
        }
    }

`ResourceManager` asks `TaskExecutor` to allocate a slot through `TaskExecutor.requestSlot`. Because `ResourceManager` knows the current state of every slot, the request targets a specific SlotID:

    class TaskExecutor {
        @Override
        public CompletableFuture<Acknowledge> requestSlot(
                final SlotID slotId,
                final JobID jobId,
                final AllocationID allocationId,
                final String targetAddress,
                final ResourceManagerId resourceManagerId,
                final Time timeout) {
            try {
                // Check that the request comes from the ResourceManager this TaskExecutor is registered with
                if (!isConnectedToResourceManager(resourceManagerId)) {
                    final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
                    log.debug(message);
                    throw new TaskManagerException(message);
                }

                if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                    // The slot is Free, so allocate it
                    if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, taskManagerConfiguration.getTimeout())) {
                        log.info("Allocated slot for {}.", allocationId);
                    } else {
                        log.info("Could not allocate slot for {}.", allocationId);
                        throw new SlotAllocationException("Could not allocate slot.");
                    }
                } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
                    // The slot has already been allocated to a different request: throw an exception
                    final String message = "The slot " + slotId + " has already been allocated for a different job.";
                    log.info(message);

                    final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
                    throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
                }

                // Offer the allocated slot to the JobManager that requested it
                if (jobManagerTable.contains(jobId)) {
                    // A connection to the JobManager already exists, so offer the slot right away
                    offerSlotsToJobManager(jobId);
                } else {
                    // Otherwise connect to the JobManager first; offerSlotsToJobManager(jobId)
                    // is called once the connection has been established
                    try {
                        jobLeaderService.addJob(jobId, targetAddress);
                    } catch (Exception e) {
                        // free the allocated slot
                        try {
                            taskSlotTable.freeSlot(allocationId);
                        } catch (SlotNotFoundException slotNotFoundException) {
                            // slot no longer existent, this should actually never happen, because we've
                            // just allocated the slot. So let's fail hard in this case!
                            onFatalError(slotNotFoundException);
                        }

                        // release local state under the allocation id.
                        localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

                        // sanity check
                        if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                            onFatalError(new Exception("Could not free slot " + slotId));
                        }

                        throw new SlotAllocationException("Could not add job to job leader service.", e);
                    }
                }
            } catch (TaskManagerException taskManagerException) {
                return FutureUtils.completedExceptionally(taskManagerException);
            }

            return CompletableFuture.completedFuture(Acknowledge.get());
        }
    }

Once a slot has been allocated, TaskExecutor has to offer it to the JobManager, which is exactly what `offerSlotsToJobManager(jobId)` does:

    class TaskExecutor {
        private void offerSlotsToJobManager(final JobID jobId) {
            final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);

            if (jobManagerConnection == null) {
                log.debug("There is no job manager connection to the leader of job {}.", jobId);
            } else {
                if (taskSlotTable.hasAllocatedSlots(jobId)) {
                    log.info("Offer reserved slots to the leader of job {}.", jobId);

                    final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();

                    // Fetch the slots assigned to this job; only slots in state allocated are returned
                    final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
                    final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

                    final Collection<SlotOffer> reservedSlots = new HashSet<>(2);

                    while (reservedSlotsIterator.hasNext()) {
                        SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
                        reservedSlots.add(offer);
                    }

                    // Offer the slots to the JobMaster through an RPC call
                    CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
                        getResourceID(),
                        reservedSlots,
                        taskManagerConfiguration.getTimeout());

                    acceptedSlotsFuture.whenCompleteAsync(
                        (Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
                            if (throwable != null) {
                                // On timeout, retry
                                if (throwable instanceof TimeoutException) {
                                    log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
                                    // We ran into a timeout. Try again.
                                    offerSlotsToJobManager(jobId);
                                } else {
                                    log.warn("Slot offering to JobManager failed. Freeing the slots " +
                                        "and returning them to the ResourceManager.", throwable);

                                    // On any other failure, free all the slots
                                    for (SlotOffer reservedSlot: reservedSlots) {
                                        freeSlotInternal(reservedSlot.getAllocationId(), throwable);
                                    }
                                }
                            } else {
                                // The call succeeded
                                // check if the response is still valid
                                if (isJobManagerConnectionValid(jobId, jobMasterId)) {
                                    // mark accepted slots active
                                    // Slots that the JobMaster confirmed are marked Active
                                    for (SlotOffer acceptedSlot : acceptedSlots) {
                                        try {
                                            if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
                                                // the slot is either free or releasing at the moment
                                                final String message = "Could not mark slot " + jobId + " active.";
                                                log.debug(message);
                                                jobMasterGateway.failSlot(
                                                    getResourceID(),
                                                    acceptedSlot.getAllocationId(),
                                                    new FlinkException(message));
                                            }
                                        } catch (SlotNotFoundException e) {
                                            final String message = "Could not mark slot " + jobId + " active.";
                                            jobMasterGateway.failSlot(
                                                getResourceID(),
                                                acceptedSlot.getAllocationId(),
                                                new FlinkException(message));
                                        }

                                        reservedSlots.remove(acceptedSlot);
                                    }

                                    final Exception e = new Exception("The slot was rejected by the JobManager.");

                                    // Free the remaining slots that were not accepted
                                    for (SlotOffer rejectedSlot : reservedSlots) {
                                        freeSlotInternal(rejectedSlot.getAllocationId(), e);
                                    }
                                } else {
                                    // discard the response since there is a new leader for the job
                                    log.debug("Discard offer slot response since there is a new leader " +
                                        "for the job {}.", jobId);
                                }
                            }
                        },
                        getMainThreadExecutor());
                } else {
                    log.debug("There are no unassigned slots for the job {}.", jobId);
                }
            }
        }
    }

`freeSlot(AllocationID, Throwable)` asks `TaskExecutor` to release the slot associated with the `AllocationID`:

    class TaskExecutor {
        @Override
        public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
            freeSlotInternal(allocationId, cause);

            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
            checkNotNull(allocationId);

            try {
                final JobID jobId = taskSlotTable.getOwningJob(allocationId);

                // Try to free the slot bound to allocationId
                final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);

                if (slotIndex != -1) {
                    // The slot was freed successfully
                    if (isConnectedToResourceManager()) {
                        // the slot was freed. Tell the RM about it
                        ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();

                        resourceManagerGateway.notifySlotAvailable(
                            establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
                            new SlotID(getResourceID(), slotIndex),
                            allocationId);
                    }

                    if (jobId != null) {
                        // If the job bound to this allocation has no allocated slots left,
                        // the connection to its JobMaster can be closed
                        // check whether we still have allocated slots for the same job
                        if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty()) {
                            // we can remove the job from the job leader service
                            try {
                                jobLeaderService.removeJob(jobId);
                            } catch (Exception e) {
                                log.info("Could not remove job {} from JobLeaderService.", jobId, e);
                            }

                            closeJobManagerConnection(
                                jobId,
                                new FlinkException("TaskExecutor " + getAddress() +
                                    " has no more allocated slots for job " + jobId + '.'));
                        }
                    }
                }
            } catch (SlotNotFoundException e) {
                log.debug("Could not free slot for allocation id {}.", allocationId, e);
            }

            localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
        }
    }

<a name="51dd8bac"></a>

## Slot management in ResourceManager

The previous section covered slot management within a single `TaskExecutor`. `ResourceManager`, by contrast, has to manage the slots of all `TaskExecutor` instances. Every JobManager requests resources through `ResourceManager`, which "forwards" each request to a `TaskExecutor` based on the current resource usage of the cluster.

<a name="slotmanager"></a>

### SlotManager

`ResourceManager` relies on `SlotManager` to manage slots. `SlotManager` tracks the state and allocation of every slot of every registered `TaskExecutor`, as well as all pending slot requests. Whenever a new slot is registered or an allocated slot is freed, `SlotManager` tries to fulfil a pending slot request. If the available slots are not sufficient, `SlotManager` notifies `ResourceManager` through `ResourceActions#allocateResource(ResourceProfile)`, and `ResourceManager` may then try to start a new `TaskExecutor` (for example, on Yarn).

In addition, a `TaskExecutor` that stays idle for too long, or a pending slot request that stays unfulfilled for too long, triggers timeout handling.
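
The two-way matching described above (a new request looks for a free slot, a newly free slot looks for a pending request) can be sketched in a few lines. This is an illustration only: `SlotMatchingSketch` is hypothetical, resource profiles are reduced to a single memory figure, and "matches" is assumed to mean "the slot has at least the requested memory".

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, heavily simplified matcher between free slots and pending slot requests. */
final class SlotMatchingSketch {
    private final Map<String, Integer> freeSlots = new LinkedHashMap<>();       // slotId -> memory (MB)
    private final Map<String, Integer> pendingRequests = new LinkedHashMap<>(); // allocationId -> required memory (MB)
    private final Map<String, String> assignments = new LinkedHashMap<>();      // allocationId -> slotId

    /** A slot became available: serve the first pending request it can satisfy, otherwise keep it as free. */
    void registerFreeSlot(String slotId, int memoryMb) {
        Iterator<Map.Entry<String, Integer>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> request = it.next();
            if (request.getValue() <= memoryMb) {
                String allocationId = request.getKey();
                it.remove();
                assignments.put(allocationId, slotId);
                return;
            }
        }
        freeSlots.put(slotId, memoryMb);
    }

    /** A request arrived: take the first free slot that is large enough, otherwise queue the request. */
    boolean requestSlot(String allocationId, int requiredMb) {
        Iterator<Map.Entry<String, Integer>> it = freeSlots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> slot = it.next();
            if (slot.getValue() >= requiredMb) {
                String slotId = slot.getKey();
                it.remove();
                assignments.put(allocationId, slotId);
                return true;
            }
        }
        pendingRequests.put(allocationId, requiredMb);
        return false;
    }

    String assignedSlot(String allocationId) {
        return assignments.get(allocationId);
    }

    int pendingCount() {
        return pendingRequests.size();
    }
}
```

In the real `SlotManager`, the point where this sketch simply queues the request is also where additional resources may be requested through `ResourceActions`.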
<a name="b35fb898"></a>

#### Slot registration

First, let's look at some of the more important member fields of `SlotManager`, mainly the state of slots and the state of slot requests:

    class SlotManager {
        /** Map for all registered slots. */
        private final HashMap<SlotID, TaskManagerSlot> slots;

        /** Index of all currently free slots. */
        private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

        /** All currently registered task managers. */
        private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

        /** Map of fulfilled and active allocations for request deduplication purposes. */
        private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

        /** Map of pending/unfulfilled slot allocation requests. */
        private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

        // When resources are insufficient, new resources are requested through
        // ResourceActions#allocateResource(ResourceProfile) (this may start a new TaskManager, or do nothing).
        // The newly requested resources are wrapped as PendingTaskManagerSlot instances.
        private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

        /** ResourceManager's id. */
        private ResourceManagerId resourceManagerId;

        /** Callbacks for resource (de-)allocations. */
        private ResourceActions resourceActions;
    }

When a new `TaskManager` registers, `registerTaskManager` is called:

    class SlotManager {
        public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
            checkInit();

            LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());

            // we identify task managers by their instance id
            if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
                reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
            } else {
                // first register the TaskManager
                ArrayList<SlotID> reportedSlots = new ArrayList<>();

                for (SlotStatus slotStatus : initialSlotReport) {
                    reportedSlots.add(slotStatus.getSlotID());
                }

                TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
                    taskExecutorConnection,
                    reportedSlots);

                taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);

                // next register the new slots, one by one
                for (SlotStatus slotStatus : initialSlotReport) {
                    registerSlot(
                        slotStatus.getSlotID(),
                        slotStatus.getAllocationID(),
                        slotStatus.getJobID(),
                        slotStatus.getResourceProfile(),
                        taskExecutorConnection);
                }
            }
        }

        // Registers a single slot
        private void registerSlot(
                SlotID slotId,
                AllocationID allocationId,
                JobID jobId,
                ResourceProfile resourceProfile,
                TaskExecutorConnection taskManagerConnection) {

            if (slots.containsKey(slotId)) {
                // remove the old slot first
                removeSlot(slotId);
            }

            // Create a TaskManagerSlot object and add it to slots
            final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);

            final PendingTaskManagerSlot pendingTaskManagerSlot;

            if (allocationId == null) {
                // The slot is not allocated yet: look for a PendingTaskManagerSlot
                // whose resources exactly match this slot
                pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
            } else {
                // The slot is already allocated
                pendingTaskManagerSlot = null;
            }

            if (pendingTaskManagerSlot == null) {
                // Two possibilities: 1) the slot is already allocated, 2) there is no matching PendingTaskManagerSlot
                updateSlot(slotId, allocationId, jobId);
            } else {
                // The newly registered slot satisfies the PendingTaskManagerSlot
                pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
                final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();

                // The PendingTaskManagerSlot may have a PendingSlotRequest assigned to it
                if (assignedPendingSlotRequest == null) {
                    // No PendingSlotRequest assigned: treat the slot as Free
                    handleFreeSlot(slot);
                } else {
                    // A PendingSlotRequest is assigned: it can now be fulfilled, so allocate the slot
                    assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
                    allocateSlot(slot, assignedPendingSlotRequest);
                }
            }
        }

        private void handleFreeSlot(TaskManagerSlot freeSlot) {
            Preconditions.checkState(freeSlot.getState() == TaskManagerSlot.State.FREE);

            // First check whether a PendingSlotRequest can be fulfilled by this slot
            PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile());

            if (null != pendingSlotRequest) {
                // A matching PendingSlotRequest exists, so allocate the slot
                allocateSlot(freeSlot, pendingSlotRequest);
            } else {
                freeSlots.put(freeSlot.getSlotId(), freeSlot);
            }
        }
    }

Besides that, `TaskExecutor` also periodically reports slot status to `ResourceManager` via heartbeats; `reportSlotStatus` updates the slot state accordingly.

<a name="96352068"></a>

#### Requesting a slot

`ResourceManager` requests a slot through `SlotManager`'s `registerSlotRequest(SlotRequest slotRequest)` method. A `SlotRequest` carries the requesting `JobId`, the `AllocationID`, and a `ResourceProfile` describing the requested resources. `SlotManager` wraps the slot request in a `PendingSlotRequest`, meaning a slot request that has not yet been fulfilled:

    class SlotManager {
        public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
            checkInit();

            if (checkDuplicateRequest(slotRequest.getAllocationId())) {
                LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
                return false;
            } else {
                // Wrap the request in a PendingSlotRequest
                PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
                pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

                try {
                    // Run the slot allocation logic
                    internalRequestSlot(pendingSlotRequest);
                } catch (ResourceManagerException e) {
                    // requesting the slot failed --> remove pending slot request
                    pendingSlotRequests.remove(slotRequest.getAllocationId());
                    throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
                }
                return true;
            }
        }

        private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
            final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();

            // First look for a matching slot among the registered slots in FREE state
            TaskManagerSlot taskManagerSlot = findMatchingSlot(resourceProfile);

            if (taskManagerSlot != null) {
                // Found a matching slot: allocate it
                allocateSlot(taskManagerSlot, pendingSlotRequest);
            } else {
                // Otherwise look among the PendingTaskManagerSlots
                Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);

                // Not even a PendingTaskManagerSlot matches
                if (!pendingTaskManagerSlotOptional.isPresent()) {
                    // Ask the ResourceManager for resources through the
                    // ResourceActions#allocateResource(ResourceProfile) callback
                    pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
                }

                // Assign the PendingTaskManagerSlot to the PendingSlotRequest
                pendingTaskManagerSlotOptional.ifPresent(
                    pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
            }
        }
    }

So what does `allocateSlot` actually do? As mentioned in the previous section, a `TaskExecutor` can be asked to allocate a slot through an RPC call. That RPC call is made in `SlotManager#allocateSlot`:

    class SlotManager {
        private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
            Preconditions.checkState(taskManagerSlot.getState() == TaskManagerSlot.State.FREE);

            TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
            TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();

            final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
            final AllocationID allocationId = pendingSlotRequest.getAllocationId();
            final SlotID slotId = taskManagerSlot.getSlotId();
            final InstanceID instanceID = taskManagerSlot.getInstanceId();

            // The taskManagerSlot switches to state PENDING
            taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
            pendingSlotRequest.setRequestFuture(completableFuture);

            // If a PendingTaskManagerSlot is assigned to this pendingSlotRequest, unassign it first
            returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);

            TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);

            if (taskManagerRegistration == null) {
                throw new IllegalStateException("Could not find a registered task manager for instance id " +
                    instanceID + '.');
            }

            taskManagerRegistration.markUsed();

            // RPC call to the task manager
            // Request the slot from the TaskExecutor through an RPC call
            CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
                slotId,
                pendingSlotRequest.getJobId(),
                allocationId,
                pendingSlotRequest.getTargetAddress(),
                resourceManagerId,
                taskManagerRequestTimeout);

            // The RPC request has completed
            requestFuture.whenComplete(
                (Acknowledge acknowledge, Throwable throwable) -> {
                    if (acknowledge != null) {
                        completableFuture.complete(acknowledge);
                    } else {
                        completableFuture.completeExceptionally(throwable);
                    }
                });

            // Callback for completion of the PendingSlotRequest.
            // It may complete because the RPC call above completed, or because the PendingSlotRequest was cancelled.
            completableFuture.whenCompleteAsync(
                (Acknowledge acknowledge, Throwable throwable) -> {
                    try {
                        if (acknowledge != null) {
                            // On success, clear the pendingSlotRequest and update the slot state PENDING -> ALLOCATED
                            updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
                        } else {
                            if (throwable instanceof SlotOccupiedException) {
                                // The slot is already occupied: update its state
                                SlotOccupiedException exception = (SlotOccupiedException) throwable;
                                updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
                            } else {
                                // The request failed: remove the pendingSlotRequest from the TaskManagerSlot
                                removeSlotRequestFromSlot(slotId, allocationId);
                            }

                            if (!(throwable instanceof CancellationException)) {
                                // The slot request failed and will be retried
                                handleFailedSlotRequest(slotId, allocationId, throwable);
                            } else {
                                // The request was cancelled deliberately
                                LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
                            }
                        }
                    } catch (Exception e) {
                        LOG.error("Error while completing the slot allocation.", e);
                    }
                },
                mainThreadExecutor);
        }
    }
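
The code above chains two futures: the RPC future feeds a second, request-level future, and that second future can also be completed independently by cancelling it. A minimal standalone sketch of this pattern with plain `java.util.concurrent` types is shown below. `RequestFutureSketch`, `Outcome`, and the string acknowledgement are hypothetical stand-ins, not Flink types.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of linking an RPC future to a cancellable request future. */
final class RequestFutureSketch {
    enum Outcome { ALLOCATED, CANCELLED, FAILED }

    /**
     * Forwards the result of rpcFuture into requestFuture and classifies how requestFuture ended.
     * requestFuture may also be cancelled directly by the caller, independently of the RPC.
     */
    static CompletableFuture<Outcome> track(
            CompletableFuture<String> rpcFuture,
            CompletableFuture<String> requestFuture) {

        // Propagate the RPC result; this has no effect if requestFuture is already completed.
        rpcFuture.whenComplete((ack, error) -> {
            if (ack != null) {
                requestFuture.complete(ack);
            } else {
                requestFuture.completeExceptionally(error);
            }
        });

        // Distinguish success, deliberate cancellation, and any other failure.
        return requestFuture.handle((ack, error) -> {
            if (ack != null) {
                return Outcome.ALLOCATED;
            }
            return (error instanceof CancellationException) ? Outcome.CANCELLED : Outcome.FAILED;
        });
    }
}
```

Keeping the request-level future separate from the RPC future is what makes cancellation cheap: cancelling the request does not need to interrupt the RPC, and a late RPC response is simply ignored.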

<a name="85fdd9ff"></a>

#### Cancelling a slot request

A slot request can be cancelled with `unregisterSlotRequest`:

    class SlotManager {
        public boolean unregisterSlotRequest(AllocationID allocationId) {
            checkInit();

            // Remove the request from pendingSlotRequests
            PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);

            if (null != pendingSlotRequest) {
                LOG.debug("Cancel slot request {}.", allocationId);

                // Cancel the request
                cancelPendingSlotRequest(pendingSlotRequest);

                return true;
            } else {
                LOG.debug("No pending slot request with allocation id {} found. Ignoring unregistration request.", allocationId);

                return false;
            }
        }

        private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
            CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture();

            returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);

            if (null != request) {
                request.cancel(false);
            }
        }
    }

<a name="ba79c7ad"></a>

#### Timeouts

When it starts, `SlotManager` schedules two timeout checks: one for slot requests that have timed out, and one for TaskManagers that have been idle for too long:

    class SlotManager {
        public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
            LOG.info("Starting the SlotManager.");

            this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
            mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
            resourceActions = Preconditions.checkNotNull(newResourceActions);

            started = true;

            // Check whether a TaskExecutor has been idle for too long
            taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
                () -> mainThreadExecutor.execute(
                    () -> checkTaskManagerTimeouts()),
                0L,
                taskManagerTimeout.toMilliseconds(),
                TimeUnit.MILLISECONDS);

            // Check whether a slot request has timed out
            slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
                () -> mainThreadExecutor.execute(
                    () -> checkSlotRequestTimeouts()),
                0L,
                slotRequestTimeout.toMilliseconds(),
                TimeUnit.MILLISECONDS);
        }
    }

Once a `TaskExecutor` has been idle for too long, its resources are released through the `ResourceActions#releaseResource()` callback:

    class SlotManager {
        private void releaseTaskExecutor(InstanceID timedOutTaskManagerId) {
            final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
            LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
            resourceActions.releaseResource(timedOutTaskManagerId, cause);
        }
    }

When a slot request times out, the `PendingSlotRequest` is cancelled and `ResourceManager` is notified through `ResourceActions#notifyAllocationFailure()`:

    class SlotManager {
        private void checkSlotRequestTimeouts() {
            if (!pendingSlotRequests.isEmpty()) {
                long currentTime = System.currentTimeMillis();

                Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();

                while (slotRequestIterator.hasNext()) {
                    PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();

                    if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
                        slotRequestIterator.remove();

                        if (slotRequest.isAssigned()) {
                            cancelPendingSlotRequest(slotRequest);
                        }

                        resourceActions.notifyAllocationFailure(
                            slotRequest.getJobId(),
                            slotRequest.getAllocationId(),
                            new TimeoutException("The allocation could not be fulfilled in time."));
                    }
                }
            }
        }
    }

<a name="resourcemanager"></a>

### ResourceManager

As we have seen, `ResourceManager` manages all slots registered by `TaskExecutor` instances through `SlotManager`. `ResourceManager` itself, however, has to expose RPC methods so that the slot management operations are available to `JobMaster` and `TaskExecutor`.

<a name="416d4578"></a>

#### RPC interface

First, let's look at the slot-related RPC methods that `ResourceManager` provides:

    interface ResourceManagerGateway {
        CompletableFuture<Acknowledge> requestSlot(
            JobMasterId jobMasterId,
            SlotRequest slotRequest,
            @RpcTimeout Time timeout);

        void cancelSlotRequest(AllocationID allocationID);

        CompletableFuture<Acknowledge> sendSlotReport(
            ResourceID taskManagerResourceId,
            InstanceID taskManagerRegistrationId,
            SlotReport slotReport,
            @RpcTimeout Time timeout);

        void notifySlotAvailable(
            InstanceID instanceId,
            SlotID slotID,
            AllocationID oldAllocationId);
    }

Of these four methods, `requestSlot` and `cancelSlotRequest` are mainly called by `JobMaster`, while `sendSlotReport` and `notifySlotAvailable` are mainly called by `TaskExecutor`. After receiving one of these RPC calls, `ResourceManager` delegates the actual work to `SlotManager`.

<a name="74e0a5d4"></a>

#### Dynamic resource management

An important feature introduced by [FLIP-6](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077) is that `ResourceManager` can manage `TaskExecutor` compute resources dynamically, which allows better integration with frameworks such as Yarn, Mesos, and Kubernetes. Let's see how this feature is implemented.

As noted in the description of [SlotManager](https://blog.jrwang.me/2019/flink-source-code-resource-manager/#SlotManager): if the currently registered slots cannot satisfy a slot request, `SlotManager` notifies `ResourceManager` through the `ResourceActions#allocateResource` callback; and when `SlotManager` detects that a `TaskExecutor` has been idle for too long, it notifies `ResourceManager` through the `ResourceActions#releaseResource` callback. With these two callbacks, `ResourceManager` can request and release resources dynamically:

    class ResourceManager {
        private class ResourceActionsImpl implements ResourceActions {
            @Override
            public void releaseResource(InstanceID instanceId, Exception cause) {
                validateRunsInMainThread();

                // Release the resource
                ResourceManager.this.releaseResource(instanceId, cause);
            }

            @Override
            public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) {
                validateRunsInMainThread();

                // Request new resources; the exact behaviour depends on the ResourceManager implementation.
                // The returned collection is effectively a promise of resources that will be provided soon.
                return startNewWorker(resourceProfile);
            }

            @Override
            public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
                validateRunsInMainThread();

                JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
                if (jobManagerRegistration != null) {
                    jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId, cause);
                }
            }
        }

        protected void releaseResource(InstanceID instanceId, Exception cause) {
            WorkerType worker = null;

            // TODO: Improve performance by having an index on the instanceId
            for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : taskExecutors.entrySet()) {
                if (entry.getValue().getInstanceID().equals(instanceId)) {
                    worker = entry.getValue().getWorker();
                    break;
                }
            }

            if (worker != null) {
                // Stop the worker; the exact behaviour depends on the ResourceManager implementation
                if (stopWorker(worker)) {
                    closeTaskManagerConnection(worker.getResourceID(), cause);
                } else {
                    log.debug("Worker {} could not be stopped.", worker.getResourceID());
                }
            } else {
                // unregister in order to clean up potential left over state
                slotManager.unregisterTaskManager(instanceId);
            }
        }

        public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);

        public abstract boolean stopWorker(WorkerType worker);
    }

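The two abstract methods `startNewWorker` and `stopWorker` are the key to requesting and releasing resources dynamically. In standalone mode the set of `TaskExecutor` instances is fixed, so dynamic start and release are not supported. For Flink on Yarn, the implementations of these two methods in `YarnResourceManager` start new containers and release containers that were previously requested: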

    class YarnResourceManager {
        @Override
        public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
            Preconditions.checkArgument(
                ResourceProfile.UNKNOWN.equals(resourceProfile),
                "The YarnResourceManager does not support custom ResourceProfiles yet. It assumes that all containers have the same resources.");

            // Request a container
            requestYarnContainer();

            return slotsPerWorker;
        }

        @Override
        public boolean stopWorker(final YarnWorkerNode workerNode) {
            final Container container = workerNode.getContainer();
            log.info("Stopping container {}.", container.getId());
            try {
                nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
            } catch (final Exception e) {
                log.warn("Error while calling YARN Node Manager to stop container", e);
            }

            // Release the container
            resourceManagerClient.releaseAssignedContainer(container.getId());
            workerNodeMap.remove(workerNode.getResourceID());
            return true;
        }
    }
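
The contrast between a fixed deployment and a container-backed one can be sketched with two tiny implementations of the same pair of hooks. Everything here is hypothetical (`WorkerLifecycleSketch`, `StandaloneSketch`, `ContainerSketch`, string ids instead of real worker types); it only illustrates the contract that `startNewWorker` returns the slots a new worker is expected to provide and `stopWorker` reports whether the worker was actually stopped.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the two abstract worker hooks; types are simplified to strings. */
abstract class WorkerLifecycleSketch {
    /** Returns ids of the slots that the newly requested worker is expected to offer. */
    abstract Collection<String> startNewWorker();

    /** Returns true if the worker was stopped. */
    abstract boolean stopWorker(String workerId);
}

/** Fixed deployment: workers can neither be started nor stopped on demand. */
final class StandaloneSketch extends WorkerLifecycleSketch {
    @Override
    Collection<String> startNewWorker() {
        return Collections.emptyList();
    }

    @Override
    boolean stopWorker(String workerId) {
        return false;
    }
}

/** Container-backed deployment: each new worker promises a fixed number of slots. */
final class ContainerSketch extends WorkerLifecycleSketch {
    private final int slotsPerWorker;
    private final Set<String> liveWorkers = new HashSet<>();
    private int nextWorker = 0;

    ContainerSketch(int slotsPerWorker) {
        this.slotsPerWorker = slotsPerWorker;
    }

    @Override
    Collection<String> startNewWorker() {
        String workerId = "container-" + nextWorker++;
        liveWorkers.add(workerId);
        Collection<String> promisedSlots = new ArrayList<>();
        for (int i = 0; i < slotsPerWorker; i++) {
            promisedSlots.add(workerId + "/slot-" + i);
        }
        return promisedSlots;
    }

    @Override
    boolean stopWorker(String workerId) {
        return liveWorkers.remove(workerId);
    }

    int liveWorkerCount() {
        return liveWorkers.size();
    }
}
```

The empty collection returned by the standalone variant tells the caller that no additional slots should be expected, so a pending request has to wait for an existing slot to become free.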

<a name="7a5b20c4"></a>
## Slot management in the JobManager

Compared with `TaskExecutor` and `ResourceManager`, resource management in the `JobManager` is somewhat more involved, mainly because Flink lets multiple subtasks run in the same slot through the `SlotSharingGroup` and `CoLocationGroup` constraints. Within the `JobMaster`, the `SlotPool` is responsible for communicating with the `ResourceManager` and the `TaskExecutor`s and for managing the slots assigned to the current `JobMaster`; scheduling and resource assignment for all subtasks of the job rely mainly on the `Scheduler` and the `SlotSharingManager`.

<a name="physicalslot-vs-logicalslot-vs-multitaskslot"></a>
### PhysicalSlot vs. LogicalSlot vs. MultiTaskSlot

First, the two concepts `PhysicalSlot` and `LogicalSlot` need to be told apart. A `PhysicalSlot` represents an actual slot on a `TaskExecutor`, whereas a `LogicalSlot` represents a slot in the logical sense: a task can be deployed to a `LogicalSlot`, but logical slots do not correspond one-to-one to concrete physical slots. Because of mechanisms such as resource sharing, several `LogicalSlot`s may be mapped onto the same `PhysicalSlot`.

The only implementation of the `PhysicalSlot` interface is `AllocatedSlot`:

    public interface PhysicalSlot extends SlotContext {
        boolean tryAssignPayload(Payload payload);

        /**
         * Payload which can be assigned to an {@link AllocatedSlot}.
         */
        interface Payload {
            /**
             * Releases the payload
             *
             * @param cause of the payload release
             */
            void release(Throwable cause);
        }
    }

    class AllocatedSlot implements PhysicalSlot {
        /** The ID under which the slot is allocated. Uniquely identifies the slot. */
        private final AllocationID allocationId;

        /** The location information of the TaskManager to which this slot belongs */
        private final TaskManagerLocation taskManagerLocation;

        /** The resource profile of the slot provides */
        private final ResourceProfile resourceProfile;

        /** RPC gateway to call the TaskManager that holds this slot */
        private final TaskManagerGateway taskManagerGateway;

        /** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
        private final int physicalSlotNumber;

        private final AtomicReference<Payload> payloadReference;
    }

The `LogicalSlot` interface and its implementation class `SingleLogicalSlot`:

    public interface LogicalSlot {
        TaskManagerLocation getTaskManagerLocation();

        TaskManagerGateway getTaskManagerGateway();

        int getPhysicalSlotNumber();

        AllocationID getAllocationId();

        SlotRequestId getSlotRequestId();

        Locality getLocality();

        CompletableFuture<?> releaseSlot(@Nullable Throwable cause);

        @Nullable
        SlotSharingGroupId getSlotSharingGroupId();

        boolean tryAssignPayload(Payload payload);

        Payload getPayload();

        /**
         * Payload for a logical slot.
         */
        interface Payload {
            void fail(Throwable cause);

            CompletableFuture<?> getTerminalStateFuture();
        }
    }

    public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
        private final SlotRequestId slotRequestId;

        private final SlotContext slotContext;

        // null if the logical slot does not belong to a slot sharing group, otherwise non-null
        @Nullable
        private final SlotSharingGroupId slotSharingGroupId;

        // locality of this slot wrt the requested preferred locations
        private final Locality locality;

        // owner of this slot to which it is returned upon release
        private final SlotOwner slotOwner;

        private final CompletableFuture<Void> releaseFuture;

        private volatile State state;

        // LogicalSlot.Payload of this slot
        private volatile Payload payload;
    }

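Note that `SingleLogicalSlot` implements the `PhysicalSlot.Payload` interface, which means a `SingleLogicalSlot` can be assigned to a `PhysicalSlot` as its payload. Similarly, `LogicalSlot` defines the payload it can carry: the implementation of `LogicalSlot.Payload` is `Execution`, that is, a task that needs to be scheduled and executed.

The difference between `AllocationID` and `SlotRequestId` also deserves attention. An `AllocationID` identifies the allocation of a physical slot and is always associated with an `AllocatedSlot`; a `SlotRequestId` identifies a request for a `LogicalSlot` made while scheduling a task and is associated with that `LogicalSlot`.

As we have seen, sharing slot resources requires mapping several `LogicalSlot`s onto the same `PhysicalSlot`. How is this mapping implemented? This is where another implementation of `PhysicalSlot.Payload` comes in: `SlotSharingManager.MultiTaskSlot`, an inner class of `SlotSharingManager`.

`MultiTaskSlot` and `SingleTaskSlot` share the parent class `TaskSlot`. Both slot sharing and the hard `CoLocationGroup` constraint are implemented by building a tree of `TaskSlot`s. A `MultiTaskSlot` is an inner node of the tree and can have multiple children (each either a `MultiTaskSlot` or a `SingleTaskSlot`), while a `SingleTaskSlot` is a leaf.

The root of the tree is a `MultiTaskSlot` that is assigned a `SlotContext`. The `SlotContext` represents the physical slot in a `TaskExecutor` that has been allocated to the root, and every task in the tree runs in that same slot. A `MultiTaskSlot` can hold multiple leaves as long as the `AbstractID`s that identify those leaf `TaskSlot`s differ (the ID may be a `JobVertexID` or a CoLocationGroup ID).

Let us first look at the two IDs that `TaskSlot` encapsulates, a `SlotRequestId` and an `AbstractID`: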

    abstract class TaskSlot {
        // every TaskSlot has an associated slot request id
        private final SlotRequestId slotRequestId;

        // all task slots except for the root slots have a group id assigned
        // (the group id distinguishes one TaskSlot from another; it is either a JobVertexID
        // or the ID of a CoLocationGroup)
        @Nullable
        private final AbstractID groupId;

        public boolean contains(AbstractID groupId) {
            return Objects.equals(this.groupId, groupId);
        }

        public abstract void release(Throwable cause);
    }

`MultiTaskSlot` extends `TaskSlot` and can have multiple children. It can serve as the root or as an inner node. `MultiTaskSlot` also implements `PhysicalSlot.Payload`, so (when it is the root) it can be assigned to a `PhysicalSlot`:

    public final class MultiTaskSlot extends TaskSlot implements PhysicalSlot.Payload {
        private final Map<AbstractID, TaskSlot> children;

        // the root node has its parent set to null
        @Nullable
        private final MultiTaskSlot parent;

        // underlying allocated slot
        private final CompletableFuture<? extends SlotContext> slotContextFuture;

        // slot request id of the allocated slot
        @Nullable
        private final SlotRequestId allocatedSlotRequestId;
    }

`SingleTaskSlot` can only be a leaf. It owns a `LogicalSlot`, which can later be assigned to a concrete task:

    public final class SingleTaskSlot extends TaskSlot {
        private final MultiTaskSlot parent;

        // future containing a LogicalSlot which is completed once the underlying SlotContext future is completed
        private final CompletableFuture<SingleLogicalSlot> singleLogicalSlotFuture;

        private SingleTaskSlot(
                SlotRequestId slotRequestId,
                AbstractID groupId,
                MultiTaskSlot parent,
                Locality locality) {
            super(slotRequestId, groupId);

            this.parent = Preconditions.checkNotNull(parent);

            Preconditions.checkNotNull(locality);
            singleLogicalSlotFuture = parent.getSlotContextFuture()
                .thenApply(
                    (SlotContext slotContext) -> {
                        // Once the parent has been assigned a PhysicalSlot, create the SingleLogicalSlot.
                        LOG.trace("Fulfill single task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
                        return new SingleLogicalSlot(
                            slotRequestId,
                            slotContext,
                            slotSharingGroupId,
                            locality,
                            slotOwner);
                    });
        }
    }

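More concretely, for an ordinary `SlotSharingGroup` constraint the tree consists of a `MultiTaskSlot` root with several `SingleTaskSlot` leaves. Each leaf represents a different task, and the leaves are told apart by their `JobVertexID`s. For the hard `CoLocationGroup` constraint, a second-level `MultiTaskSlot` (identified by the CoLocationGroup ID) is created directly under the root `MultiTaskSlot`, and the subtasks covered by the same `CoLocationGroup` constraint become leaves of that second-level `MultiTaskSlot`.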
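The two tree shapes can be made concrete with a miniature model. This is a sketch, not Flink's implementation: `Node`, `Inner` and `Leaf` are hypothetical stand-ins for `TaskSlot`, `MultiTaskSlot` and `SingleTaskSlot`, group IDs are plain strings, and containment is assumed to be checked recursively over the whole subtree.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical miniature of the TaskSlot tree; these are not Flink's classes.
abstract class Node {
    final String groupId; // null for the root

    Node(String groupId) { this.groupId = groupId; }

    boolean contains(String id) { return Objects.equals(groupId, id); }
}

// Plays the role of SingleTaskSlot: a leaf that stands for one task.
class Leaf extends Node {
    Leaf(String groupId) { super(groupId); }
}

// Plays the role of MultiTaskSlot: the root or a second-level co-location node.
class Inner extends Node {
    final List<Node> children = new ArrayList<>();

    Inner(String groupId) { super(groupId); }

    Leaf addLeaf(String id) {
        requireAbsent(id);
        Leaf leaf = new Leaf(id);
        children.add(leaf);
        return leaf;
    }

    Inner addInner(String id) {
        requireAbsent(id);
        Inner inner = new Inner(id);
        children.add(inner);
        return inner;
    }

    // A group id may appear at most once in the subtree of this node.
    private void requireAbsent(String id) {
        if (contains(id)) {
            throw new IllegalStateException("group " + id + " already present");
        }
    }

    // Assumption: containment covers this node and all of its descendants.
    @Override
    boolean contains(String id) {
        if (super.contains(id)) {
            return true;
        }
        for (Node child : children) {
            if (child.contains(id)) {
                return true;
            }
        }
        return false;
    }
}

class TaskSlotTreeDemo {
    public static void main(String[] args) {
        Inner root = new Inner(null);                   // backed by one physical slot
        root.addLeaf("source");                         // plain slot sharing
        root.addLeaf("map");
        Inner coLocated = root.addInner("co-group-1");  // hard co-location constraint
        coLocated.addLeaf("iteration-head");
        coLocated.addLeaf("iteration-tail");
        System.out.println(root.contains("iteration-tail"));
        System.out.println(root.contains("sink"));
    }
}
```

Because every node hangs off the same root, everything in the tree ends up in the physical slot assigned to that root, which is exactly what both constraints need.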
<a name="slotpool"></a>
### SlotPool

The JobManager uses the `SlotPool` to request slots from the `ResourceManager` and to manage all slots assigned to that JobManager. Every slot mentioned in this section is a physical slot.

`SlotPoolImpl` is the only implementation of the `SlotPool` interface. Let us look at its key member variables first:

    class SlotPoolImpl implements SlotPool {
        /** The book-keeping of all allocated slots. */
        // All slots assigned to the current JobManager.
        private final AllocatedSlots allocatedSlots;

        /** The book-keeping of all available slots. */
        // All available slots (assigned to this JobManager, but not yet carrying a payload).
        private final AvailableSlots availableSlots;

        /** All pending requests waiting for slots. */
        // All pending slot requests (already sent to the ResourceManager).
        private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;

        /** The requests that are waiting for the resource manager to be connected. */
        // Pending slot requests that have not been sent yet because there is no
        // connection to the ResourceManager at the moment.
        private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
    }

Each slot assigned to the `SlotPool` is uniquely identified by its `AllocationID`. `getAvailableSlotsInformation` returns the currently available slots (those without a payload), and `allocateAvailableSlot` then assigns the `AllocatedSlot` associated with a given `AllocationID` to the request identified by a given `SlotRequestId`:

    class SlotPoolImpl implements SlotPool {
        // List the currently available slots.
        @Override
        public Collection<SlotInfo> getAvailableSlotsInformation() {
            return availableSlots.listSlotInfo();
        }

        // Assign the slot associated with allocationID to the request identified by slotRequestId.
        @Override
        public Optional<PhysicalSlot> allocateAvailableSlot(
                @Nonnull SlotRequestId slotRequestId,
                @Nonnull AllocationID allocationID) {
            componentMainThreadExecutor.assertRunningInMainThread();

            // Remove the slot from availableSlots.
            AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
            if (allocatedSlot != null) {
                // Record it in the mapping of allocated slots.
                allocatedSlots.add(slotRequestId, allocatedSlot);
                return Optional.of(allocatedSlot);
            } else {
                return Optional.empty();
            }
        }
    }

If no slot is currently available, the `SlotPool` can be asked to request one from the `ResourceManager`:

    class SlotPoolImpl implements SlotPool { //向RM申请新的 slot @Override public CompletableFuture requestNewAllocatedSlot( @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, Time timeout) { return requestNewAllocatedSlotInternal(slotRequestId, resourceProfile, timeout) .thenApply((Function.identity())); } private CompletableFuture requestNewAllocatedSlotInternal( @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nonnull Time timeout) { componentMainThreadExecutor.assertRunningInMainThread(); //构造一个 PendingRequest final PendingRequest pendingRequest = new PendingRequest( slotRequestId, resourceProfile); // register request timeout FutureUtils .orTimeout( pendingRequest.getAllocatedSlotFuture(), timeout.toMilliseconds(), TimeUnit.MILLISECONDS, componentMainThreadExecutor) .whenComplete( (AllocatedSlot ignored, Throwable throwable) -> { if (throwable instanceof TimeoutException) { //超时处理 timeoutPendingSlotRequest(slotRequestId); } }); if (resourceManagerGateway == null) { //如果当前没有和 RM 建立连接,则需要等待 RM 建立连接 stashRequestWaitingForResourceManager(pendingRequest); } else { //当前已经和 RM 建立了连接,向 RM 申请slot requestSlotFromResourceManager(resourceManagerGateway, pendingRequest); } return pendingRequest.getAllocatedSlotFuture(); } //如果当前没有和 RM 建立连接,则需要等待 RM 建立连接,加入 waitingForResourceManager //一旦和 RM 建立连接,就会向 RM 发送请求 private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) { log.info(“Cannot serve slot request, no ResourceManager connected. 
“ + “Adding as pending request [{}]”, pendingRequest.getSlotRequestId()); waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest); } //当前已经和 RM 建立了连接,向 RM 申请slot private void requestSlotFromResourceManager( final ResourceManagerGateway resourceManagerGateway, final PendingRequest pendingRequest) { //生成一个 AllocationID,后面分配的 slot 通过 AllocationID 区分 final AllocationID allocationId = new AllocationID(); //添加到等待处理的请求中 pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest); pendingRequest.getAllocatedSlotFuture().whenComplete( (AllocatedSlot allocatedSlot, Throwable throwable) -> { if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) { // cancel the slot request if there is a failure or if the pending request has // been completed with another allocated slot resourceManagerGateway.cancelSlotRequest(allocationId); } }); //通过 RPC 调用向 RM 请求 slot,RM 的处理流程在前面已经介绍过 CompletableFuture rmResponse = resourceManagerGateway.requestSlot( jobMasterId, new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress), rpcTimeout); FutureUtils.whenCompleteAsyncIfNotDone( rmResponse, componentMainThreadExecutor, (Acknowledge ignored, Throwable failure) -> { // on failure, fail the request future if (failure != null) { slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure); } }); } }

Once the `ResourceManager` has finished processing the slot allocation, the allocated slot is offered to the JobManager, and eventually `SlotPool.offerSlots()` is called:

    class SlotPoolImpl { //向 SlotPool 分配 slot,返回已经被接受的 slot 集合。没有被接受的 slot,RM 可以分配给其他 Job。 @Override public Collection offerSlots( TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection offers) { ArrayList result = new ArrayList<>(offers.size()); //SlotPool 可以确定是否接受每一个 slot for (SlotOffer offer : offers) { if (offerSlot( taskManagerLocation, taskManagerGateway, offer)) { result.add(offer); } } return result; } boolean offerSlot( final TaskManagerLocation taskManagerLocation, final TaskManagerGateway taskManagerGateway, final SlotOffer slotOffer) { componentMainThreadExecutor.assertRunningInMainThread(); // check if this TaskManager is valid final ResourceID resourceID = taskManagerLocation.getResourceID(); final AllocationID allocationID = slotOffer.getAllocationId(); if (!registeredTaskManagers.contains(resourceID)) { log.debug(“Received outdated slot offering [{}] from unregistered TaskManager: {}”, slotOffer.getAllocationId(), taskManagerLocation); return false; } // check whether we have already using this slot // 如果当前 slot 关联的 AllocationID 已经在 SlotPool 中出现 AllocatedSlot existingSlot; if ((existingSlot = allocatedSlots.get(allocationID)) != null || (existingSlot = availableSlots.get(allocationID)) != null) { // we need to figure out if this is a repeated offer for the exact same slot, // or another offer that comes from a different TaskManager after the ResourceManager // re-tried the request // we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of // the actual slots on the TaskManagers // Note: The slotOffer should have the SlotID final SlotID existingSlotId = existingSlot.getSlotId(); final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex()); if (existingSlotId.equals(newSlotId)) { //这个 slot 在之前已经被 SlotPool 接受了,相当于 TaskExecutor 发送了一个重复的 offer log.info(“Received repeated offer for slot [{}]. 
Ignoring.”, allocationID); // return true here so that the sender will get a positive acknowledgement to the retry // and mark the offering as a success return true; } else { //已经有一个其他的 AllocatedSlot 和 这个 AllocationID 关联了,因此不能接受当前的这个 slot // the allocation has been fulfilled by another slot, reject the offer so the task executor // will offer the slot to the resource manager return false; } } //这个 slot 关联的 AllocationID 此前没有出现过 //新建一个 AllocatedSlot 对象,表示新分配的 slot final AllocatedSlot allocatedSlot = new AllocatedSlot( allocationID, taskManagerLocation, slotOffer.getSlotIndex(), slotOffer.getResourceProfile(), taskManagerGateway); // check whether we have request waiting for this slot // 检查是否有一个 request 和 这个 AllocationID 关联 PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID); if (pendingRequest != null) { // we were waiting for this! //有一个pending request 正在等待这个 slot allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); //尝试去完成那个等待的请求 if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) { // we could not complete the pending slot future —> try to fulfill another pending request //失败了 allocatedSlots.remove(pendingRequest.getSlotRequestId()); //尝试去满足其他在等待的请求 tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); } else { log.debug(“Fulfilled slot request [{}] with allocated slot [{}].”, pendingRequest.getSlotRequestId(), allocationID); } } else { //没有请求在等待这个slot,可能请求已经被满足了 // we were actually not waiting for this: // - could be that this request had been fulfilled // - we are receiving the slots from TaskManagers after becoming leaders //尝试去满足其他在等待的请求 tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); } // we accepted the request in any case. slot will be released after it idled for // too long and timed out return true; } }

Whenever a new `AllocatedSlot` becomes available, `SlotPoolImpl` tries to use it to fulfil, ahead of time, another request that is still waiting for a response:

    class SlotPoolImpl implements SlotPool {
        private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
            Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");

            // Look for a pending request whose resource requirements match this AllocatedSlot.
            final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);

            if (pendingRequest != null) {
                // A matching request exists: assign the AllocatedSlot to it.
                log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
                    pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
                allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
                pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
            } else {
                // No matching request: the AllocatedSlot becomes available.
                log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
                availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
            }
        }

        private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
            final ResourceProfile slotResources = slot.getResourceProfile();

            // try the requests sent to the resource manager first
            for (PendingRequest request : pendingRequests.values()) {
                if (slotResources.isMatching(request.getResourceProfile())) {
                    pendingRequests.removeKeyA(request.getSlotRequestId());
                    return request;
                }
            }

            // try the requests waiting for a resource manager connection next
            for (PendingRequest request : waitingForResourceManager.values()) {
                if (slotResources.isMatching(request.getResourceProfile())) {
                    waitingForResourceManager.remove(request.getSlotRequestId());
                    return request;
                }
            }

            // no request pending, or no request matches
            return null;
        }
    }
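The "fulfil a waiting request or park the slot" decision can be reduced to a few lines. The following is a rough sketch under simplifying assumptions: `MiniSlotPool` is a hypothetical class, a request or slot is modelled only by an amount of memory in MB, and `slotMemory >= requested` stands in for `ResourceProfile.isMatching`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical miniature of the "fulfil a pending request or make the slot available" step.
class MiniSlotPool {
    final Queue<Integer> pendingRequests = new ArrayDeque<>(); // requested memory, in arrival order
    final List<Integer> availableSlots = new ArrayList<>();    // memory of slots nobody is using
    final List<Integer> fulfilled = new ArrayList<>();         // requests that received a slot

    void offer(int slotMemory) {
        for (Iterator<Integer> it = pendingRequests.iterator(); it.hasNext(); ) {
            int requested = it.next();
            if (slotMemory >= requested) {   // stand-in for ResourceProfile.isMatching (assumption)
                it.remove();                 // the first matching request wins
                fulfilled.add(requested);
                return;
            }
        }
        availableSlots.add(slotMemory);      // nobody can use it right now: keep it for later
    }
}

class MiniSlotPoolDemo {
    public static void main(String[] args) {
        MiniSlotPool pool = new MiniSlotPool();
        pool.pendingRequests.add(512);
        pool.pendingRequests.add(128);
        pool.offer(256); // too small for the first request, large enough for the second
        pool.offer(64);  // matches nothing, so it is parked
        System.out.println(pool.fulfilled + " " + pool.availableSlots + " " + pool.pendingRequests);
    }
}
```

Note that the slot is matched against whichever waiting request fits, not necessarily the request that originally triggered its allocation.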

When the `SlotPool` starts, it schedules a periodic task that checks for idle slots. A slot that has been idle for too long is returned to the TaskManager:

    class SlotPoolImpl {
        private void checkIdleSlot() {
            // The timestamp in SlotAndTimestamp is relative
            final long currentRelativeTimeMillis = clock.relativeTimeMillis();

            final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());

            for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
                if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
                    expiredSlots.add(slotAndTimestamp.slot);
                }
            }

            final FlinkException cause = new FlinkException("Releasing idle slot.");

            for (AllocatedSlot expiredSlot : expiredSlots) {
                final AllocationID allocationID = expiredSlot.getAllocationId();
                if (availableSlots.tryRemove(allocationID) != null) {
                    // Return the idle slot to the TaskManager.
                    log.info("Releasing idle slot [{}].", allocationID);
                    final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
                        allocationID,
                        cause,
                        rpcTimeout);

                    FutureUtils.whenCompleteAsyncIfNotDone(
                        freeSlotFuture,
                        componentMainThreadExecutor,
                        (Acknowledge ignored, Throwable throwable) -> {
                            if (throwable != null) {
                                if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
                                    log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
                                        "Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
                                        throwable);
                                    tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
                                } else {
                                    log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
                                        "longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
                                }
                            }
                        });
                }
            }

            scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
        }
    }
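The expiry rule itself is a simple comparison between the current relative time and the moment a slot became available. A minimal sketch follows; `IdleSlotChecker` is a hypothetical class, allocation IDs are plain strings, and the RPC that actually frees the slot on the TaskManager is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of the idle-slot check; the freeSlot RPC is omitted.
class IdleSlotChecker {
    // allocation id -> relative timestamp (ms) at which the slot became available
    final Map<String, Long> availableSince = new LinkedHashMap<>();
    private final long idleTimeoutMillis;

    IdleSlotChecker(long idleTimeoutMillis) { this.idleTimeoutMillis = idleTimeoutMillis; }

    // Removes and returns every slot that has been available for longer than the timeout.
    List<String> releaseExpired(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : availableSince.entrySet()) {
            if (nowMillis - entry.getValue() > idleTimeoutMillis) {
                expired.add(entry.getKey());
            }
        }
        availableSince.keySet().removeAll(expired);
        return expired;
    }
}

class IdleSlotCheckerDemo {
    public static void main(String[] args) {
        IdleSlotChecker checker = new IdleSlotChecker(1000);
        checker.availableSince.put("alloc-a", 0L);
        checker.availableSince.put("alloc-b", 900L);
        // At t = 1500 ms only alloc-a has been idle for more than 1000 ms.
        System.out.println(checker.releaseExpired(1500));
    }
}
```

Collecting the expired entries first and removing them afterwards mirrors the two-pass structure of `checkIdleSlot` and avoids modifying the map while iterating over it.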

<a name="cf03a49d"></a>
### Scheduler and SlotSharingManager

As we have seen, the `SlotPool` is mainly responsible for managing the PhysicalSlots assigned to the current `JobMaster`. The computing resources needed by each individual task, however, are scheduled and managed in terms of `LogicalSlot`s: different tasks are assigned different LogicalSlots, but these may be backed by the same PhysicalSlot. Most of this logic is encapsulated in `SlotSharingManager` and `Scheduler`.

As mentioned earlier, building a tree of `TaskSlot`s enables resource sharing within a `SlotSharingGroup` as well as the hard `CoLocationGroup` constraint; this is mainly the job of `SlotSharingManager`. Each `SlotSharingGroup` has its own `SlotSharingManager`.

The main member variables of `SlotSharingManager` are shown below. Besides the associated `SlotSharingGroupId`, the most important ones are the three maps used to manage `TaskSlot`s:

    class SlotSharingManager {
        private final SlotSharingGroupId slotSharingGroupId;

        /** Actions to release allocated slots after a complete multi task slot hierarchy has been released. */
        private final AllocatedSlotActions allocatedSlotActions;

        /** Owner of the slots to which to return them when they are released from the outside. */
        private final SlotOwner slotOwner;

        // All TaskSlots: root, inner and leaf nodes.
        private final Map<SlotRequestId, TaskSlot> allTaskSlots;

        // Root MultiTaskSlots whose underlying physical slot has not been allocated yet.
        /** Root nodes which have not been completed because the allocated slot is still pending. */
        private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;

        // Root MultiTaskSlots whose underlying physical slot has already been allocated, organised
        // as a two-level map so that they can be looked up by the location of the TaskManager
        // that hosts the allocated physical slot.
        /** Root nodes which have been completed (the underlying allocated slot has been assigned). */
        private final Map<TaskManagerLocation, Map<AllocationID, MultiTaskSlot>> resolvedRootSlots;
    }

To build a new `TaskSlot` tree, `createRootSlot` is called to create the root node:

    class SlotSharingManager {
        MultiTaskSlot createRootSlot(
                SlotRequestId slotRequestId,
                CompletableFuture<? extends SlotContext> slotContextFuture,
                SlotRequestId allocatedSlotRequestId) {
            final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
                slotRequestId,
                slotContextFuture,
                allocatedSlotRequestId);

            LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);

            allTaskSlots.put(slotRequestId, rootMultiTaskSlot);

            // Register the root as unresolved first.
            unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);

            // add the root node to the set of resolved root nodes once the SlotContext future has
            // been completed and we know the slot's TaskManagerLocation
            slotContextFuture.whenComplete(
                (SlotContext slotContext, Throwable throwable) -> {
                    if (slotContext != null) {
                        // Once the physical slot has been allocated, move the root from
                        // unresolvedRootSlots to resolvedRootSlots.
                        final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);

                        if (resolvedRootNode != null) {
                            final AllocationID allocationId = slotContext.getAllocationId();
                            LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, allocationId);

                            final Map<AllocationID, MultiTaskSlot> innerMap = resolvedRootSlots.computeIfAbsent(
                                slotContext.getTaskManagerLocation(),
                                taskManagerLocation -> new HashMap<>(4));

                            MultiTaskSlot previousValue = innerMap.put(allocationId, resolvedRootNode);
                            Preconditions.checkState(previousValue == null);
                        }
                    } else {
                        rootMultiTaskSlot.release(throwable);
                    }
                });

            return rootMultiTaskSlot;
        }
    }
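The unresolved-to-resolved transition hinges on a callback registered on the slot future. The sketch below isolates just that mechanism with `java.util.concurrent.CompletableFuture`; `RootSlotRegistry` is a hypothetical class, roots and allocation IDs are strings, and on failure the root is merely dropped, whereas the real code releases the whole subtree.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical miniature of the unresolved -> resolved bookkeeping for root slots.
class RootSlotRegistry {
    final Map<String, String> unresolved = new HashMap<>(); // slot request id -> root
    final Map<String, String> resolved = new HashMap<>();   // allocation id -> root

    void createRoot(String requestId, CompletableFuture<String> allocationIdFuture) {
        // The root is usable immediately, even though no physical slot backs it yet.
        unresolved.put(requestId, "root-" + requestId);

        allocationIdFuture.whenComplete((allocationId, failure) -> {
            String root = unresolved.remove(requestId);
            if (failure == null && root != null) {
                // The physical slot is known now, so index the root by its allocation id.
                resolved.put(allocationId, root);
            }
            // On failure the root is simply forgotten in this sketch.
        });
    }
}

class RootSlotRegistryDemo {
    public static void main(String[] args) {
        RootSlotRegistry registry = new RootSlotRegistry();
        CompletableFuture<String> slotFuture = new CompletableFuture<>();
        registry.createRoot("req-1", slotFuture);
        System.out.println(registry.unresolved.keySet());
        slotFuture.complete("alloc-7"); // the callback runs in the completing thread
        System.out.println(registry.resolved);
    }
}
```

Registering the root before the future completes is what lets other tasks attach leaves to a tree whose physical slot is still being requested.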

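In addition, different tasks in Flink can share resources as long as they are in the same `SlotSharingGroup`, with one implicit condition: the two tasks must be subtasks of different operators. For example, if the map operator has a parallelism of three, the subtasks map[1] and map[2] cannot land in the same PhysicalSlot. Both `listResolvedRootSlotInfo` and `getUnresolvedRootSlot` contain the check `!multiTaskSlot.contains(groupId)`, which ensures that a `TaskSlot` tree never contains two subtasks of the same operator.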

    class SlotSharingManager {
        @Nonnull
        public Collection<SlotInfo> listResolvedRootSlotInfo(@Nullable AbstractID groupId) {
            // List the root MultiTaskSlots that already have a physical slot,
            // keeping only those that do not contain the given groupId.
            return resolvedRootSlots
                .values()
                .stream()
                .flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
                .filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
                .map((MultiTaskSlot multiTaskSlot) -> (SlotInfo) multiTaskSlot.getSlotContextFuture().join())
                .collect(Collectors.toList());
        }

        @Nullable
        public MultiTaskSlot getResolvedRootSlot(@Nonnull SlotInfo slotInfo) {
            // Find the MultiTaskSlot from the SlotInfo (TaskManagerLocation and AllocationID).
            Map<AllocationID, MultiTaskSlot> forLocationEntry = resolvedRootSlots.get(slotInfo.getTaskManagerLocation());
            return forLocationEntry != null ? forLocationEntry.get(slotInfo.getAllocationId()) : null;
        }

        /**
         * Gets an unresolved slot which does not yet contain the given groupId. An unresolved
         * slot is a slot whose underlying allocated slot has not been allocated yet.
         *
         * @param groupId which the returned slot must not contain
         * @return the unresolved slot or null if there was no root slot with free capacities
         */
        @Nullable
        MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
            // Find a root MultiTaskSlot that does not contain the given groupId.
            for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
                if (!multiTaskSlot.contains(groupId)) {
                    return multiTaskSlot;
                }
            }
            return null;
        }
    }

Requests for `LogicalSlot`s during task scheduling are managed through the `Scheduler` interface. `Scheduler` extends the `SlotProvider` interface, and its only implementation is `SchedulerImpl`:

    public interface SlotProvider {
        // Request a slot; the return value is a future of a LogicalSlot.
        CompletableFuture<LogicalSlot> allocateSlot(
            SlotRequestId slotRequestId,
            ScheduledUnit scheduledUnit,
            SlotProfile slotProfile,
            boolean allowQueuedScheduling,
            Time allocationTimeout);

        void cancelSlotRequest(
            SlotRequestId slotRequestId,
            @Nullable SlotSharingGroupId slotSharingGroupId,
            Throwable cause);
    }

    public interface Scheduler extends SlotProvider, SlotOwner {
        void start(@Nonnull ComponentMainThreadExecutor mainThreadExecutor);

        boolean requiresPreviousExecutionGraphAllocations();
    }

The main member variables of `SchedulerImpl` are:

    class SchedulerImpl implements Scheduler {
        private final SlotSelectionStrategy slotSelectionStrategy;

        private final SlotPool slotPool;

        private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;
    }

Clearly, `SchedulerImpl` relies on the `SlotPool` to request PhysicalSlots and on `SlotSharingManager` to implement slot sharing. The `SlotSelectionStrategy` interface is used to pick, from a set of slots, the one that best matches the preferences of the request.

    class SchedulerImpl {
        public CompletableFuture<LogicalSlot> allocateSlot(
                SlotRequestId slotRequestId,
                ScheduledUnit scheduledUnit,
                SlotProfile slotProfile,
                boolean allowQueuedScheduling,
                Time allocationTimeout) {
            log.debug("Received slot request [{}] for task: {}", slotRequestId, scheduledUnit.getTaskToExecute());

            componentMainThreadExecutor.assertRunningInMainThread();

            final CompletableFuture<LogicalSlot> allocationResultFuture = new CompletableFuture<>();

            // No SlotSharingGroupId means the task does not allow slot sharing and needs a slot of its own.
            CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
                allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout) :
                allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);

            allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
                if (failure != null) {
                    cancelSlotRequest(
                        slotRequestId,
                        scheduledUnit.getSlotSharingGroupId(),
                        failure);
                    allocationResultFuture.completeExceptionally(failure);
                } else {
                    allocationResultFuture.complete(slot);
                }
            });

            return allocationResultFuture;
        }

        @Override
        public void cancelSlotRequest(
                SlotRequestId slotRequestId,
                @Nullable SlotSharingGroupId slotSharingGroupId,
                Throwable cause) {
            componentMainThreadExecutor.assertRunningInMainThread();

            if (slotSharingGroupId != null) {
                releaseSharedSlot(slotRequestId, slotSharingGroupId, cause);
            } else {
                slotPool.releaseSlot(slotRequestId, cause);
            }
        }

        @Override
        public void returnLogicalSlot(LogicalSlot logicalSlot) {
            SlotRequestId slotRequestId = logicalSlot.getSlotRequestId();
            SlotSharingGroupId slotSharingGroupId = logicalSlot.getSlotSharingGroupId();
            FlinkException cause = new FlinkException("Slot is being returned to the SlotPool.");
            cancelSlotRequest(slotRequestId, slotSharingGroupId, cause);
        }
    }

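The logic of these public methods is fairly straightforward, so let us look at the internal implementation next.

If resource sharing is not allowed, a PhysicalSlot is obtained directly from the `SlotPool` and a `LogicalSlot` is created on top of it: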

    class SchedulerImpl {
        private CompletableFuture<LogicalSlot> allocateSingleSlot(
                SlotRequestId slotRequestId,
                SlotProfile slotProfile,
                boolean allowQueuedScheduling,
                Time allocationTimeout) {
            // First try to take one of the available AllocatedSlots in the SlotPool.
            Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile);

            if (slotAndLocality.isPresent()) {
                // A slot is available: create a SingleLogicalSlot and assign it as the payload of the AllocatedSlot.
                // already successful from available
                try {
                    return CompletableFuture.completedFuture(
                        completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get()));
                } catch (FlinkException e) {
                    return FutureUtils.completedExceptionally(e);
                }
            } else if (allowQueuedScheduling) {
                // Nothing is available right now; if queueing is allowed, ask the SlotPool
                // to request a new slot from the ResourceManager.
                // we allocate by requesting a new slot
                return slotPool
                    .requestNewAllocatedSlot(slotRequestId, slotProfile.getResourceProfile(), allocationTimeout)
                    .thenApply((PhysicalSlot allocatedSlot) -> {
                        try {
                            return completeAllocationByAssigningPayload(slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
                        } catch (FlinkException e) {
                            throw new CompletionException(e);
                        }
                    });
            } else {
                // failed to allocate
                return FutureUtils.completedExceptionally(
                    new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.'));
            }
        }

        private Optional<SlotAndLocality> tryAllocateFromAvailable(
                @Nonnull SlotRequestId slotRequestId,
                @Nonnull SlotProfile slotProfile) {
            Collection<SlotInfo> slotInfoList = slotPool.getAvailableSlotsInformation();

            Optional<SlotSelectionStrategy.SlotInfoAndLocality> selectedAvailableSlot =
                slotSelectionStrategy.selectBestSlotForProfile(slotInfoList, slotProfile);

            return selectedAvailableSlot.flatMap(slotInfoAndLocality -> {
                Optional<PhysicalSlot> optionalAllocatedSlot = slotPool.allocateAvailableSlot(
                    slotRequestId,
                    slotInfoAndLocality.getSlotInfo().getAllocationId());

                return optionalAllocatedSlot.map(
                    allocatedSlot -> new SlotAndLocality(allocatedSlot, slotInfoAndLocality.getLocality()));
            });
        }
    }
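Stripped of the Flink types, the non-shared path is a three-way decision: use an available slot, queue a new request, or fail immediately. The following sketch shows only that control flow; `SingleSlotAllocation` is a hypothetical class, slots are strings, and the supplier stands in for the request that the `SlotPool` would send to the `ResourceManager`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical miniature of the three-way decision made for a non-shared slot.
class SingleSlotAllocation {
    static CompletableFuture<String> allocate(
            Optional<String> availableSlot,
            boolean allowQueuedScheduling,
            Supplier<CompletableFuture<String>> requestNewSlot) {
        if (availableSlot.isPresent()) {
            // 1) An available slot exists: the result is ready immediately.
            return CompletableFuture.completedFuture(availableSlot.get());
        }
        if (allowQueuedScheduling) {
            // 2) Nothing available, but waiting is allowed: hand back the pending request.
            return requestNewSlot.get();
        }
        // 3) Nothing available and waiting is not allowed: fail right away.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("no slot available"));
        return failed;
    }
}

class SingleSlotAllocationDemo {
    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(SingleSlotAllocation.allocate(Optional.of("slot-1"), false, () -> pending).join());
        System.out.println(SingleSlotAllocation.allocate(Optional.empty(), true, () -> pending).isDone());
        System.out.println(SingleSlotAllocation.allocate(Optional.empty(), false, () -> pending).isCompletedExceptionally());
    }
}
```

Returning a future in all three cases keeps the caller's code identical whether the slot was already there, is still on its way, or can never be provided.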

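If resources are to be shared, the hard `CoLocationGroup` constraint must also be taken into account. The core of the procedure is to build the `TaskSlot` tree and then create a leaf node in it; the leaf wraps the `LogicalSlot` that is needed. See the code and the added comments below for the detailed flow: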

    class SchedulerImpl {
        private CompletableFuture<LogicalSlot> allocateSharedSlot(
                SlotRequestId slotRequestId,
                ScheduledUnit scheduledUnit,
                SlotProfile slotProfile,
                boolean allowQueuedScheduling,
                Time allocationTimeout) {
            // Each SlotSharingGroup has its own SlotSharingManager
            // allocate slot with slot sharing
            final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
                scheduledUnit.getSlotSharingGroupId(),
                id -> new SlotSharingManager(
                    id,
                    slotPool,
                    this));

            // Allocate the MultiTaskSlot
            final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
            try {
                if (scheduledUnit.getCoLocationConstraint() != null) {
                    // A CoLocation constraint exists
                    multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
                        scheduledUnit.getCoLocationConstraint(),
                        multiTaskSlotManager,
                        slotProfile,
                        allowQueuedScheduling,
                        allocationTimeout);
                } else {
                    multiTaskSlotLocality = allocateMultiTaskSlot(
                        scheduledUnit.getJobVertexId(),
                        multiTaskSlotManager,
                        slotProfile,
                        allowQueuedScheduling,
                        allocationTimeout);
                }
            } catch (NoResourceAvailableException noResourceException) {
                return FutureUtils.completedExceptionally(noResourceException);
            }

            // sanity check
            Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(scheduledUnit.getJobVertexId()));

            // Create a leaf SingleTaskSlot under the MultiTaskSlot and obtain the
            // SingleLogicalSlot that can be handed to the task
            final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
                slotRequestId,
                scheduledUnit.getJobVertexId(),
                multiTaskSlotLocality.getLocality());
            return leaf.getLogicalSlotFuture();
        }

        private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(
                CoLocationConstraint coLocationConstraint,
                SlotSharingManager multiTaskSlotManager,
                SlotProfile slotProfile,
                boolean allowQueuedScheduling,
                Time allocationTimeout) throws NoResourceAvailableException {
            // The coLocationConstraint is bound to the SlotRequestId of the (non-root) MultiTaskSlot assigned to it.
            // This binding only exists once that MultiTaskSlot has been allocated;
            // afterwards the MultiTaskSlot can be located directly through the SlotRequestId.
            final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();

            if (coLocationSlotRequestId != null) {
                // we have a slot assigned --> try to retrieve it
                final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);

                if (taskSlot != null) {
                    Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
                    return SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) taskSlot), Locality.LOCAL);
                } else {
                    // the slot may have been cancelled in the mean time
                    coLocationConstraint.setSlotRequestId(null);
                }
            }

            if (coLocationConstraint.isAssigned()) {
                // refine the preferred locations of the slot profile
                slotProfile = new SlotProfile(
                    slotProfile.getResourceProfile(),
                    Collections.singleton(coLocationConstraint.getLocation()),
                    slotProfile.getPreferredAllocations());
            }

            // Allocate a MultiTaskSlot for this coLocationConstraint: first find a suitable root MultiTaskSlot
            // get a new multi task slot
            SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
                coLocationConstraint.getGroupId(),
                multiTaskSlotManager,
                slotProfile,
                allowQueuedScheduling,
                allocationTimeout);

            // check whether we fulfill the co-location constraint
            if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
                multiTaskSlotLocality.getMultiTaskSlot().release(
                    new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));

                throw new NoResourceAvailableException("Could not allocate a local multi task slot for the " +
                    "co location constraint " + coLocationConstraint + '.');
            }

            // Create a second-level MultiTaskSlot under the root MultiTaskSlot and assign it to this coLocationConstraint
            final SlotRequestId slotRequestId = new SlotRequestId();
            final SlotSharingManager.MultiTaskSlot coLocationSlot = multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
                slotRequestId,
                coLocationConstraint.getGroupId());

            // Bind the slotRequestId to the coLocationConstraint so that the MultiTaskSlot
            // can later be located directly through this slotRequestId
            // mark the requested slot as co-located slot for other co-located tasks
            coLocationConstraint.setSlotRequestId(slotRequestId);

            // lock the co-location constraint once we have obtained the allocated slot
            coLocationSlot.getSlotContextFuture().whenComplete(
                (SlotContext slotContext, Throwable throwable) -> {
                    if (throwable == null) {
                        // check whether we are still assigned to the co-location constraint
                        if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
                            // Lock the location for this coLocationConstraint
                            coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
                        } else {
                            log.debug("Failed to lock colocation constraint {} because assigned slot " +
                                "request {} differs from fulfilled slot request {}.",
                                coLocationConstraint.getGroupId(),
                                coLocationConstraint.getSlotRequestId(),
                                slotRequestId);
                        }
                    } else {
                        log.debug("Failed to lock colocation constraint {} because the slot " +
                            "allocation for slot request {} failed.",
                            coLocationConstraint.getGroupId(),
                            coLocationConstraint.getSlotRequestId(),
                            throwable);
                    }
                });

            return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
        }

        private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
                AbstractID groupId,
                SlotSharingManager slotSharingManager,
                SlotProfile slotProfile,
                boolean allowQueuedScheduling,
                Time allocationTimeout) throws NoResourceAvailableException {
            // Collect the suitable root MultiTaskSlots that already have an AllocatedSlot.
            // "Suitable" means the root MultiTaskSlot does not yet contain the current groupId, so that
            // different tasks with the same groupId (the same JobVertex) never land in the same slot.
            Collection resolvedRootSlotsInfo = slotSharingManager.listResolvedRootSlotInfo(groupId);

            // Let the slotSelectionStrategy pick the best match
            SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
                slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);

            // Wrap the MultiTaskSlot together with its Locality
            final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = bestResolvedRootSlotWithLocality != null ?
                new SlotSharingManager.MultiTaskSlotLocality(
                    slotSharingManager.getResolvedRootSlot(bestResolvedRootSlotWithLocality.getSlotInfo()),
                    bestResolvedRootSlotWithLocality.getLocality()) :
                null;

            // If the AllocatedSlot behind the MultiTaskSlot sits on the same TaskManager as the
            // preferred slot of the request, choose this MultiTaskSlot
            if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
                return multiTaskSlotLocality;
            }

            // Two cases remain here:
            // 1) multiTaskSlotLocality == null: no suitable root MultiTaskSlot was found
            // 2) multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() != Locality.LOCAL:
            //    the locality preference is not met
            // Try to pick one of the unused slots in the SlotPool
            final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
            final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();

            Optional<SlotAndLocality> optionalPoolSlotAndLocality = tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);

            if (optionalPoolSlotAndLocality.isPresent()) {
                // An unused slot was found in the SlotPool
                SlotAndLocality poolSlotAndLocality = optionalPoolSlotAndLocality.get();
                // Use it if the unused AllocatedSlot meets the locality preference,
                // or if the previous step found no usable MultiTaskSlot
                if (poolSlotAndLocality.getLocality() == Locality.LOCAL || bestResolvedRootSlotWithLocality == null) {
                    // Create a root MultiTaskSlot on top of the newly allocated AllocatedSlot
                    final PhysicalSlot allocatedSlot = poolSlotAndLocality.getSlot();
                    final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
                        multiTaskSlotRequestId,
                        CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()),
                        allocatedSlotRequestId);

                    // Attach the new root MultiTaskSlot as the payload of the AllocatedSlot
                    if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
                        return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, poolSlotAndLocality.getLocality());
                    } else {
                        multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
                            allocatedSlot.getAllocationId() + '.'));
                    }
                }
            }

            if (multiTaskSlotLocality != null) {
                // Neither candidate meets the locality preference, or the SlotPool has no usable slot left
                // prefer slot sharing group slots over unused slots
                if (optionalPoolSlotAndLocality.isPresent()) {
                    slotPool.releaseSlot(
                        allocatedSlotRequestId,
                        new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
                }
                return multiTaskSlotLocality;
            }

            // Reaching this point means that 1) the slotSharingManager has no suitable root MultiTaskSlot
            // and 2) the slotPool has no usable slot left
            if (allowQueuedScheduling) {
                // First check whether the slotSharingManager still has a root MultiTaskSlot
                // whose slot allocation has not completed yet
                // there is no slot immediately available --> check first for uncompleted slots at the slot sharing group
                SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);

                if (multiTaskSlot == null) {
                    // If there is none, the slotPool has to request a new slot from the RM
                    // it seems as if we have to request a new slot from the resource manager, this is always the last resort!!!
                    final CompletableFuture<PhysicalSlot> slotAllocationFuture = slotPool.requestNewAllocatedSlot(
                        allocatedSlotRequestId,
                        slotProfile.getResourceProfile(),
                        allocationTimeout);

                    // After the request, the flow is the same: create a root MultiTaskSlot
                    // and attach it as the payload of the newly allocated AllocatedSlot
                    multiTaskSlot = slotSharingManager.createRootSlot(
                        multiTaskSlotRequestId,
                        slotAllocationFuture,
                        allocatedSlotRequestId);

                    slotAllocationFuture.whenComplete(
                        (PhysicalSlot allocatedSlot, Throwable throwable) -> {
                            final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);

                            if (taskSlot != null) {
                                // still valid
                                if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
                                    taskSlot.release(throwable);
                                } else {
                                    if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {
                                        taskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
                                            allocatedSlot.getAllocationId() + '.'));
                                    }
                                }
                            } else {
                                slotPool.releaseSlot(
                                    allocatedSlotRequestId,
                                    new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
                            }
                        });
                }

                return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
            }

            throw new NoResourceAvailableException("Could not allocate a shared slot for " + groupId + '.');
        }
    }
    ``` | | —- |
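
The order in which `allocateMultiTaskSlot` tries its options is easy to lose in the full listing, so here is a minimal sketch that reduces it to a pure decision function. This is not Flink code: the class `MultiTaskSlotDecisionSketch`, its `Decision` enum, the reduced `Locality` enum, and the `decide` method are all hypothetical names introduced for illustration, and the sketch assumes that assigning the payload to a pool slot always succeeds (the real method falls through to the next step when `tryAssignPayload` fails).

```java
import java.util.Optional;

/** Simplified model of the decision order in allocateMultiTaskSlot. Illustrative only, not Flink code. */
final class MultiTaskSlotDecisionSketch {

    /** Reduced stand-in for Flink's Locality enum (assumption: only these values matter here). */
    enum Locality { LOCAL, NON_LOCAL, UNKNOWN }

    /** Hypothetical labels for the possible outcomes. */
    enum Decision {
        REUSE_RESOLVED_ROOT,            // share an existing root MultiTaskSlot
        NEW_ROOT_FROM_POOL,             // wrap a free SlotPool slot in a new root MultiTaskSlot
        JOIN_UNRESOLVED_ROOT,           // attach to a root whose physical slot is still pending
        REQUEST_FROM_RESOURCE_MANAGER,  // last resort: ask the ResourceManager for a new slot
        FAIL                            // corresponds to NoResourceAvailableException
    }

    /**
     * @param bestResolvedRoot locality of the best resolved root slot, empty if none qualifies
     * @param freePoolSlot     locality of the best unused slot in the pool, empty if none exists
     */
    static Decision decide(
            Optional<Locality> bestResolvedRoot,
            Optional<Locality> freePoolSlot,
            boolean unresolvedRootExists,
            boolean allowQueuedScheduling) {

        // 1) A resolved root slot on the preferred TaskManager wins outright.
        if (bestResolvedRoot.isPresent() && bestResolvedRoot.get() == Locality.LOCAL) {
            return Decision.REUSE_RESOLVED_ROOT;
        }
        // 2) A free pool slot is used if it is local, or if there is no resolved root at all.
        if (freePoolSlot.isPresent()
                && (freePoolSlot.get() == Locality.LOCAL || !bestResolvedRoot.isPresent())) {
            return Decision.NEW_ROOT_FROM_POOL;
        }
        // 3) Otherwise a non-local shared root is preferred over an unused slot.
        if (bestResolvedRoot.isPresent()) {
            return Decision.REUSE_RESOLVED_ROOT;
        }
        // 4) Nothing is available right now: queue the request if that is allowed.
        if (allowQueuedScheduling) {
            return unresolvedRootExists
                ? Decision.JOIN_UNRESOLVED_ROOT
                : Decision.REQUEST_FROM_RESOURCE_MANAGER;
        }
        return Decision.FAIL;
    }
}
```

For instance, when both the best resolved root slot and the best free pool slot are non-local, `decide` returns `REUSE_RESOLVED_ROOT`, which mirrors the "prefer slot sharing group slots over unused slots" branch in the listing above.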

    Summary

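    Managing compute resources is one of the core functions of a distributed computing engine. This article walked through Flink's resource management model, which uses the task slot as its basic unit, and broke down in some detail how slots are managed inside each component, from TaskExecutor and ResourceManager to JobMaster, including the ResourceManager's dynamic resource management and the sharing constraints expressed by SlotSharingGroup and CoLocationGroup. Dynamic resource management makes it easier for Flink to integrate with resource management frameworks such as Yarn and Kubernetes, while slot sharing reduces the network overhead between the subtasks of a job.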
    Finally, two figures are attached to help consolidate the picture:
    Figure 1 and Figure 2 (Flink Source Code Reading Notes (6): Compute Resource Management)

    References