- 1. noderesources (scheduling algorithms related to node resources)
- 2. nodeaffinity (scheduling algorithms related to node affinity)
Version: 1.18.1. Code path: kubernetes/pkg/scheduler/framework/plugins. Note: I have reorganized and trimmed the code to make it easier to read; the main logic is unaffected.
1. noderesources (scheduling algorithms related to node resources)
1.1. Prerequisite: resourceAllocationScorer
1.1.1. Description
This struct and its methods are shared code: NodeResourcesBalancedAllocation, NodeResourcesLeastAllocated, NodeResourcesMostAllocated, and others all call into it for resource accounting and the final score calculation.
1.1.2. Code walkthrough
var defaultRequestedRatioResources = resourceToWeightMap{v1.ResourceMemory: 1, v1.ResourceCPU: 1}

type resourceToWeightMap map[v1.ResourceName]int64

type resourceAllocationScorer struct {
    Name                string
    scorer              func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
    resourceToWeightMap resourceToWeightMap
}

type resourceToValueMap map[v1.ResourceName]int64

// score gathers the allocatable and requested amounts for every weighted
// resource, then delegates the actual scoring to the plugin-specific scorer.
func (r *resourceAllocationScorer) score(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (int64, *framework.Status) {
    node := nodeInfo.Node()
    if node == nil {
        return 0, framework.NewStatus(framework.Error, "node not found")
    }
    if r.resourceToWeightMap == nil {
        return 0, framework.NewStatus(framework.Error, "resources not found")
    }
    requested := make(resourceToValueMap, len(r.resourceToWeightMap))
    allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))
    for resource := range r.resourceToWeightMap {
        allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
    }
    var score int64
    // NB: len(pod.Spec.Volumes) >= 0 is always true; the branch really hinges
    // on the BalanceAttachedNodeVolumes feature gate and TransientInfo.
    if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
        score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
    } else {
        score = r.scorer(requested, allocatable, false, 0, 0)
    }
    return score, nil
}

// Returns (allocatable, already-requested + this pod's request) for one resource.
func calculateResourceAllocatableRequest(nodeInfo *schedulernodeinfo.NodeInfo, pod *v1.Pod, resource v1.ResourceName) (int64, int64) {
    allocatable := nodeInfo.AllocatableResource()
    requested := nodeInfo.RequestedResource()
    podRequest := calculatePodResourceRequest(pod, resource)
    switch resource {
    case v1.ResourceCPU:
        return allocatable.MilliCPU, nodeInfo.NonZeroRequest().MilliCPU + podRequest
    case v1.ResourceMemory:
        return allocatable.Memory, nodeInfo.NonZeroRequest().Memory + podRequest
    case v1.ResourceEphemeralStorage:
        return allocatable.EphemeralStorage, requested.EphemeralStorage + podRequest
    default:
        if v1helper.IsScalarResourceName(resource) {
            return allocatable.ScalarResources[resource], requested.ScalarResources[resource] + podRequest
        }
    }
    return 0, 0
}

// Sum the requests of regular containers, raise the total to the max of any
// single init container, then add pod overhead if the feature gate is on.
func calculatePodResourceRequest(pod *v1.Pod, resource v1.ResourceName) int64 {
    var podRequest int64
    for i := range pod.Spec.Containers {
        container := &pod.Spec.Containers[i]
        value := schedutil.GetNonzeroRequestForResource(resource, &container.Resources.Requests)
        podRequest += value
    }
    for i := range pod.Spec.InitContainers {
        initContainer := &pod.Spec.InitContainers[i]
        value := schedutil.GetNonzeroRequestForResource(resource, &initContainer.Resources.Requests)
        if podRequest < value {
            podRequest = value
        }
    }
    if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
        if quantity, found := pod.Spec.Overhead[resource]; found {
            podRequest += quantity.Value()
        }
    }
    return podRequest
}
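To make the request rule concrete, here is a minimal standalone sketch (my own illustration with plain int64 values and made-up numbers, not the scheduler's actual code): regular containers are summed because they run concurrently, init containers only contribute their maximum because they run one at a time, and pod overhead is added on top.

package main

import "fmt"

// podResourceRequest mirrors the rule in calculatePodResourceRequest for a
// single resource: sum of regular containers, raised to the max single init
// container, plus pod overhead.
func podResourceRequest(containers, initContainers []int64, overhead int64) int64 {
    var request int64
    for _, c := range containers {
        request += c // regular containers run concurrently: sum them
    }
    for _, ic := range initContainers {
        if request < ic { // init containers run one at a time: only the max matters
            request = ic
        }
    }
    return request + overhead
}

func main() {
    // Two app containers requesting 200m and 300m CPU, one init container
    // requesting 600m, plus 100m overhead: the init container dominates.
    fmt.Println(podResourceRequest([]int64{200, 300}, []int64{600}, 100)) // 700
}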
1.2. NodeResourcesFit (PreFilter and Filter extension points)
1.2.1. What it does
This algorithm checks whether a node can satisfy the pod's resource requests; it takes part only in the filtering (predicate) phase.
1.2.2. Code walkthrough
const (
    FitName           = "NodeResourcesFit"
    preFilterStateKey = "PreFilter" + FitName
)

type Fit struct {
    ignoredResources sets.String
}

type FitArgs struct {
    // Resources whose checks should be skipped
    IgnoredResources []string `json:"ignoredResources,omitempty"`
}

func NewFit(plArgs *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
    args := &FitArgs{}
    if err := framework.DecodeInto(plArgs, args); err != nil {
        return nil, err
    }
    fit := &Fit{}
    fit.ignoredResources = sets.NewString(args.IgnoredResources...)
    return fit, nil
}

func (f *Fit) Name() string {
    return FitName
}

// Write the pod's resource request into CycleState for the Filter extension point to use
func (f *Fit) PreFilter(_ context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
    cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
    return nil
}

func (f *Fit) PreFilterExtensions() framework.PreFilterExtensions {
    return nil
}

func (f *Fit) Filter(_ context.Context, cycleState *framework.CycleState, _ *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
    // Fetch the pod's resource data
    s, err := getPreFilterState(cycleState)
    if err != nil {
        return framework.NewStatus(framework.Error, err.Error())
    }
    // Check whether the node's resources suffice
    insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources)
    // Return the result
    if len(insufficientResources) != 0 {
        failureReasons := make([]string, 0, len(insufficientResources))
        for _, r := range insufficientResources {
            failureReasons = append(failureReasons, r.Reason)
        }
        return framework.NewStatus(framework.Unschedulable, failureReasons...)
    }
    return nil
}

type preFilterState struct {
    schedulernodeinfo.Resource
}

func (s *preFilterState) Clone() framework.StateData {
    return s
}

// Which resource caused the filter to fail
type InsufficientResource struct {
    ResourceName v1.ResourceName
    Reason       string
    Requested    int64
    Used         int64
    Capacity     int64
}

func Fits(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ignoredExtendedResources sets.String) []InsufficientResource {
    return fitsRequest(computePodResourceRequest(pod), nodeInfo, ignoredExtendedResources)
}

func fitsRequest(podRequest *preFilterState, nodeInfo *schedulernodeinfo.NodeInfo, ignoredExtendedResources sets.String) []InsufficientResource {
    // cap is 4, presumably because extended resources are usually absent,
    // so the slice is very unlikely to need to grow
    insufficientResources := make([]InsufficientResource, 0, 4)
    allowedPodNumber := nodeInfo.AllowedPodNumber()
    // Too many pods
    if len(nodeInfo.Pods())+1 > allowedPodNumber {
        insufficientResources = append(insufficientResources, InsufficientResource{
            v1.ResourcePods,
            "Too many pods",
            1,
            int64(len(nodeInfo.Pods())),
            int64(allowedPodNumber),
        })
    }
    if ignoredExtendedResources == nil {
        ignoredExtendedResources = sets.NewString()
    }
    // The pod requests no resources at all
    if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.EphemeralStorage == 0 && len(podRequest.ScalarResources) == 0 {
        return insufficientResources
    }
    allocatable := nodeInfo.AllocatableResource()
    // Check whether allocatable CPU suffices
    if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
        insufficientResources = append(insufficientResources, InsufficientResource{
            v1.ResourceCPU,
            "Insufficient cpu",
            podRequest.MilliCPU,
            nodeInfo.RequestedResource().MilliCPU,
            allocatable.MilliCPU,
        })
    }
    // Check whether allocatable memory suffices
    if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
        insufficientResources = append(insufficientResources, InsufficientResource{
            v1.ResourceMemory,
            "Insufficient memory",
            podRequest.Memory,
            nodeInfo.RequestedResource().Memory,
            allocatable.Memory,
        })
    }
    // Check whether allocatable ephemeral storage suffices
    if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
        insufficientResources = append(insufficientResources, InsufficientResource{
            v1.ResourceEphemeralStorage,
            "Insufficient ephemeral-storage",
            podRequest.EphemeralStorage,
            nodeInfo.RequestedResource().EphemeralStorage,
            allocatable.EphemeralStorage,
        })
    }
    // Check extended resources, e.g. GPUs
    for rName, rQuant := range podRequest.ScalarResources {
        if v1helper.IsExtendedResourceName(rName) {
            if ignoredExtendedResources.Has(string(rName)) {
                continue
            }
        }
        if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
            insufficientResources = append(insufficientResources, InsufficientResource{
                rName,
                fmt.Sprintf("Insufficient %v", rName),
                podRequest.ScalarResources[rName],
                nodeInfo.RequestedResource().ScalarResources[rName],
                allocatable.ScalarResources[rName],
            })
        }
    }
    return insufficientResources
}

// Compute the resources the pod needs:
// first sum what all regular containers need,
// then take the max against each init container,
// and finally add overhead (what the pause container and similar consume)
func computePodResourceRequest(pod *v1.Pod) *preFilterState {
    result := &preFilterState{}
    for _, container := range pod.Spec.Containers {
        result.Add(container.Resources.Requests)
    }
    for _, container := range pod.Spec.InitContainers {
        result.SetMaxResource(container.Resources.Requests)
    }
    if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
        result.Add(pod.Spec.Overhead)
    }
    return result
}

// Read the state populated by PreFilter
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
    c, err := cycleState.Read(preFilterStateKey)
    if err != nil {
        return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
    }
    s, ok := c.(*preFilterState)
    if !ok {
        return nil, fmt.Errorf("%+v convert to NodeResourcesFit.preFilterState error", c)
    }
    return s, nil
}
1.3. NodeResourcesLeastAllocated (Score extension point)
1.3.1. What it does
This algorithm prefers nodes whose current resource utilization is low; it is typically used together with NodeResourcesBalancedAllocation.
1.3.2. Code walkthrough
const LeastAllocatedName = "NodeResourcesLeastAllocated"

type LeastAllocated struct {
    resourceAllocationScorer
    handle framework.FrameworkHandle
}

func NewLeastAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
    return &LeastAllocated{
        handle: h,
        resourceAllocationScorer: resourceAllocationScorer{
            LeastAllocatedName,
            // Core function used to compute the node score
            leastResourceScorer,
            defaultRequestedRatioResources,
        },
    }, nil
}

func (la *LeastAllocated) Name() string {
    return LeastAllocatedName
}

func (la *LeastAllocated) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := la.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
    }
    return la.score(pod, nodeInfo)
}

func (la *LeastAllocated) ScoreExtensions() framework.ScoreExtensions {
    return nil
}

// Each resource's score is multiplied by its weight and accumulated,
// then divided by the total weight to get the final score.
// For example:
// if the node's memory score is 50 with weight 1
// and its cpu score is 60 with weight 1,
// the weighted sum is 50 * 1 + 60 * 1 = 110
// and the final score is 110 / 2 = 55.
// In effect this is a weighted average of the per-resource scores.
func leastResourceScorer(requested, allocable resourceToValueMap, _ bool, _ int, _ int) int64 {
    var nodeScore, weightSum int64
    for resource, weight := range defaultRequestedRatioResources {
        resourceScore := leastRequestedScore(requested[resource], allocable[resource])
        nodeScore += resourceScore * weight
        weightSum += weight
    }
    return nodeScore / weightSum
}

// How the score is computed:
// the more resources left, the higher the score.
// Taking memory as an example:
// with 8G allocatable and a pod requesting 4G, the node scores 50;
// with 8G allocatable and a pod requesting 2G, the node scores 75.
// Nodes with low resource utilization are more likely to be chosen.
func leastRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 {
        return 0
    }
    if requested > capacity {
        return 0
    }
    return ((capacity - requested) * framework.MaxNodeScore) / capacity
}
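The 8G examples in the comments can be verified with a standalone copy of leastRequestedScore (framework.MaxNodeScore is 100 in v1.18; requested here stands for the node's existing non-zero requests plus the incoming pod's request):

package main

import "fmt"

const maxNodeScore = 100 // framework.MaxNodeScore in v1.18

func leastRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 || requested > capacity {
        return 0
    }
    return (capacity - requested) * maxNodeScore / capacity
}

func main() {
    gib := int64(1024 * 1024 * 1024)
    fmt.Println(leastRequestedScore(4*gib, 8*gib)) // 50: half the memory left
    fmt.Println(leastRequestedScore(2*gib, 8*gib)) // 75: three quarters left
}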
1.4. NodeResourcesBalancedAllocation (Score extension point)
1.4.1. What it does
This algorithm prefers nodes whose CPU and memory utilization are relatively balanced; it is typically used together with NodeResourcesLeastAllocated.
For example:
Machine A: CPU utilization 80%, memory utilization 20%
Machine B: CPU utilization 60%, memory utilization 50%
Machine B is more likely to be chosen.
1.4.2. Code walkthrough
const BalancedAllocationName = "NodeResourcesBalancedAllocation"

type BalancedAllocation struct {
    resourceAllocationScorer
    handle framework.FrameworkHandle
}

func NewBalancedAllocation(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
    return &BalancedAllocation{
        handle: h,
        resourceAllocationScorer: resourceAllocationScorer{
            BalancedAllocationName,
            // Core function used to compute the node score
            balancedResourceScorer,
            defaultRequestedRatioResources,
        },
    }, nil
}

func (ba *BalancedAllocation) Name() string {
    return BalancedAllocationName
}

func (ba *BalancedAllocation) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := ba.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
    }
    return ba.score(pod, nodeInfo)
}

func (ba *BalancedAllocation) ScoreExtensions() framework.ScoreExtensions {
    return nil
}

// Core function
func balancedResourceScorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
    // Compute the requested fraction of cpu and memory
    cpuFraction := fractionOfCapacity(requested[v1.ResourceCPU], allocable[v1.ResourceCPU])
    memoryFraction := fractionOfCapacity(requested[v1.ResourceMemory], allocable[v1.ResourceMemory])
    // If either fraction is >= 1, the score is 0
    if cpuFraction >= 1 || memoryFraction >= 1 {
        return 0
    }
    // Branch that also takes volumes into account
    if includeVolumes && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && allocatableVolumes > 0 {
        // Compute the volume fraction
        volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
        // If it is >= 1, the score is 0
        if volumeFraction >= 1 {
            return 0
        }
        // Mean fraction across the three resources
        mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
        // Essentially the variance is used as a measure of dispersion.
        // For example:
        // machine A: cpu fraction 0.5, mem 0.6, volume 0.1, so mean = 0.4
        // machine B: cpu fraction 0.8, mem 0.2, volume 0.5, so mean = 0.5
        // machine A variance = (0.1*0.1 + 0.2*0.2 + 0.3*0.3)/3 ≈ 0.047
        // machine B variance = (0.3*0.3 + 0.3*0.3 + 0*0)/3 = 0.06
        // Machine A is more likely to be chosen, because its per-resource
        // fractions deviate from the mean in a relatively balanced way,
        // while machine B has an extreme value (0) that makes its
        // deviations uneven.
        variance := (((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3)
        return int64((1 - variance) * float64(framework.MaxNodeScore))
    }
    // The larger diff is, the more unbalanced cpu and memory usage become
    // after this request.
    // For example:
    // machine A: cpu fraction 0.5, mem 0.6, so diff = 0.1
    // machine B: cpu fraction 0.8, mem 0.2, so diff = 0.6
    // machine A scores (1 - 0.1) * 100 = 90
    // machine B scores (1 - 0.6) * 100 = 40
    // Clearly, the more balanced cpu and memory remain after the request,
    // the more likely the node is to be chosen.
    diff := math.Abs(cpuFraction - memoryFraction)
    return int64((1 - diff) * float64(framework.MaxNodeScore))
}

// Fraction of capacity that is requested
func fractionOfCapacity(requested, capacity int64) float64 {
    if capacity == 0 {
        return 1
    }
    return float64(requested) / float64(capacity)
}
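The variance arithmetic is easy to get wrong, so here is a standalone copy of the three-resource branch (made-up fractions, no scheduler types) that reproduces the corrected numbers from the comments:

package main

import "fmt"

const maxNodeScore = 100

// balancedScore is the volume branch of balancedResourceScorer:
// score = (1 - variance(cpu, mem, volume fractions)) * 100.
func balancedScore(cpu, mem, vol float64) int64 {
    mean := (cpu + mem + vol) / 3
    variance := ((cpu-mean)*(cpu-mean) + (mem-mean)*(mem-mean) + (vol-mean)*(vol-mean)) / 3
    return int64((1 - variance) * maxNodeScore)
}

func main() {
    fmt.Println(balancedScore(0.5, 0.6, 0.1)) // machine A: variance ≈ 0.047 -> 95
    fmt.Println(balancedScore(0.8, 0.2, 0.5)) // machine B: variance = 0.06  -> 94
}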
1.5. NodeResourcesMostAllocated (Score extension point)
1.5.1. What it does
This algorithm prefers nodes whose current resource utilization is already high. The point is to pack pods tightly and keep headroom for large resource requests (AI workloads, for example): if pods are spread evenly, the cluster's total free capacity may satisfy a large request while no single node can, leaving the pod unschedulable.
1.5.2. Code walkthrough
const MostAllocatedName = "NodeResourcesMostAllocated"

type MostAllocated struct {
    resourceAllocationScorer
    handle framework.FrameworkHandle
}

func NewMostAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
    return &MostAllocated{
        handle: h,
        resourceAllocationScorer: resourceAllocationScorer{
            MostAllocatedName,
            // Core function used to compute the node score
            mostResourceScorer,
            defaultRequestedRatioResources,
        },
    }, nil
}

func (ma *MostAllocated) Name() string {
    return MostAllocatedName
}

func (ma *MostAllocated) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := ma.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil || nodeInfo.Node() == nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
    }
    return ma.score(pod, nodeInfo)
}

func (ma *MostAllocated) ScoreExtensions() framework.ScoreExtensions {
    return nil
}

// Each resource's score is multiplied by its weight and accumulated,
// then divided by the total weight to get the final score.
// For example:
// if the node's memory score is 50 with weight 1
// and its cpu score is 60 with weight 1,
// the weighted sum is 50 * 1 + 60 * 1 = 110
// and the final score is 110 / 2 = 55.
// In effect this is a weighted average of the per-resource scores.
func mostResourceScorer(requested, allocable resourceToValueMap, _ bool, _ int, _ int) int64 {
    var nodeScore, weightSum int64
    for resource, weight := range defaultRequestedRatioResources {
        resourceScore := mostRequestedScore(requested[resource], allocable[resource])
        nodeScore += resourceScore * weight
        weightSum += weight
    }
    return nodeScore / weightSum
}

// How the score is computed:
// the fewer resources left, the higher the score.
// Taking memory as an example:
// with 8G allocatable and a pod requesting 4G, the node scores 50;
// with 8G allocatable and a pod requesting 2G, the node scores 25.
// Nodes with high resource utilization are more likely to be chosen.
func mostRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 {
        return 0
    }
    if requested > capacity {
        return 0
    }
    return (requested * framework.MaxNodeScore) / capacity
}
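mostRequestedScore is the mirror image of leastRequestedScore; the comment's 8G examples can be checked with a standalone copy:

package main

import "fmt"

const maxNodeScore = 100 // framework.MaxNodeScore in v1.18

func mostRequestedScore(requested, capacity int64) int64 {
    if capacity == 0 || requested > capacity {
        return 0
    }
    return requested * maxNodeScore / capacity
}

func main() {
    gib := int64(1024 * 1024 * 1024)
    fmt.Println(mostRequestedScore(4*gib, 8*gib)) // 50: half the memory used
    fmt.Println(mostRequestedScore(2*gib, 8*gib)) // 25: a quarter used -> lower score
}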
1.6. NodeResourceLimits (PreScore and Score extension points)
1.6.1. What it does
This algorithm prefers nodes whose allocatable resources can cover the limits configured on the pod.
1.6.2. Code walkthrough
const (
    ResourceLimitsName = "NodeResourceLimits"
    preScoreStateKey   = "PreScore" + ResourceLimitsName
)

type ResourceLimits struct {
    handle framework.FrameworkHandle
}

func NewResourceLimits(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
    return &ResourceLimits{handle: h}, nil
}

func (rl *ResourceLimits) Name() string {
    return ResourceLimitsName
}

func (rl *ResourceLimits) PreScore(
    _ context.Context,
    cycleState *framework.CycleState,
    pod *v1.Pod,
    nodes []*v1.Node,
) *framework.Status {
    if len(nodes) == 0 {
        return nil
    }
    if rl.handle.SnapshotSharedLister() == nil {
        return framework.NewStatus(framework.Error, fmt.Sprintf("empty shared lister"))
    }
    s := &preScoreState{
        podResourceRequest: getResourceLimits(pod),
    }
    cycleState.Write(preScoreStateKey, s)
    return nil
}

func (rl *ResourceLimits) Score(_ context.Context, state *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := rl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil || nodeInfo.Node() == nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
    }
    allocatableResources := nodeInfo.AllocatableResource()
    podLimits, err := getPodResource(state)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, err.Error())
    }
    cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
    memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
    // If either the cpu or the memory limit fits, the node gets one point
    score := int64(0)
    if cpuScore == 1 || memScore == 1 {
        score = 1
    }
    return score, nil
}

func (rl *ResourceLimits) ScoreExtensions() framework.ScoreExtensions {
    return nil
}

type preScoreState struct {
    podResourceRequest *schedulernodeinfo.Resource
}

func (s *preScoreState) Clone() framework.StateData {
    return s
}

func getPodResource(cycleState *framework.CycleState) (*schedulernodeinfo.Resource, error) {
    c, err := cycleState.Read(preScoreStateKey)
    if err != nil {
        return nil, fmt.Errorf("Error reading %q from cycleState: %v", preScoreStateKey, err)
    }
    s, ok := c.(*preScoreState)
    if !ok {
        return nil, fmt.Errorf("%+v convert to ResourceLimits.preScoreState error", c)
    }
    return s.podResourceRequest, nil
}

// Collect the limits of all regular containers,
// then take the max against each init container
func getResourceLimits(pod *v1.Pod) *schedulernodeinfo.Resource {
    result := &schedulernodeinfo.Resource{}
    for _, container := range pod.Spec.Containers {
        result.Add(container.Resources.Limits)
    }
    for _, container := range pod.Spec.InitContainers {
        result.SetMaxResource(container.Resources.Limits)
    }
    return result
}

// If the configured limit is no larger than the allocatable amount, score one point
func computeScore(limit, allocatable int64) int64 {
    if limit != 0 && allocatable != 0 && limit <= allocatable {
        return 1
    }
    return 0
}
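The per-node result is deliberately coarse: 1 if either the CPU or the memory limit fits within allocatable capacity, 0 otherwise. A standalone check with hypothetical numbers:

package main

import "fmt"

// computeScore mirrors the helper above: 1 if a non-zero limit fits within a
// non-zero allocatable amount, 0 otherwise.
func computeScore(limit, allocatable int64) int64 {
    if limit != 0 && allocatable != 0 && limit <= allocatable {
        return 1
    }
    return 0
}

func main() {
    // Pod limits: 2000m CPU / 16 GiB memory; node allocatable: 4000m / 8 GiB.
    gib := int64(1024 * 1024 * 1024)
    cpuScore := computeScore(2000, 4000)    // 1: the CPU limit fits
    memScore := computeScore(16*gib, 8*gib) // 0: the memory limit does not
    score := int64(0)
    if cpuScore == 1 || memScore == 1 {
        score = 1
    }
    fmt.Println(score) // 1
}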
1.7. RequestedToCapacityRatio (Score extension point)
1.7.1. What it does
This algorithm scores nodes with a user-configurable function of the requested-to-capacity ratio: the plugin arguments define a set of shape points that map utilization to a score, so it can be tuned toward either spreading or bin-packing; a common use is packing extended resources such as GPUs.
1.7.2. Code walkthrough
2. nodeaffinity (scheduling algorithms related to node affinity)
2.1. NodeAffinity (Filter and Score extension points)
2.1.1. What it does
This algorithm implements node affinity: it steers a pod onto nodes that satisfy the given conditions, enabling affinity-aware scheduling between pods and nodes. For example, pod1 must land on a node with an SSD, while pod2 needs a node with a better CPU.
2.1.2. Code walkthrough
const (Name = "NodeAffinity"ErrReason = "node(s) didn't match node selector")type NodeAffinity struct {handle framework.FrameworkHandle}func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {return &NodeAffinity{handle: h}, nil}func (pl *NodeAffinity) Name() string {return Name}// 预选插件func (pl *NodeAffinity) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {node := nodeInfo.Node()if node == nil {return framework.NewStatus(framework.Error, "node not found")}if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)}return nil}// 优选插件func (pl *NodeAffinity) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)if err != nil {return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))}node := nodeInfo.Node()if node == nil {return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))}affinity := pod.Spec.Affinityvar count int64if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {// 依次遍历所有的优选配置// 若符合则累加相应的权重// 最后返回得分for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]// 权重为0的不处理if preferredSchedulingTerm.Weight == 0 {continue}nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)if err != nil {return 0, framework.NewStatus(framework.Error, err.Error())}if nodeSelector.Matches(labels.Set(node.Labels)) {count += int64(preferredSchedulingTerm.Weight)}}}return count, nil}func (pl *NodeAffinity) NormalizeScore(_ context.Context, _ *framework.CycleState, _ *v1.Pod, scores framework.NodeScoreList) *framework.Status {return pluginhelper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)}func (pl *NodeAffinity) ScoreExtensions() framework.ScoreExtensions {return pl}// 预选阶段判断node亲和性// 1、node的label必须与pod的NodeSelector一致// 2、在1的基础上再判断node亲和性func PodMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {// 判断labelif len(pod.Spec.NodeSelector) > 0 {selector := labels.SelectorFromSet(pod.Spec.NodeSelector)if !selector.Matches(labels.Set(node.Labels)) {return false}}// 只要有一个NodeSelectorTerm符合条件,就return true// 且MatchFields目前只支持node的metadata.namenodeAffinityMatches := trueaffinity := pod.Spec.Affinityif affinity != nil && affinity.NodeAffinity != nil {nodeAffinity := affinity.NodeAffinity// 若没有设置RequiredDuringSchedulingIgnoredDuringExecution,直接返回if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {return true}// 查看node的label是否符合pod配置的亲和性nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTermsnodeAffinityMatches = nodeAffinityMatches && nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)}return nodeAffinityMatches}func nodeMatchesNodeSelectorTerms(node *v1.Node, nodeSelectorTerms []v1.NodeSelectorTerm) bool {return v1helper.MatchNodeSelectorTerms(nodeSelectorTerms, node.Labels, fields.Set{"metadata.name": node.Name,})}// DefaultNormalizeScore generates a Normalize Score function that can normalize the// scores to [0, maxPriority]. 
If reverse is set to true, it reverses the scores by// subtracting it from maxPriority.func DefaultNormalizeScore(maxPriority int64, reverse bool, scores framework.NodeScoreList) *framework.Status {var maxCount int64for i := range scores {if scores[i].Score > maxCount {maxCount = scores[i].Score}}if maxCount == 0 {if reverse {for i := range scores {scores[i].Score = maxPriority}}return nil}for i := range scores {score := scores[i].Scorescore = maxPriority * score / maxCountif reverse {score = maxPriority - score}scores[i].Score = score}return nil}
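To see what DefaultNormalizeScore does to NodeAffinity's raw weight sums, here is a standalone sketch with made-up scores (plain int64 slices instead of framework.NodeScoreList, for brevity):

package main

import "fmt"

// normalize scales scores into [0, maxPriority] by dividing by the current
// maximum, mirroring DefaultNormalizeScore with reverse=false.
func normalize(maxPriority int64, scores []int64) []int64 {
    var maxCount int64
    for _, s := range scores {
        if s > maxCount {
            maxCount = s
        }
    }
    if maxCount == 0 {
        return scores
    }
    out := make([]int64, len(scores))
    for i, s := range scores {
        out[i] = maxPriority * s / maxCount
    }
    return out
}

func main() {
    // Raw NodeAffinity scores are sums of matched preference weights, e.g.
    // node1 matched a weight-20 term, node2 matched two weight-20 terms.
    fmt.Println(normalize(100, []int64{20, 40})) // [50 100]
}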
