ResourceManager overview
StandaloneResourceManager: used in Standalone mode.
ActiveResourceManager: used when resources are managed externally (YARN, Kubernetes).
SlotManager, a ResourceManager component: DeclarativeSlotManager by default. Enabling cluster.fine-grained-resource-management.enabled switches it to FineGrainedSlotManager.
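A minimal sketch of that selection, not Flink's actual code. Only the key cluster.fine-grained-resource-management.enabled comes from the text; the class, enum, and method names are hypothetical, and treating a missing key as false is an assumption chosen to match "DeclarativeSlotManager by default".

```java
import java.util.Map;

/** Hypothetical sketch: pick a slot manager kind from configuration. */
class SlotManagerSelection {

    /** Configuration key named in the text. */
    static final String FINE_GRAINED_KEY = "cluster.fine-grained-resource-management.enabled";

    /** Stand-ins for the two implementations; these are not Flink classes. */
    enum SlotManagerKind { DECLARATIVE, FINE_GRAINED }

    static SlotManagerKind select(Map<String, String> config) {
        // Assumption: an absent key means "false", i.e. the declarative default.
        boolean fineGrained =
                Boolean.parseBoolean(config.getOrDefault(FINE_GRAINED_KEY, "false"));
        return fineGrained ? SlotManagerKind.FINE_GRAINED : SlotManagerKind.DECLARATIVE;
    }

    public static void main(String[] args) {
        System.out.println(select(Map.of()));
        System.out.println(select(Map.of(FINE_GRAINED_KEY, "true")));
    }
}
```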
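ResourceManager resource management
The entry point is ResourceManagerGateway#declareRequiredResources.
From there the call goes into SlotManager#processResourceRequirements.
Taking FineGrainedSlotManager as an example, it ends up running a scheduled resource check. The main logic is in FineGrainedSlotManager#checkResourceRequirements.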
private void checkResourceRequirements() {
    // Allocate resources via the ResourceAllocationStrategy
    final ResourceAllocationResult result =
            resourceAllocationStrategy.tryFulfillRequirements(
                    missingResources, taskManagerTracker);

    // Allocate slots according to the result
    allocateSlotsAccordingTo(result.getAllocationsOnRegisteredResources());

    // Allocate task managers according to the result
    final Set<PendingTaskManagerId> failAllocations =
            allocateTaskManagersAccordingTo(result.getPendingTaskManagersToAllocate());

    // Record slot allocation of pending task managers
    final Map<PendingTaskManagerId, Map<JobID, ResourceCounter>> pendingResourceAllocationResult =
            new HashMap<>(result.getAllocationsOnPendingResources());
    pendingResourceAllocationResult.keySet().removeAll(failAllocations);
    taskManagerTracker.replaceAllPendingAllocations(pendingResourceAllocationResult);

    unfulfillableJobs.clear();
    unfulfillableJobs.addAll(result.getUnfulfillableJobs());
    for (PendingTaskManagerId pendingTaskManagerId : failAllocations) {
        unfulfillableJobs.addAll(
                result.getAllocationsOnPendingResources().get(pendingTaskManagerId).keySet());
    }

    // Notify jobs that can not be fulfilled
    if (sendNotEnoughResourceNotifications) {
        for (JobID jobId : unfulfillableJobs) {
            LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
            resourceActions.notifyNotEnoughResourcesAvailable(
                    jobId, resourceTracker.getAcquiredResources(jobId));
        }
    }
}
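The bookkeeping in the middle of this method can be easier to follow in isolation. The sketch below mirrors it with plain strings in place of PendingTaskManagerId and JobID: planned allocations on pending task managers whose request failed are dropped, and the jobs that relied on them are added to the unfulfillable set. The class and method names are hypothetical, and the per-job ResourceCounter is reduced to a set of job names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the pending-allocation bookkeeping, using plain strings. */
class PendingAllocationBookkeeping {

    /** Keeps only the planned allocations whose pending task manager was requested successfully. */
    static Map<String, Set<String>> survivingAllocations(
            Map<String, Set<String>> jobsByPendingTaskManager, Set<String> failedTaskManagers) {
        Map<String, Set<String>> surviving = new HashMap<>(jobsByPendingTaskManager);
        surviving.keySet().removeAll(failedTaskManagers);
        return surviving;
    }

    /** Jobs the strategy could not place, plus jobs planned on task managers that failed. */
    static Set<String> unfulfillableJobs(
            Map<String, Set<String>> jobsByPendingTaskManager,
            Set<String> failedTaskManagers,
            Set<String> unfulfillableFromStrategy) {
        Set<String> unfulfillable = new TreeSet<>(unfulfillableFromStrategy);
        for (String taskManager : failedTaskManagers) {
            unfulfillable.addAll(jobsByPendingTaskManager.getOrDefault(taskManager, Set.of()));
        }
        return unfulfillable;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> planned =
                Map.of("tm-1", Set.of("jobA"), "tm-2", Set.of("jobB"));
        Set<String> failed = Set.of("tm-2");
        System.out.println(survivingAllocations(planned, failed));
        System.out.println(unfulfillableJobs(planned, failed, Set.of("jobC")));
    }
}
```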
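Resource allocation
The allocation strategy is implemented in DefaultResourceAllocationStrategy, in DefaultResourceAllocationStrategy#tryFulfillRequirements.
- First, allocate from resources that are already registered.
- If that is not enough, allocate from pending resources (pending task managers that have been requested but are not yet ready).
- If the pending resources are still not enough, request new task managers.
The allocation result is stored in a ResourceAllocationResult.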
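The three-step order above can be sketched with plain slot counts. This is a deliberate simplification: the real strategy matches resource profiles per job, whereas this hypothetical example only counts interchangeable slots. All names here are illustrative, and the fixed slotsPerTaskManager parameter is an assumption.

```java
/** Hypothetical sketch of the allocation order: registered, then pending, then new. */
class AllocationOrderSketch {

    /** Outcome of one allocation pass. */
    static final class Result {
        final int slotsFromRegistered;
        final int slotsFromPending;
        final int newTaskManagers;

        Result(int slotsFromRegistered, int slotsFromPending, int newTaskManagers) {
            this.slotsFromRegistered = slotsFromRegistered;
            this.slotsFromPending = slotsFromPending;
            this.newTaskManagers = newTaskManagers;
        }
    }

    static Result fulfill(
            int requiredSlots, int freeRegisteredSlots, int freePendingSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        // Step 1: use free slots on task managers that are already registered.
        int fromRegistered = Math.min(requiredSlots, freeRegisteredSlots);
        int remaining = requiredSlots - fromRegistered;

        // Step 2: use slots on task managers that were requested but are not ready yet.
        int fromPending = Math.min(remaining, freePendingSlots);
        remaining -= fromPending;

        // Step 3: request enough new task managers for the rest (ceiling division).
        int newTaskManagers = (remaining + slotsPerTaskManager - 1) / slotsPerTaskManager;
        return new Result(fromRegistered, fromPending, newTaskManagers);
    }

    public static void main(String[] args) {
        Result r = fulfill(10, 4, 3, 2);
        System.out.println(
                r.slotsFromRegistered + " " + r.slotsFromPending + " " + r.newTaskManagers);
    }
}
```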
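TaskManager deployment
For the pending task managers in the ResourceAllocationResult, allocateTaskManagersAccordingTo is called.
The request goes from the SlotManager back to the ResourceManager through the allocateResource method of ResourceActions. That interface is implemented by ResourceActionsImpl, a private inner class of ResourceManager, and the call eventually reaches startNewWorker, which is declared abstract in ResourceManager and implemented by ActiveResourceManager.
The actual resource request is made by a ResourceManagerDriver that is loaded according to the cluster's deployment type. On Kubernetes, KubernetesResourceManagerDriver starts a TaskManager pod.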
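The call chain described here (SlotManager, then ResourceActions, then ResourceManager, then a deployment-specific driver) can be sketched as a small callback structure. Every type below is a hypothetical stand-in with simplified signatures, not the Flink interface of the same role. A worker spec is reduced to a string, and the driver only records what it was asked to start.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the callback chain from slot manager to driver. */
class AllocationCallbackSketch {

    /** Stand-in for ResourceActions: how the slot manager asks for a new worker. */
    interface Actions {
        boolean allocateResource(String workerSpec);
    }

    /** Stand-in for a deployment-specific ResourceManagerDriver. */
    interface Driver {
        void requestResource(String workerSpec);
    }

    /** Driver that records requests instead of starting a real pod. */
    static final class RecordingDriver implements Driver {
        final List<String> requested = new ArrayList<>();

        @Override
        public void requestResource(String workerSpec) {
            requested.add("pod for " + workerSpec);
        }
    }

    /** Stand-in for the resource manager: owns the driver and exposes the callback. */
    static final class Manager {
        private final Driver driver;

        Manager(Driver driver) {
            this.driver = driver;
        }

        boolean startNewWorker(String workerSpec) {
            driver.requestResource(workerSpec);
            return true;
        }

        Actions actions() {
            // The slot manager only sees this callback, never the manager itself.
            return this::startNewWorker;
        }
    }

    /** Stand-in for the slot manager: calls back through Actions. */
    static final class SlotMgr {
        private final Actions actions;

        SlotMgr(Actions actions) {
            this.actions = actions;
        }

        boolean allocateTaskManager(String workerSpec) {
            return actions.allocateResource(workerSpec);
        }
    }

    public static void main(String[] args) {
        RecordingDriver driver = new RecordingDriver();
        SlotMgr slotManager = new SlotMgr(new Manager(driver).actions());
        slotManager.allocateTaskManager("1cpu-1gb");
        System.out.println(driver.requested);
    }
}
```

Keeping the slot manager behind a narrow callback interface means it does not need to know which deployment backend is in use; only the driver changes between Standalone, YARN, and Kubernetes.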
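The entry point of the TM container is kubernetes-taskmanager.sh.
If the ResourceManager cannot obtain the resources, it notifies the JobMaster asynchronously through ResourceActions#notifyNotEnoughResourcesAvailable and ResourceActions#notifyAllocationFailure. If the request succeeds, the task manager calls back JobMaster#offerSlots to notify the JobMaster.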
