Introduction to ResourceManager

StandaloneResourceManager: standalone mode
ActiveResourceManager: externally managed resources (YARN, Kubernetes)

The ResourceManager's SlotManager component defaults to DeclarativeSlotManager. When cluster.fine-grained-resource-management.enabled is set, FineGrainedSlotManager is used instead.
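
The switch described above can be pictured with a small sketch. This is not Flink's factory code: the class `SlotManagerSelection`, the method `chooseSlotManager`, and the plain `Map` used as a configuration are hypothetical stand-ins, and only the configuration key and the two SlotManager names come from the text.

```java
import java.util.Map;

/** Simplified stand-in for the SlotManager selection; not Flink's actual factory code. */
final class SlotManagerSelection {
    // Configuration key quoted in the text.
    static final String FINE_GRAINED_KEY = "cluster.fine-grained-resource-management.enabled";

    /** Returns the name of the SlotManager implementation that the flag selects. */
    static String chooseSlotManager(Map<String, String> conf) {
        // Assumption: a missing key means "false", matching the default described in the text.
        boolean fineGrained = Boolean.parseBoolean(conf.getOrDefault(FINE_GRAINED_KEY, "false"));
        return fineGrained ? "FineGrainedSlotManager" : "DeclarativeSlotManager";
    }

    public static void main(String[] args) {
        System.out.println(chooseSlotManager(Map.of()));
        System.out.println(chooseSlotManager(Map.of(FINE_GRAINED_KEY, "true")));
    }
}
```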

Resource management in ResourceManager

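The entry point is ResourceManagerGateway#declareRequiredResources.
From there the call goes into SlotManager#processResourceRequirements.
Taking FineGrainedSlotManager as an example, the requirements end up being checked on a timer. The main logic lives in FineGrainedSlotManager#checkResourceRequirements.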

```java
private void checkResourceRequirements() {
    // (The part of the method where missingResources is obtained is not shown in this excerpt.)

    // Let the ResourceAllocationStrategy work out the allocation
    final ResourceAllocationResult result =
            resourceAllocationStrategy.tryFulfillRequirements(
                    missingResources, taskManagerTracker);

    // Allocate slots according to the result
    allocateSlotsAccordingTo(result.getAllocationsOnRegisteredResources());

    // Allocate task managers according to the result
    final Set<PendingTaskManagerId> failAllocations =
            allocateTaskManagersAccordingTo(result.getPendingTaskManagersToAllocate());

    // Record slot allocation of pending task managers
    final Map<PendingTaskManagerId, Map<JobID, ResourceCounter>>
            pendingResourceAllocationResult =
                    new HashMap<>(result.getAllocationsOnPendingResources());
    pendingResourceAllocationResult.keySet().removeAll(failAllocations);
    taskManagerTracker.replaceAllPendingAllocations(pendingResourceAllocationResult);

    unfulfillableJobs.clear();
    unfulfillableJobs.addAll(result.getUnfulfillableJobs());
    for (PendingTaskManagerId pendingTaskManagerId : failAllocations) {
        unfulfillableJobs.addAll(
                result.getAllocationsOnPendingResources().get(pendingTaskManagerId).keySet());
    }

    // Notify jobs that can not be fulfilled
    if (sendNotEnoughResourceNotifications) {
        for (JobID jobId : unfulfillableJobs) {
            LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
            resourceActions.notifyNotEnoughResourcesAvailable(
                    jobId, resourceTracker.getAcquiredResources(jobId));
        }
    }
}
```
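
The bookkeeping for failed task manager requests in the listing above can be tried in isolation. The sketch below is a simplified model, not Flink code: `PendingAllocationBookkeeping`, `Outcome`, and `reconcile` are hypothetical names, plain `String` ids stand in for `PendingTaskManagerId` and `JobID`, an `Integer` slot count stands in for `ResourceCounter`, and the null check is an addition that the listing does not have.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Simplified model of the bookkeeping step; String ids stand in for Flink's id types. */
final class PendingAllocationBookkeeping {

    /** The two pieces of state that the step produces. */
    static final class Outcome {
        final Map<String, Map<String, Integer>> keptPendingAllocations;
        final Set<String> unfulfillableJobs;

        Outcome(Map<String, Map<String, Integer>> kept, Set<String> unfulfillable) {
            this.keptPendingAllocations = kept;
            this.unfulfillableJobs = unfulfillable;
        }
    }

    static Outcome reconcile(
            Map<String, Map<String, Integer>> allocationsOnPending,
            Set<String> failedTaskManagers,
            Set<String> reportedUnfulfillable) {
        // Copy first so that the strategy's result is left untouched.
        Map<String, Map<String, Integer>> kept = new HashMap<>(allocationsOnPending);
        // Allocations planned on task managers that could not be requested are dropped.
        kept.keySet().removeAll(failedTaskManagers);

        // Jobs that were counting on a failed task manager become unfulfillable as well.
        Set<String> unfulfillable = new HashSet<>(reportedUnfulfillable);
        for (String taskManagerId : failedTaskManagers) {
            Map<String, Integer> perJob = allocationsOnPending.get(taskManagerId);
            if (perJob != null) { // defensive check added for this sketch
                unfulfillable.addAll(perJob.keySet());
            }
        }
        return new Outcome(kept, unfulfillable);
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> planned = new HashMap<>();
        planned.put("pending-tm-1", Map.of("job-a", 2));
        planned.put("pending-tm-2", Map.of("job-b", 1));
        Outcome outcome = reconcile(planned, Set.of("pending-tm-2"), Set.of("job-c"));
        System.out.println(outcome.keptPendingAllocations.keySet());
        System.out.println(outcome.unfulfillableJobs);
    }
}
```

In this model a job can end up unfulfillable for two reasons: the strategy reported it as such, or the task manager it was planned on could not be requested.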

Resource allocation

The allocation strategy is implemented in DefaultResourceAllocationStrategy, specifically in DefaultResourceAllocationStrategy#tryFulfillRequirements.

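  1. First try to allocate from resources that are already registered.
  2. If that is not enough, allocate from pending resources (pending task managers that have been requested but are not yet ready).
  3. If the pending resources still fall short, request new task managers.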

The outcome of the allocation is stored in a ResourceAllocationResult.
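
The three steps can be sketched as follows. This is a deliberately reduced model and not the logic of DefaultResourceAllocationStrategy: resources are counted as interchangeable slots instead of being matched by resource profile, and `ThreeStepAllocationSketch`, `Result`, `allocate`, and `slotsPerNewTaskManager` are hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Reduced model of the three-step allocation; slots are treated as interchangeable. */
final class ThreeStepAllocationSketch {

    /** Loosely mirrors the role of ResourceAllocationResult in the text. */
    static final class Result {
        final Map<String, Integer> onRegistered = new LinkedHashMap<>();
        final Map<String, Integer> onPending = new LinkedHashMap<>();
        int newTaskManagers;
    }

    static Result allocate(
            int requiredSlots,
            Map<String, Integer> registeredFreeSlots,
            Map<String, Integer> pendingFreeSlots,
            int slotsPerNewTaskManager) {
        if (requiredSlots < 0 || slotsPerNewTaskManager <= 0) {
            throw new IllegalArgumentException("invalid slot counts");
        }
        Result result = new Result();
        // Step 1: use free slots on task managers that are already registered.
        int remaining = takeFrom(registeredFreeSlots, requiredSlots, result.onRegistered);
        // Step 2: use slots on task managers that were requested but are not ready yet.
        remaining = takeFrom(pendingFreeSlots, remaining, result.onPending);
        // Step 3: request enough new task managers for the rest (ceiling division).
        result.newTaskManagers = (remaining + slotsPerNewTaskManager - 1) / slotsPerNewTaskManager;
        return result;
    }

    /** Takes up to {@code remaining} slots from {@code free} and returns what is still missing. */
    private static int takeFrom(Map<String, Integer> free, int remaining, Map<String, Integer> taken) {
        for (Map.Entry<String, Integer> entry : free.entrySet()) {
            if (remaining == 0) {
                break;
            }
            int used = Math.min(remaining, entry.getValue());
            if (used > 0) {
                taken.put(entry.getKey(), used);
                remaining -= used;
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        Result r = allocate(7, Map.of("tm-1", 2, "tm-2", 1), Map.of("pending-1", 2), 4);
        System.out.println(r.onRegistered + " " + r.onPending + " " + r.newTaskManagers);
    }
}
```

With 7 required slots, 3 free registered slots, 2 free pending slots, and 4 slots per new task manager, the sketch takes 3 and 2 slots from the first two tiers and asks for one new task manager to cover the remaining 2.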

TaskManager deployment

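allocateTaskManagersAccordingTo is called with the pending task managers from the ResourceAllocationResult.
The request travels from the SlotManager back into the ResourceManager through ResourceActions#allocateResource. The implementation is ResourceActionsImpl, a private inner class of ResourceManager, which ends up calling the abstract method startNewWorker that ActiveResourceManager implements.
The actual request is served by a ResourceManagerDriver, which is loaded according to the cluster's deployment type. On Kubernetes, KubernetesResourceManagerDriver starts a TaskManager pod.
The entry point of the TM container is kubernetes-taskmanager.sh.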
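
The callback chain from the SlotManager to the driver can be sketched like this. The class names mirror the ones in the text, but everything else is a simplification made for illustration: the `String workerSpec` parameter, the `void requestResource(String)` signature, and the wrapper class `DeploymentCallbackSketch` are assumptions and do not reproduce Flink's real signatures.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the SlotManager -> ResourceManager -> driver callback chain. */
final class DeploymentCallbackSketch {

    /** Callback handed to the SlotManager (simplified signature). */
    interface ResourceActions {
        boolean allocateResource(String workerSpec);
    }

    /** Deployment-specific backend, e.g. for Kubernetes or YARN (simplified signature). */
    interface ResourceManagerDriver {
        void requestResource(String workerSpec);
    }

    abstract static class ResourceManager {
        /** Private inner class, so the SlotManager only ever sees the ResourceActions interface. */
        private final class ResourceActionsImpl implements ResourceActions {
            @Override
            public boolean allocateResource(String workerSpec) {
                return startNewWorker(workerSpec);
            }
        }

        final ResourceActions resourceActions = new ResourceActionsImpl();

        /** How a worker is started is left to the subclass. */
        abstract boolean startNewWorker(String workerSpec);
    }

    static final class ActiveResourceManager extends ResourceManager {
        private final ResourceManagerDriver driver;

        ActiveResourceManager(ResourceManagerDriver driver) {
            this.driver = driver;
        }

        @Override
        boolean startNewWorker(String workerSpec) {
            // Delegate to whichever driver was loaded for the deployment type.
            driver.requestResource(workerSpec);
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> requested = new ArrayList<>();
        // Hypothetical driver that only records requests instead of starting a pod.
        ActiveResourceManager rm = new ActiveResourceManager(requested::add);
        // This is the call a SlotManager would make through its callback.
        rm.resourceActions.allocateResource("tm-spec-1");
        System.out.println(requested);
    }
}
```

The point of the indirection is that the SlotManager never depends on the deployment type: it only holds a ResourceActions reference, and the driver is swapped underneath.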

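If the ResourceManager cannot obtain the resources, it notifies the JobMaster asynchronously through ResourceActions#notifyNotEnoughResourcesAvailable and ResourceActions#notifyAllocationFailure. If the request succeeds, the task manager notifies the JobMaster by calling JobMaster#offerSlots.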