ResourceManager Overview
StandaloneResourceManager: standalone mode.
ActiveResourceManager: external resource management mode (YARN, Kubernetes).
SlotManager, a ResourceManager component: DeclarativeSlotManager by default; FineGrainedSlotManager when cluster.fine-grained-resource-management.enabled is set to true.
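The choice between the two SlotManager implementations can be pictured with a minimal sketch. It is not Flink's actual factory code: the class SlotManagerSelectionSketch, the method chooseSlotManager, and the use of a plain Map as configuration are all hypothetical stand-ins. Only the configuration key and the two implementation names come from the text above.

```java
import java.util.Map;

public class SlotManagerSelectionSketch {

    // Configuration key quoted in the text above.
    static final String FINE_GRAINED_KEY =
            "cluster.fine-grained-resource-management.enabled";

    /**
     * Hypothetical helper: returns the name of the SlotManager implementation
     * that would be picked for the given configuration.
     */
    static String chooseSlotManager(Map<String, String> conf) {
        // Boolean.parseBoolean(null) is false, so a missing key means "default".
        boolean fineGrained = Boolean.parseBoolean(conf.get(FINE_GRAINED_KEY));
        return fineGrained ? "FineGrainedSlotManager" : "DeclarativeSlotManager";
    }

    public static void main(String[] args) {
        System.out.println(chooseSlotManager(Map.of()));
        System.out.println(chooseSlotManager(Map.of(FINE_GRAINED_KEY, "true")));
    }
}
```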
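ResourceManager Resource Management
The entry point is ResourceManagerGateway#declareRequiredResources.
From there the call goes into SlotManager#processResourceRequirements.
Taking FineGrainedSlotManager as an example, the call eventually triggers a timed resource check. The main logic lives in FineGrainedSlotManager#checkResourceRequirements: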
private void checkResourceRequirements() {
    // Excerpt: the code that computes missingResources is omitted here.

    // Let the ResourceAllocationStrategy decide how to allocate resources
    final ResourceAllocationResult result =
            resourceAllocationStrategy.tryFulfillRequirements(
                    missingResources, taskManagerTracker);

    // Allocate slots according to the result
    allocateSlotsAccordingTo(result.getAllocationsOnRegisteredResources());

    // Allocate task managers according to the result
    final Set<PendingTaskManagerId> failAllocations =
            allocateTaskManagersAccordingTo(result.getPendingTaskManagersToAllocate());

    // Record slot allocation of pending task managers
    final Map<PendingTaskManagerId, Map<JobID, ResourceCounter>>
            pendingResourceAllocationResult =
                    new HashMap<>(result.getAllocationsOnPendingResources());
    pendingResourceAllocationResult.keySet().removeAll(failAllocations);
    taskManagerTracker.replaceAllPendingAllocations(pendingResourceAllocationResult);

    unfulfillableJobs.clear();
    unfulfillableJobs.addAll(result.getUnfulfillableJobs());
    for (PendingTaskManagerId pendingTaskManagerId : failAllocations) {
        unfulfillableJobs.addAll(
                result.getAllocationsOnPendingResources().get(pendingTaskManagerId).keySet());
    }

    // Notify jobs that can not be fulfilled
    if (sendNotEnoughResourceNotifications) {
        for (JobID jobId : unfulfillableJobs) {
            LOG.warn("Could not fulfill resource requirements of job {}.", jobId);
            resourceActions.notifyNotEnoughResourcesAvailable(
                    jobId, resourceTracker.getAcquiredResources(jobId));
        }
    }
}
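How a requirement update turns into a timed check can be sketched roughly as follows. This is a simplified model, not Flink's code. It assumes that several updates arriving before the timer fires are folded into a single check, which the text above does not state. The class DelayedCheckSketch, the Consumer-based scheduler, and the counter checksRun are hypothetical; the timer is replaced by a queue drained by hand so that the example stays deterministic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

public class DelayedCheckSketch {

    // Hypothetical scheduler: accepts a task that is to be run "after a delay".
    private final Consumer<Runnable> scheduler;
    private boolean checkScheduled = false;
    int checksRun = 0;

    DelayedCheckSketch(Consumer<Runnable> scheduler) {
        this.scheduler = scheduler;
    }

    /** Called on every requirement update; schedules at most one pending check. */
    void processResourceRequirements() {
        // Bookkeeping of the new requirements is omitted in this sketch.
        if (!checkScheduled) {
            checkScheduled = true;
            scheduler.accept(() -> {
                checkScheduled = false;
                checkResourceRequirements();
            });
        }
    }

    private void checkResourceRequirements() {
        checksRun++; // the real method would run the allocation logic
    }

    /** Three updates arrive before the timer fires; returns the number of checks run. */
    static int demo() {
        Queue<Runnable> timer = new ArrayDeque<>();
        DelayedCheckSketch slotManager = new DelayedCheckSketch(timer::add);
        slotManager.processResourceRequirements();
        slotManager.processResourceRequirements();
        slotManager.processResourceRequirements();
        while (!timer.isEmpty()) {
            timer.poll().run(); // simulate the delay elapsing
        }
        return slotManager.checksRun;
    }

    public static void main(String[] args) {
        System.out.println("checks run: " + demo());
    }
}
```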
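Resource Allocation
The resource allocation strategy is implemented in DefaultResourceAllocationStrategy, specifically in DefaultResourceAllocationStrategy#tryFulfillRequirements.
- First, allocate from the resources that are already registered.
- If that is not enough, allocate from pending resources (pending task managers that have been requested but are not yet ready).
- If the pending resources are still not enough, request new task managers.
The allocation result is stored in a ResourceAllocationResult.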
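The three steps above can be sketched with plain slot counts. This is a deliberately simplified model: the real strategy works on resource profiles per job, whereas this sketch only counts identical slots. The class AllocationStrategySketch, the nested Result type, and the parameter slotsPerTaskManager are hypothetical names introduced for illustration.

```java
public class AllocationStrategySketch {

    /** Simplified stand-in for ResourceAllocationResult: slot counts only. */
    static final class Result {
        final int onRegistered;    // slots taken from registered task managers
        final int onPending;       // slots taken from pending task managers
        final int newTaskManagers; // additional task managers to request

        Result(int onRegistered, int onPending, int newTaskManagers) {
            this.onRegistered = onRegistered;
            this.onPending = onPending;
            this.newTaskManagers = newTaskManagers;
        }
    }

    static Result tryFulfill(
            int required, int freeRegistered, int freePending, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        // Step 1: use registered resources first.
        int onRegistered = Math.min(required, freeRegistered);
        int remaining = required - onRegistered;
        // Step 2: then use pending task managers.
        int onPending = Math.min(remaining, freePending);
        remaining -= onPending;
        // Step 3: request new task managers for the rest (ceiling division).
        int newTaskManagers = (remaining + slotsPerTaskManager - 1) / slotsPerTaskManager;
        return new Result(onRegistered, onPending, newTaskManagers);
    }

    public static void main(String[] args) {
        Result r = tryFulfill(10, 3, 2, 4);
        System.out.println(r.onRegistered + " " + r.onPending + " " + r.newTaskManagers);
    }
}
```

With 10 required slots, 3 free registered slots, 2 free pending slots, and 4 slots per task manager, the sketch takes 3 and 2 slots from the first two tiers and requests 2 new task managers for the remaining 5.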
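TaskManager Deployment
allocateTaskManagersAccordingTo is called with the pending task managers from the ResourceAllocationResult.
The request is made through ResourceActions#allocateResource, which calls back from the SlotManager into the ResourceManager. It is implemented by ResourceActionsImpl, a private inner class of ResourceManager, and ends up in startNewWorker, which is declared abstract in ResourceManager and implemented by ActiveResourceManager.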
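The callback path described above follows a common pattern: the slot manager only sees a narrow callback interface, and a private inner class of the resource manager forwards the call to an abstract method. The sketch below keeps the names from the text for readability, but every type in it is a simplified stand-in and not Flink's real class. In particular, the String worker spec, passing the callback through the constructor, and the requested list are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ResourceActionsCallbackSketch {

    /** Narrow callback interface handed to the slot manager (simplified). */
    interface ResourceActions {
        boolean allocateResource(String workerSpec);
    }

    /** Simplified slot manager: knows the callback, not the resource manager. */
    static final class SlotManager {
        private final ResourceActions actions;

        SlotManager(ResourceActions actions) {
            this.actions = actions;
        }

        boolean allocateTaskManager(String workerSpec) {
            return actions.allocateResource(workerSpec);
        }
    }

    /** Simplified resource manager base class. */
    abstract static class ResourceManager {
        final SlotManager slotManager = new SlotManager(new ResourceActionsImpl());

        protected abstract boolean startNewWorker(String workerSpec);

        // Private inner class: forwards slot manager callbacks to the outer instance.
        private final class ResourceActionsImpl implements ResourceActions {
            @Override
            public boolean allocateResource(String workerSpec) {
                return startNewWorker(workerSpec);
            }
        }
    }

    /** Simplified active resource manager: records what it was asked to start. */
    static final class ActiveResourceManager extends ResourceManager {
        final List<String> requested = new ArrayList<>();

        @Override
        protected boolean startNewWorker(String workerSpec) {
            requested.add(workerSpec); // a real driver would request a pod or container here
            return true;
        }
    }

    public static void main(String[] args) {
        ActiveResourceManager rm = new ActiveResourceManager();
        rm.slotManager.allocateTaskManager("worker-spec-1");
        System.out.println(rm.requested);
    }
}
```

Keeping the callback implementation private means the slot manager cannot reach anything on the resource manager beyond the methods of the callback interface.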
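The actual resource request goes through a ResourceManagerDriver, which is loaded according to the cluster's deployment type. On Kubernetes, KubernetesResourceManagerDriver starts a TaskManager pod.
The entry point of the TM container is kubernetes-taskmanager.sh.
If the ResourceManager fails to acquire the resources, it notifies the JobMaster asynchronously through ResourceActions#notifyNotEnoughResourcesAvailable and ResourceActions#notifyAllocationFailure. If the request succeeds, the task manager calls JobMaster#offerSlots to notify the JobMaster.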