Flink tm的规格不是直接通过参数设置个数以及内存大小的(per job模式),而是通过用户提交sql或者datastream作业时提交的plan中设置的资源来动态计算的,以下是计算的逻辑
计算每一个TM的slot数量
主要代码在yarn和k8s的ResourceManager里面都有一块这个代码来根据ResourceProfile来计算每个tm上的slot个数
org.apache.flink.kubernetes.runtime.clusterframework.KubernetesResourceManager#calculateTaskManagerSlotNumbers(ResourceProfile resourceProfile, int slotNum)
参数中的ResourceProfile是每一个chain起来的一个task节点,对于chain之后的点,CPU取最大的最大值,内存取总和。ResourceProfile可以得到一个slot所需要的内存和cpu的总量,以及第二个参数是这个类型ResourProfile的slot数,这里称为类型是因为flink申请tm的时候是按照ResourceProfile分类,只有同一类的ResourceProfile才会落在一个TM上。
主要有六个参数综合来协调控制
每个tm最大的slot个数
先计算每个tm最大的slot个数,由每个tm的cpu和memory上限决定,参数是
taskmanager.max.cpu.core: 32
taskmanager.max.memory.mb: 128*1024
int maxSlotsPerTm = Math.min((int) Math.floor(maxCorePerContainer / resourceProfile.getCpuCores()),
(int) Math.floor(1.0 * maxMemoryPerContainer / resourceProfile.getMemoryInMB()));
每个tm最小的slot个数
计算每个tm能分配的最小slot个数,由每个tm的cpu和memory的最小下限决定,参数是
taskmanager.min.cpu.core: 0.25
taskmanager.min.memory.mb: 1024
int minSlotsPerTm = Math.min((int) Math.floor(minCorePerContainer / resourceProfile.getCpuCores()),
(int) Math.floor(1.0 * minMemoryPerContainer / resourceProfile.getMemoryInMB()));
每个tm倾向的slot个数
计算每个tm比较倾向的slot个数,由每个tm prefer的cpu和memory大小决定,参数是
taskmanager.multi-slot.preferred.memory.mb: 4096
taskmanager.multi-slot.preferred.cpu.core: 1.0
int preferredSlotsPerTm = Math.min((int) Math.round(preferredCorePerContainer / resourceProfile.getCpuCores()),
(int) Math.round(1.0 * preferredMemoryPerContainer / resourceProfile.getMemoryInMB()));
得到这三组值,取最终的一个合适的值
if (preferredSlotsPerTm > maxSlotsPerTm) {
preferredSlotsPerTm = maxSlotsPerTm;
}
if (preferredSlotsPerTm < minSlotsPerTm) {
preferredSlotsPerTm = minSlotsPerTm;
}
// calculate number of task managers
int tmNum = slotNum / preferredSlotsPerTm;
接下来如果slotNum能被preferredSlotsPerTm整除得到的TM个数和每个tm的slot数就是
Collections.nCopies(tmNum, preferredSlotsPerTm);
如果不能整除
int maxSlotsPerTmPlan1 = slotNum % tmNum > 0 ? slotNum / tmNum + 1 : slotNum / tmNum;
int minSlotsPerTmPlan2 = slotNum / (tmNum + 1);
if (maxSlotsPerTmPlan1 <= maxSlotsPerTm &&
maxSlotsPerTmPlan1 - preferredSlotsPerTm <= preferredSlotsPerTm - minSlotsPerTmPlan2) {
// choose plan 1
result.addAll(Collections.nCopies(slotNum % tmNum, slotNum / tmNum + 1));
result.addAll(Collections.nCopies(tmNum - slotNum % tmNum, slotNum / tmNum));
} else {
// choose plan 2
result.addAll(Collections.nCopies(slotNum % (tmNum + 1), slotNum / (tmNum + 1) + 1));
result.addAll(Collections.nCopies((tmNum + 1) - slotNum % (tmNum + 1), slotNum / (tmNum + 1)));
}
slotNum分配到tmNum上有余,那么有两个方案
plan1
将没有分配到tm的slot,也就是余数,给每个tm上再分一个
result.addAll(Collections.nCopies(slotNum % tmNum, slotNum / tmNum + 1)); 需要多分配一个slot的tm
result.addAll(Collections.nCopies(tmNum - slotNum % tmNum, slotNum / tmNum));
plan2
多申请一个tm,slotNum % (tmNum + 1)得到多申请一个tm后的余数是多少,和上面的计算方式一致,只是tm+1了
result.addAll(Collections.nCopies(slotNum % (tmNum + 1), slotNum / (tmNum + 1) + 1)); 需要多分配一个slot的tm
result.addAll(Collections.nCopies((tmNum + 1) - slotNum % (tmNum + 1), slotNum / (tmNum + 1)));
选择plan1还是plan2的判断的依据就是
- maxSlotsPerTmPlan1 <= maxSlotsPerTm 加一后的每个tm的slot数不超过每个tm的slot上限
- maxSlotsPerTmPlan1 - preferredSlotsPerTm <= preferredSlotsPerTm - minSlotsPerTmPlan2 并且+1后每个tm的slot个数和prefer的差值要小于prefer和每个TM最小的slot的差值,其实这个就是看向上向下的偏移量哪个大,取变化更小的一种方式,就会更接近我们的prefer的值
到这里tm数量和slot数已经计算得到了,再根据每个tm的slot数,resourceProfile加框架本身设置的native,direct,heap内存,得到最终tm的规格
org.apache.flink.runtime.clusterframework.types.TaskManagerResource#fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.clusterframework.types.ResourceProfile, int, int)