Version 1.18.1. Code path: kubernetes/pkg/scheduler/framework/plugins. Note: the code below has been reorganized and trimmed by me, mainly to make it easier for me to read; the main logic is unaffected.

1、noderesources (scheduling algorithms related to node resources)

1.1、Preliminaries: resourceAllocationScorer

1.1.1、Description

This struct and its methods are shared: NodeResourcesBalancedAllocation, NodeResourcesLeastAllocated, NodeResourcesMostAllocated and the other resource plugins all call it to do the per-resource calculation and the final score calculation.

1.1.2、Code walkthrough

    var defaultRequestedRatioResources = resourceToWeightMap{v1.ResourceMemory: 1, v1.ResourceCPU: 1}

    type resourceToWeightMap map[v1.ResourceName]int64

    type resourceAllocationScorer struct {
        Name                string
        scorer              func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
        resourceToWeightMap resourceToWeightMap
    }

    type resourceToValueMap map[v1.ResourceName]int64

    // score computes a node's score for the pod: it collects the allocatable and requested
    // amounts for every weighted resource and delegates to the plugin-specific scorer function.
    func (r *resourceAllocationScorer) score(
        pod *v1.Pod,
        nodeInfo *schedulernodeinfo.NodeInfo) (int64, *framework.Status) {
        node := nodeInfo.Node()
        if node == nil {
            return 0, framework.NewStatus(framework.Error, "node not found")
        }
        if r.resourceToWeightMap == nil {
            return 0, framework.NewStatus(framework.Error, "resources not found")
        }
        requested := make(resourceToValueMap, len(r.resourceToWeightMap))
        allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))
        for resource := range r.resourceToWeightMap {
            allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
        }
        var score int64
        // Note: len(pod.Spec.Volumes) >= 0 is always true, so this branch is effectively
        // gated by the BalanceAttachedNodeVolumes feature gate and TransientInfo.
        if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
            score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
        } else {
            score = r.scorer(requested, allocatable, false, 0, 0)
        }
        return score, nil
    }

    // calculateResourceAllocatableRequest returns, for one resource, the node's allocatable
    // amount and the amount that would be requested once this pod is added.
    func calculateResourceAllocatableRequest(nodeInfo *schedulernodeinfo.NodeInfo, pod *v1.Pod, resource v1.ResourceName) (int64, int64) {
        allocatable := nodeInfo.AllocatableResource()
        requested := nodeInfo.RequestedResource()
        podRequest := calculatePodResourceRequest(pod, resource)
        switch resource {
        case v1.ResourceCPU:
            return allocatable.MilliCPU, nodeInfo.NonZeroRequest().MilliCPU + podRequest
        case v1.ResourceMemory:
            return allocatable.Memory, nodeInfo.NonZeroRequest().Memory + podRequest
        case v1.ResourceEphemeralStorage:
            return allocatable.EphemeralStorage, requested.EphemeralStorage + podRequest
        default:
            if v1helper.IsScalarResourceName(resource) {
                return allocatable.ScalarResources[resource], requested.ScalarResources[resource] + podRequest
            }
        }
        return 0, 0
    }

    // calculatePodResourceRequest computes the pod's request for a single resource:
    // the sum of the (non-zero-defaulted) requests of the regular containers, max'ed
    // against each init container's request, plus the pod overhead.
    func calculatePodResourceRequest(pod *v1.Pod, resource v1.ResourceName) int64 {
        var podRequest int64
        for i := range pod.Spec.Containers {
            container := &pod.Spec.Containers[i]
            value := schedutil.GetNonzeroRequestForResource(resource, &container.Resources.Requests)
            podRequest += value
        }
        for i := range pod.Spec.InitContainers {
            initContainer := &pod.Spec.InitContainers[i]
            value := schedutil.GetNonzeroRequestForResource(resource, &initContainer.Resources.Requests)
            if podRequest < value {
                podRequest = value
            }
        }
        if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
            if quantity, found := pod.Spec.Overhead[resource]; found {
                podRequest += quantity.Value()
            }
        }
        return podRequest
    }
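
To make the aggregation rule in calculatePodResourceRequest concrete, here is a minimal standalone sketch (not the scheduler code itself; plain int64 millicore values and made-up numbers) of how the request for a single resource is derived from regular containers, init containers and overhead:

    package main

    import "fmt"

    // podCPURequest mirrors the aggregation rule of calculatePodResourceRequest
    // for one resource: sum the regular containers, take the max against each
    // init container, then add the pod overhead.
    func podCPURequest(containers, initContainers []int64, overhead int64) int64 {
        var request int64
        for _, c := range containers {
            request += c // regular containers run side by side, so their requests add up
        }
        for _, ic := range initContainers {
            if request < ic {
                request = ic // init containers run one at a time, so only the largest matters
            }
        }
        return request + overhead
    }

    func main() {
        // Hypothetical pod: two containers (200m + 300m), one init container (400m), 100m overhead.
        fmt.Println(podCPURequest([]int64{200, 300}, []int64{400}, 100)) // 600: max(500, 400) + 100
    }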

1.2、NodeResourcesFit (PreFilter and Filter extension points)

1.2.1、What the algorithm does

This plugin checks whether the given node can satisfy the pod's resource requests; it only takes part in the filtering (predicates) stage.

1.2.2、Code walkthrough

    const (
        FitName           = "NodeResourcesFit"
        preFilterStateKey = "PreFilter" + FitName
    )

    type Fit struct {
        ignoredResources sets.String
    }

    type FitArgs struct {
        // Resources whose checks should be skipped
        IgnoredResources []string `json:"ignoredResources,omitempty"`
    }

    func NewFit(plArgs *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
        args := &FitArgs{}
        if err := framework.DecodeInto(plArgs, args); err != nil {
            return nil, err
        }
        fit := &Fit{}
        fit.ignoredResources = sets.NewString(args.IgnoredResources...)
        return fit, nil
    }

    func (f *Fit) Name() string {
        return FitName
    }

    // Write the pod's resource request into CycleState so the Filter extension point can use it
    func (f *Fit) PreFilter(_ context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
        cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
        return nil
    }

    func (f *Fit) PreFilterExtensions() framework.PreFilterExtensions {
        return nil
    }

    func (f *Fit) Filter(_ context.Context, cycleState *framework.CycleState, _ *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
        // Read the pod's resource request computed in PreFilter
        s, err := getPreFilterState(cycleState)
        if err != nil {
            return framework.NewStatus(framework.Error, err.Error())
        }
        // Check whether the node's resources are sufficient
        insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources)
        // Return the result
        if len(insufficientResources) != 0 {
            failureReasons := make([]string, 0, len(insufficientResources))
            for _, r := range insufficientResources {
                failureReasons = append(failureReasons, r.Reason)
            }
            return framework.NewStatus(framework.Unschedulable, failureReasons...)
        }
        return nil
    }

    type preFilterState struct {
        schedulernodeinfo.Resource
    }

    func (s *preFilterState) Clone() framework.StateData {
        return s
    }

    // Which resource caused the filter to fail
    type InsufficientResource struct {
        ResourceName v1.ResourceName
        Reason       string
        Requested    int64
        Used         int64
        Capacity     int64
    }

    func Fits(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ignoredExtendedResources sets.String) []InsufficientResource {
        return fitsRequest(computePodResourceRequest(pod), nodeInfo, ignoredExtendedResources)
    }

    func fitsRequest(podRequest *preFilterState, nodeInfo *schedulernodeinfo.NodeInfo, ignoredExtendedResources sets.String) []InsufficientResource {
        // cap is set to 4, presumably because extended resources are normally absent,
        // so the slice will almost never have to grow
        insufficientResources := make([]InsufficientResource, 0, 4)
        allowedPodNumber := nodeInfo.AllowedPodNumber()
        // Too many pods
        if len(nodeInfo.Pods())+1 > allowedPodNumber {
            insufficientResources = append(insufficientResources, InsufficientResource{
                v1.ResourcePods,
                "Too many pods",
                1,
                int64(len(nodeInfo.Pods())),
                int64(allowedPodNumber),
            })
        }
        if ignoredExtendedResources == nil {
            ignoredExtendedResources = sets.NewString()
        }
        // The pod does not request any resources at all
        if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.EphemeralStorage == 0 && len(podRequest.ScalarResources) == 0 {
            return insufficientResources
        }
        allocatable := nodeInfo.AllocatableResource()
        // Check whether the allocatable CPU is sufficient
        if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
            insufficientResources = append(insufficientResources, InsufficientResource{
                v1.ResourceCPU,
                "Insufficient cpu",
                podRequest.MilliCPU,
                nodeInfo.RequestedResource().MilliCPU,
                allocatable.MilliCPU,
            })
        }
        // Check whether the allocatable memory is sufficient
        if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
            insufficientResources = append(insufficientResources, InsufficientResource{
                v1.ResourceMemory,
                "Insufficient memory",
                podRequest.Memory,
                nodeInfo.RequestedResource().Memory,
                allocatable.Memory,
            })
        }
        // Check whether the allocatable ephemeral storage is sufficient
        if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
            insufficientResources = append(insufficientResources, InsufficientResource{
                v1.ResourceEphemeralStorage,
                "Insufficient ephemeral-storage",
                podRequest.EphemeralStorage,
                nodeInfo.RequestedResource().EphemeralStorage,
                allocatable.EphemeralStorage,
            })
        }
        // Check extended (scalar) resources, such as GPUs
        for rName, rQuant := range podRequest.ScalarResources {
            if v1helper.IsExtendedResourceName(rName) {
                if ignoredExtendedResources.Has(string(rName)) {
                    continue
                }
            }
            if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
                insufficientResources = append(insufficientResources, InsufficientResource{
                    rName,
                    fmt.Sprintf("Insufficient %v", rName),
                    podRequest.ScalarResources[rName],
                    nodeInfo.RequestedResource().ScalarResources[rName],
                    allocatable.ScalarResources[rName],
                })
            }
        }
        return insufficientResources
    }

    // Compute the pod's resource request:
    // first sum the requests of all regular containers,
    // then take the element-wise max against each init container's requests,
    // and finally add the pod overhead (the cost of things like the pause/sandbox container)
    func computePodResourceRequest(pod *v1.Pod) *preFilterState {
        result := &preFilterState{}
        for _, container := range pod.Spec.Containers {
            result.Add(container.Resources.Requests)
        }
        for _, container := range pod.Spec.InitContainers {
            result.SetMaxResource(container.Resources.Requests)
        }
        if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
            result.Add(pod.Spec.Overhead)
        }
        return result
    }

    // Read the state populated by PreFilter
    func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
        c, err := cycleState.Read(preFilterStateKey)
        if err != nil {
            return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
        }
        s, ok := c.(*preFilterState)
        if !ok {
            return nil, fmt.Errorf("%+v convert to NodeResourcesFit.preFilterState error", c)
        }
        return s, nil
    }
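
As a rough illustration of the per-resource checks inside fitsRequest, the following standalone sketch (made-up numbers, none of the scheduler types) shows when a resource would be reported as insufficient:

    package main

    import "fmt"

    // insufficient reports whether adding podRequest on top of what is already
    // requested on the node would exceed the allocatable amount, mirroring the
    // per-resource comparisons in fitsRequest.
    func insufficient(allocatable, alreadyRequested, podRequest int64) bool {
        return allocatable < alreadyRequested+podRequest
    }

    func main() {
        // Hypothetical node: 4000m CPU allocatable, 3500m already requested.
        fmt.Println(insufficient(4000, 3500, 300)) // false: 3800m still fits
        fmt.Println(insufficient(4000, 3500, 800)) // true: 4300m > 4000m, reported as "Insufficient cpu"
    }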

1.3、NodeResourcesLeastAllocated (Score extension point)

1.3.1、What the algorithm does

This plugin prefers nodes with low current resource utilization, and it is usually used together with NodeResourcesBalancedAllocation.

1.3.2、Code walkthrough

    const LeastAllocatedName = "NodeResourcesLeastAllocated"

    type LeastAllocated struct {
        resourceAllocationScorer
        handle framework.FrameworkHandle
    }

    func NewLeastAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
        return &LeastAllocated{
            handle: h,
            resourceAllocationScorer: resourceAllocationScorer{
                LeastAllocatedName,
                // Core function that computes the node score
                leastResourceScorer,
                defaultRequestedRatioResources,
            },
        }, nil
    }

    func (la *LeastAllocated) Name() string {
        return LeastAllocatedName
    }

    func (la *LeastAllocated) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
        nodeInfo, err := la.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
        if err != nil {
            return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
        }
        return la.score(pod, nodeInfo)
    }

    func (la *LeastAllocated) ScoreExtensions() framework.ScoreExtensions {
        return nil
    }

    // Each resource's score is multiplied by its weight and summed,
    // then the sum is divided by the total weight to get the final score.
    // For example:
    // memory score 50 with weight 1,
    // cpu score 60 with weight 1,
    // weighted sum = 50*1 + 60*1 = 110,
    // final score = 110 / 2 = 55.
    // In effect this is a weighted average of the per-resource scores.
    func leastResourceScorer(requested, allocable resourceToValueMap, _ bool, _ int, _ int) int64 {
        var nodeScore, weightSum int64
        for resource, weight := range defaultRequestedRatioResources {
            resourceScore := leastRequestedScore(requested[resource], allocable[resource])
            nodeScore += resourceScore * weight
            weightSum += weight
        }
        return nodeScore / weightSum
    }

    // How the per-resource score is computed:
    // the more of the resource remains unrequested, the higher the score.
    // Taking memory as an example:
    // with 8G allocatable and the pod requesting 4G, the node scores 50;
    // with 8G allocatable and the pod requesting 2G, the node scores 75.
    // Nodes with low resource utilization are therefore more likely to be chosen.
    func leastRequestedScore(requested, capacity int64) int64 {
        if capacity == 0 {
            return 0
        }
        if requested > capacity {
            return 0
        }
        return ((capacity - requested) * framework.MaxNodeScore) / capacity
    }
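
The two memory examples in the comments above can be reproduced with a standalone copy of the leastRequestedScore arithmetic (maxNodeScore hard-coded to 100, the value of framework.MaxNodeScore in 1.18):

    package main

    import "fmt"

    const maxNodeScore = 100 // framework.MaxNodeScore in 1.18

    // leastRequestedScore: the more that is left over, the higher the score.
    func leastRequestedScore(requested, capacity int64) int64 {
        if capacity == 0 || requested > capacity {
            return 0
        }
        return (capacity - requested) * maxNodeScore / capacity
    }

    func main() {
        const gib = 1 << 30
        fmt.Println(leastRequestedScore(4*gib, 8*gib)) // 50: half of the memory would remain free
        fmt.Println(leastRequestedScore(2*gib, 8*gib)) // 75: three quarters would remain free
    }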

1.4、NodeResourcesBalancedAllocation (Score extension point)

1.4.1、What the algorithm does

This plugin prefers machines whose cpu and memory utilization are relatively balanced, and it is usually used together with NodeResourcesLeastAllocated.
For example:
machine A: cpu utilization 80%, memory utilization 20%
machine B: cpu utilization 60%, memory utilization 50%
Machine B is more likely to be chosen.

1.4.2、Code walkthrough

    const BalancedAllocationName = "NodeResourcesBalancedAllocation"

    type BalancedAllocation struct {
        resourceAllocationScorer
        handle framework.FrameworkHandle
    }

    func NewBalancedAllocation(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
        return &BalancedAllocation{
            handle: h,
            resourceAllocationScorer: resourceAllocationScorer{
                BalancedAllocationName,
                // Core function that computes the node score
                balancedResourceScorer,
                defaultRequestedRatioResources,
            },
        }, nil
    }

    func (ba *BalancedAllocation) Name() string {
        return BalancedAllocationName
    }

    func (ba *BalancedAllocation) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
        nodeInfo, err := ba.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
        if err != nil {
            return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
        }
        return ba.score(pod, nodeInfo)
    }

    func (ba *BalancedAllocation) ScoreExtensions() framework.ScoreExtensions {
        return nil
    }

    // Core scoring function
    func balancedResourceScorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
        // Compute the requested/allocatable fraction for cpu and memory
        cpuFraction := fractionOfCapacity(requested[v1.ResourceCPU], allocable[v1.ResourceCPU])
        memoryFraction := fractionOfCapacity(requested[v1.ResourceMemory], allocable[v1.ResourceMemory])
        // If either fraction is greater than or equal to 1, the score is 0
        if cpuFraction >= 1 || memoryFraction >= 1 {
            return 0
        }
        // Branch that also takes volumes into account
        if includeVolumes && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && allocatableVolumes > 0 {
            // Compute the volume fraction
            volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
            // If it is greater than or equal to 1, the score is 0
            if volumeFraction >= 1 {
                return 0
            }
            // Compute the mean of the three fractions
            mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
            // The variance is essentially a measure of how spread out the fractions are.
            // For example:
            // machine A: cpu fraction 0.5, mem 0.6, volume 0.1, so mean = 0.4
            // machine B: cpu fraction 0.8, mem 0.2, volume 0.5, so mean = 0.5
            // machine A variance = (0.1*0.1 + 0.2*0.2 + 0.3*0.3)/3 ≈ 0.047
            // machine B variance = (0.3*0.3 + 0.3*0.3 + 0*0)/3 = 0.06
            // Machine A has the lower variance and is therefore more likely to be chosen:
            // its per-resource fractions deviate from the mean relatively evenly, while
            // machine B's deviations are less balanced.
            variance := (((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3)
            return int64((1 - variance) * float64(framework.MaxNodeScore))
        }
        // The larger diff is, the more unbalanced the cpu and memory consumption of this request would be.
        // For example:
        // machine A: cpu fraction 0.5, mem 0.6, so diff = 0.1
        // machine B: cpu fraction 0.8, mem 0.2, so diff = 0.6
        // machine A score: (1 - 0.1) * 100 = 90
        // machine B score: (1 - 0.6) * 100 = 40
        // Clearly, the more evenly the remaining cpu and memory are balanced after this
        // request, the more likely the node is to be chosen.
        diff := math.Abs(cpuFraction - memoryFraction)
        return int64((1 - diff) * float64(framework.MaxNodeScore))
    }

    // Ratio of requested to capacity
    func fractionOfCapacity(requested, capacity int64) float64 {
        if capacity == 0 {
            return 1
        }
        return float64(requested) / float64(capacity)
    }
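
For the cpu/memory branch, the scores of the two machines from the comments can be checked with a simplified standalone sketch that works in whole percentages instead of float fractions (it mirrors only the arithmetic, not the exact floating-point behaviour of balancedResourceScorer):

    package main

    import "fmt"

    // balancedScorePercent is a simplified, integer-percentage version of the
    // cpu/memory branch: score = (1 - |cpuFraction - memFraction|) * 100.
    func balancedScorePercent(cpuPct, memPct int64) int64 {
        // A fraction of 100% or more means the node would be overcommitted: score 0.
        if cpuPct >= 100 || memPct >= 100 {
            return 0
        }
        diff := cpuPct - memPct
        if diff < 0 {
            diff = -diff
        }
        return 100 - diff
    }

    func main() {
        fmt.Println(balancedScorePercent(50, 60)) // machine A: diff 10% -> score 90
        fmt.Println(balancedScorePercent(80, 20)) // machine B: diff 60% -> score 40
    }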

1.5、NodeResourcesMostAllocated (Score extension point)

1.5.1、What the algorithm does

This plugin prefers nodes with high current resource utilization. Packing pods like this reserves room for workloads with large resource requests, e.g. AI jobs: if pods were spread evenly, the cluster's total allocatable resources might satisfy such a request, but no single node could, so the pod would be unschedulable.

1.5.2、Code walkthrough

    const MostAllocatedName = "NodeResourcesMostAllocated"

    type MostAllocated struct {
        resourceAllocationScorer
        handle framework.FrameworkHandle
    }

    func NewMostAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
        return &MostAllocated{
            handle: h,
            resourceAllocationScorer: resourceAllocationScorer{
                MostAllocatedName,
                // Core function that computes the node score
                mostResourceScorer,
                defaultRequestedRatioResources,
            },
        }, nil
    }

    func (ma *MostAllocated) Name() string {
        return MostAllocatedName
    }

    func (ma *MostAllocated) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
        nodeInfo, err := ma.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
        if err != nil || nodeInfo.Node() == nil {
            return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
        }
        return ma.score(pod, nodeInfo)
    }

    func (ma *MostAllocated) ScoreExtensions() framework.ScoreExtensions {
        return nil
    }

    // Each resource's score is multiplied by its weight and summed,
    // then the sum is divided by the total weight to get the final score.
    // For example:
    // memory score 50 with weight 1,
    // cpu score 60 with weight 1,
    // weighted sum = 50*1 + 60*1 = 110,
    // final score = 110 / 2 = 55.
    // In effect this is a weighted average of the per-resource scores.
    func mostResourceScorer(requested, allocable resourceToValueMap, _ bool, _ int, _ int) int64 {
        var nodeScore, weightSum int64
        for resource, weight := range defaultRequestedRatioResources {
            resourceScore := mostRequestedScore(requested[resource], allocable[resource])
            nodeScore += resourceScore * weight
            weightSum += weight
        }
        return nodeScore / weightSum
    }

    // How the per-resource score is computed:
    // the less of the resource remains unrequested, the higher the score.
    // Taking memory as an example:
    // with 8G allocatable and the pod requesting 4G, the node scores 50;
    // with 8G allocatable and the pod requesting 2G, the node scores 25.
    // Nodes with high resource utilization are therefore more likely to be chosen.
    func mostRequestedScore(requested, capacity int64) int64 {
        if capacity == 0 {
            return 0
        }
        if requested > capacity {
            return 0
        }
        return (requested * framework.MaxNodeScore) / capacity
    }
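
The memory examples above can likewise be reproduced with a standalone copy of the mostRequestedScore arithmetic (again with maxNodeScore hard-coded to 100, the value of framework.MaxNodeScore in 1.18):

    package main

    import "fmt"

    const maxNodeScore = 100 // framework.MaxNodeScore in 1.18

    // mostRequestedScore: the less that is left over, the higher the score.
    func mostRequestedScore(requested, capacity int64) int64 {
        if capacity == 0 || requested > capacity {
            return 0
        }
        return requested * maxNodeScore / capacity
    }

    func main() {
        const gib = 1 << 30
        fmt.Println(mostRequestedScore(4*gib, 8*gib)) // 50
        fmt.Println(mostRequestedScore(2*gib, 8*gib)) // 25
    }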

1.6、NodeResourceLimits (PreScore and Score extension points)

1.6.1、What the algorithm does

This plugin prefers nodes whose allocatable resources can accommodate the limits configured on the pod.

1.6.2、Code walkthrough

    const (
        ResourceLimitsName = "NodeResourceLimits"
        preScoreStateKey   = "PreScore" + ResourceLimitsName
    )

    type ResourceLimits struct {
        handle framework.FrameworkHandle
    }

    func NewResourceLimits(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
        return &ResourceLimits{handle: h}, nil
    }

    func (rl *ResourceLimits) Name() string {
        return ResourceLimitsName
    }

    func (rl *ResourceLimits) PreScore(
        _ context.Context,
        cycleState *framework.CycleState,
        pod *v1.Pod,
        nodes []*v1.Node,
    ) *framework.Status {
        if len(nodes) == 0 {
            return nil
        }
        if rl.handle.SnapshotSharedLister() == nil {
            return framework.NewStatus(framework.Error, fmt.Sprintf("empty shared lister"))
        }
        s := &preScoreState{
            podResourceRequest: getResourceLimits(pod),
        }
        cycleState.Write(preScoreStateKey, s)
        return nil
    }

    func (rl *ResourceLimits) Score(_ context.Context, state *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
        nodeInfo, err := rl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
        if err != nil || nodeInfo.Node() == nil {
            return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
        }
        allocatableResources := nodeInfo.AllocatableResource()
        podLimits, err := getPodResource(state)
        if err != nil {
            return 0, framework.NewStatus(framework.Error, err.Error())
        }
        cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
        memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
        // If either the cpu or the memory limit fits, the node gets one point
        score := int64(0)
        if cpuScore == 1 || memScore == 1 {
            score = 1
        }
        return score, nil
    }

    func (rl *ResourceLimits) ScoreExtensions() framework.ScoreExtensions {
        return nil
    }

    type preScoreState struct {
        podResourceRequest *schedulernodeinfo.Resource
    }

    func (s *preScoreState) Clone() framework.StateData {
        return s
    }

    func getPodResource(cycleState *framework.CycleState) (*schedulernodeinfo.Resource, error) {
        c, err := cycleState.Read(preScoreStateKey)
        if err != nil {
            return nil, fmt.Errorf("Error reading %q from cycleState: %v", preScoreStateKey, err)
        }
        s, ok := c.(*preScoreState)
        if !ok {
            return nil, fmt.Errorf("%+v convert to ResourceLimits.preScoreState error", c)
        }
        return s.podResourceRequest, nil
    }

    // Sum the limits of the regular containers,
    // then take the element-wise max against each init container's limits
    func getResourceLimits(pod *v1.Pod) *schedulernodeinfo.Resource {
        result := &schedulernodeinfo.Resource{}
        for _, container := range pod.Spec.Containers {
            result.Add(container.Resources.Limits)
        }
        for _, container := range pod.Spec.InitContainers {
            result.SetMaxResource(container.Resources.Limits)
        }
        return result
    }

    // One point if the limit is configured and does not exceed the allocatable amount
    func computeScore(limit, allocatable int64) int64 {
        if limit != 0 && allocatable != 0 && limit <= allocatable {
            return 1
        }
        return 0
    }
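
A quick standalone check of the computeScore rule with made-up numbers (a hypothetical 4Gi memory limit against two hypothetical nodes):

    package main

    import "fmt"

    // computeScore mirrors the NodeResourceLimits rule: one point if the limit is
    // set, the node reports the resource, and the limit fits into the allocatable amount.
    func computeScore(limit, allocatable int64) int64 {
        if limit != 0 && allocatable != 0 && limit <= allocatable {
            return 1
        }
        return 0
    }

    func main() {
        const gib = 1 << 30
        fmt.Println(computeScore(4*gib, 16*gib)) // 1: the limit fits into the node
        fmt.Println(computeScore(4*gib, 2*gib))  // 0: the limit cannot fit
    }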

1.7、RequestedToCapacityRatio (Score extension point)

1.7.1、What the algorithm does

1.7.2、Code walkthrough

2、nodeaffinity (scheduling algorithms related to node affinity)

2.1、NodeAffinity (Filter and Score extension points)

2.1.1、What the algorithm does

This plugin implements node affinity: it binds pods to nodes that satisfy given conditions, i.e. affinity-aware scheduling between pods and nodes. For example, pod1 has to be scheduled onto a node with an SSD disk, pod2 has to be scheduled onto a node with a better CPU, and so on.
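
As a hedged illustration (the label keys disktype and cpu-tier and their values are made up), this is roughly what a pod's affinity could look like for the "pod1 needs an SSD node" case; the required term is what Filter checks, and the preferred term with its weight is what Score rewards:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    func main() {
        // Hypothetical affinity: the pod must land on a node labeled disktype=ssd
        // and prefers nodes labeled cpu-tier=high with weight 10.
        affinity := &v1.Affinity{
            NodeAffinity: &v1.NodeAffinity{
                RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
                    NodeSelectorTerms: []v1.NodeSelectorTerm{{
                        MatchExpressions: []v1.NodeSelectorRequirement{{
                            Key:      "disktype",
                            Operator: v1.NodeSelectorOpIn,
                            Values:   []string{"ssd"},
                        }},
                    }},
                },
                PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{{
                    Weight: 10,
                    Preference: v1.NodeSelectorTerm{
                        MatchExpressions: []v1.NodeSelectorRequirement{{
                            Key:      "cpu-tier",
                            Operator: v1.NodeSelectorOpIn,
                            Values:   []string{"high"},
                        }},
                    },
                }},
            },
        }
        fmt.Printf("%+v\n", affinity.NodeAffinity)
    }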

2.1.2、Code walkthrough

    const (
        Name      = "NodeAffinity"
        ErrReason = "node(s) didn't match node selector"
    )

    type NodeAffinity struct {
        handle framework.FrameworkHandle
    }

    func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
        return &NodeAffinity{handle: h}, nil
    }

    func (pl *NodeAffinity) Name() string {
        return Name
    }

    // Filter (predicate) extension point
    func (pl *NodeAffinity) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
        node := nodeInfo.Node()
        if node == nil {
            return framework.NewStatus(framework.Error, "node not found")
        }
        if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
            return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)
        }
        return nil
    }

    // Score (priority) extension point
    func (pl *NodeAffinity) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
        nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
        if err != nil {
            return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
        }
        node := nodeInfo.Node()
        if node == nil {
            return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
        }
        affinity := pod.Spec.Affinity
        var count int64
        if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
            // Iterate over every preferred scheduling term;
            // whenever the node matches a term, add that term's weight;
            // the accumulated sum is returned as the score
            for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
                preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
                // Skip terms whose weight is 0
                if preferredSchedulingTerm.Weight == 0 {
                    continue
                }
                nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
                if err != nil {
                    return 0, framework.NewStatus(framework.Error, err.Error())
                }
                if nodeSelector.Matches(labels.Set(node.Labels)) {
                    count += int64(preferredSchedulingTerm.Weight)
                }
            }
        }
        return count, nil
    }

    func (pl *NodeAffinity) NormalizeScore(_ context.Context, _ *framework.CycleState, _ *v1.Pod, scores framework.NodeScoreList) *framework.Status {
        return pluginhelper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)
    }

    func (pl *NodeAffinity) ScoreExtensions() framework.ScoreExtensions {
        return pl
    }

    // Node affinity check used in the filtering stage:
    // 1. the node's labels must match the pod's NodeSelector;
    // 2. on top of 1, the required node affinity terms are checked.
    func PodMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
        // Check the NodeSelector labels
        if len(pod.Spec.NodeSelector) > 0 {
            selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
            if !selector.Matches(labels.Set(node.Labels)) {
                return false
            }
        }
        // It is enough for a single NodeSelectorTerm to match;
        // MatchFields currently only supports the node's metadata.name
        nodeAffinityMatches := true
        affinity := pod.Spec.Affinity
        if affinity != nil && affinity.NodeAffinity != nil {
            nodeAffinity := affinity.NodeAffinity
            // If RequiredDuringSchedulingIgnoredDuringExecution is not set, return immediately
            if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
                return true
            }
            // Check whether the node's labels satisfy the pod's required affinity terms
            nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
            nodeAffinityMatches = nodeAffinityMatches && nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
        }
        return nodeAffinityMatches
    }

    func nodeMatchesNodeSelectorTerms(node *v1.Node, nodeSelectorTerms []v1.NodeSelectorTerm) bool {
        return v1helper.MatchNodeSelectorTerms(nodeSelectorTerms, node.Labels, fields.Set{
            "metadata.name": node.Name,
        })
    }

    // DefaultNormalizeScore generates a Normalize Score function that can normalize the
    // scores to [0, maxPriority]. If reverse is set to true, it reverses the scores by
    // subtracting it from maxPriority.
    func DefaultNormalizeScore(maxPriority int64, reverse bool, scores framework.NodeScoreList) *framework.Status {
        var maxCount int64
        for i := range scores {
            if scores[i].Score > maxCount {
                maxCount = scores[i].Score
            }
        }
        if maxCount == 0 {
            if reverse {
                for i := range scores {
                    scores[i].Score = maxPriority
                }
            }
            return nil
        }
        for i := range scores {
            score := scores[i].Score
            score = maxPriority * score / maxCount
            if reverse {
                score = maxPriority - score
            }
            scores[i].Score = score
        }
        return nil
    }
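
Finally, a standalone sketch of what DefaultNormalizeScore (with reverse=false) does to the raw weight sums returned by Score: the best node is stretched to 100 and the others are scaled proportionally (the raw scores here are made up):

    package main

    import "fmt"

    const maxNodeScore = 100 // framework.MaxNodeScore in 1.18

    // normalize mirrors the reverse=false branch of DefaultNormalizeScore: every raw
    // score is rescaled so that the highest-scoring node ends up at maxNodeScore.
    func normalize(raw []int64) []int64 {
        var max int64
        for _, s := range raw {
            if s > max {
                max = s
            }
        }
        if max == 0 {
            return raw // nothing matched any preferred term; scores stay at 0
        }
        out := make([]int64, len(raw))
        for i, s := range raw {
            out[i] = maxNodeScore * s / max
        }
        return out
    }

    func main() {
        // Hypothetical raw scores: the summed preferred-term weights per node.
        fmt.Println(normalize([]int64{30, 15, 0})) // [100 50 0]
    }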