Flink tm的规格不是直接通过参数设置个数以及内存大小的(per job模式),而是通过用户提交sql或者datastream作业时提交的plan中设置的资源来动态计算的,以下是计算的逻辑

计算每一个TM的slot数量

主要代码在yarn和k8s的ResourceManager里面都有一块这个代码来根据ResourceProfile来计算每个tm上的slot个数

org.apache.flink.kubernetes.runtime.clusterframework.KubernetesResourceManager#calculateTaskManagerSlotNumbers(ResourceProfile resourceProfile, int slotNum)

参数中的ResourceProfile是每一个chain起来的一个task节点,对于chain之后的点,CPU取最大的最大值,内存取总和。ResourceProfile可以得到一个slot所需要的内存和cpu的总量,以及第二个参数是这个类型ResourProfile的slot数,这里称为类型是因为flink申请tm的时候是按照ResourceProfile分类,只有同一类的ResourceProfile才会落在一个TM上。

主要有六个参数综合来协调控制

每个tm最大的slot个数

先计算每个tm最大的slot个数,由每个tm的cpu和memory上限决定,参数是

  1. taskmanager.max.cpu.core: 32
  2. taskmanager.max.memory.mb: 128*1024
  1. int maxSlotsPerTm = Math.min((int) Math.floor(maxCorePerContainer / resourceProfile.getCpuCores()),
  2. (int) Math.floor(1.0 * maxMemoryPerContainer / resourceProfile.getMemoryInMB()));

每个tm最小的slot个数

计算每个tm能分配的最小slot个数,由每个tm的cpu和memory的最小下限决定,参数是

  1. taskmanager.min.cpu.core: 0.25
  2. taskmanager.min.memory.mb: 1024
  1. int minSlotsPerTm = Math.min((int) Math.floor(minCorePerContainer / resourceProfile.getCpuCores()),
  2. (int) Math.floor(1.0 * minMemoryPerContainer / resourceProfile.getMemoryInMB()));

每个tm倾向的slot个数

计算每个tm比较倾向的slot个数,由每个tm prefer的cpu和memory大小决定,参数是

  1. taskmanager.multi-slot.preferred.memory.mb: 4096
  2. taskmanager.multi-slot.preferred.cpu.core: 1.0
  1. int preferredSlotsPerTm = Math.min((int) Math.round(preferredCorePerContainer / resourceProfile.getCpuCores()),
  2. (int) Math.round(1.0 * preferredMemoryPerContainer / resourceProfile.getMemoryInMB()));

得到这三组值,取最终的一个合适的值

  1. if (preferredSlotsPerTm > maxSlotsPerTm) {
  2. preferredSlotsPerTm = maxSlotsPerTm;
  3. }
  4. if (preferredSlotsPerTm < minSlotsPerTm) {
  5. preferredSlotsPerTm = minSlotsPerTm;
  6. }
  7. // calculate number of task managers
  8. int tmNum = slotNum / preferredSlotsPerTm;

接下来如果slotNum能被preferredSlotsPerTm整除得到的TM个数和每个tm的slot数就是

  1. Collections.nCopies(tmNum, preferredSlotsPerTm);

如果不能整除

  1. int maxSlotsPerTmPlan1 = slotNum % tmNum > 0 ? slotNum / tmNum + 1 : slotNum / tmNum;
  2. int minSlotsPerTmPlan2 = slotNum / (tmNum + 1);
  3. if (maxSlotsPerTmPlan1 <= maxSlotsPerTm &&
  4. maxSlotsPerTmPlan1 - preferredSlotsPerTm <= preferredSlotsPerTm - minSlotsPerTmPlan2) {
  5. // choose plan 1
  6. result.addAll(Collections.nCopies(slotNum % tmNum, slotNum / tmNum + 1));
  7. result.addAll(Collections.nCopies(tmNum - slotNum % tmNum, slotNum / tmNum));
  8. } else {
  9. // choose plan 2
  10. result.addAll(Collections.nCopies(slotNum % (tmNum + 1), slotNum / (tmNum + 1) + 1));
  11. result.addAll(Collections.nCopies((tmNum + 1) - slotNum % (tmNum + 1), slotNum / (tmNum + 1)));
  12. }

slotNum分配到tmNum上有余,那么有两个方案

plan1
将没有分配到tm的slot,也就是余数,给每个tm上再分一个
result.addAll(Collections.nCopies(slotNum % tmNum, slotNum / tmNum + 1)); 需要多分配一个slot的tm
result.addAll(Collections.nCopies(tmNum - slotNum % tmNum, slotNum / tmNum));

plan2
多申请一个tm,slotNum % (tmNum + 1)得到多申请一个tm后的余数是多少,和上面的计算方式一致,只是tm+1了
result.addAll(Collections.nCopies(slotNum % (tmNum + 1), slotNum / (tmNum + 1) + 1)); 需要多分配一个slot的tm
result.addAll(Collections.nCopies((tmNum + 1) - slotNum % (tmNum + 1), slotNum / (tmNum + 1)));

选择plan1还是plan2的判断的依据就是

  1. maxSlotsPerTmPlan1 <= maxSlotsPerTm 加一后的每个tm的slot数不超过每个tm的slot上限
  2. maxSlotsPerTmPlan1 - preferredSlotsPerTm <= preferredSlotsPerTm - minSlotsPerTmPlan2 并且+1后每个tm的slot个数和prefer的差值要小于prefer和每个TM最小的slot的差值,其实这个就是看向上向下的偏移量哪个大,取变化更小的一种方式,就会更接近我们的prefer的值

到这里tm数量和slot数已经计算得到了,再根据每个tm的slot数,resourceProfile加框架本身设置的native,direct,heap内存,得到最终tm的规格

org.apache.flink.runtime.clusterframework.types.TaskManagerResource#fromConfiguration(org.apache.flink.configuration.Configuration, org.apache.flink.runtime.clusterframework.types.ResourceProfile, int, int)