- 1. noderesources (scheduling algorithms related to node resources)
- 2. nodeaffinity (scheduling algorithms related to node affinity)
Version: 1.18.1. Code path: kubernetes/pkg/scheduler/framework/plugins. Note: the code below has been reorganized and trimmed by me for easier reading, but the core logic is unchanged.
1. noderesources (scheduling algorithms related to node resources)
1.1 Preliminaries: resourceAllocationScorer
1.1.1 Description
This struct and its methods are shared: NodeResourcesBalancedAllocation, NodeResourcesLeastAllocated, NodeResourcesMostAllocated, etc. all embed it to perform the resource accounting and the final score calculation.
1.1.2 Code walkthrough
// By default CPU and memory are weighted equally.
var defaultRequestedRatioResources = resourceToWeightMap{v1.ResourceMemory: 1, v1.ResourceCPU: 1}
type resourceToWeightMap map[v1.ResourceName]int64
// resourceAllocationScorer is embedded by LeastAllocated, MostAllocated and
// BalancedAllocation; only the scorer function differs between them.
type resourceAllocationScorer struct {
Name string
scorer func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
resourceToWeightMap resourceToWeightMap
}
type resourceToValueMap map[v1.ResourceName]int64
func (r *resourceAllocationScorer) score(
pod *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo) (int64, *framework.Status) {
node := nodeInfo.Node()
if node == nil {
return 0, framework.NewStatus(framework.Error, "node not found")
}
if r.resourceToWeightMap == nil {
return 0, framework.NewStatus(framework.Error, "resources not found")
}
requested := make(resourceToValueMap, len(r.resourceToWeightMap))
allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))
for resource := range r.resourceToWeightMap {
allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
}
var score int64
// Note: len(pod.Spec.Volumes) >= 0 is always true, so volume-aware scoring
// is effectively gated only by the feature gate and TransientInfo.
if len(pod.Spec.Volumes) >= 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
} else {
score = r.scorer(requested, allocatable, false, 0, 0)
}
return score, nil
}
// calculateResourceAllocatableRequest returns (allocatable, alreadyRequested+podRequest)
// for one resource. For CPU and memory the node's NonZeroRequest is used, so pods
// without explicit requests still count for a small default amount.
func calculateResourceAllocatableRequest(nodeInfo *schedulernodeinfo.NodeInfo, pod *v1.Pod, resource v1.ResourceName) (int64, int64) {
allocatable := nodeInfo.AllocatableResource()
requested := nodeInfo.RequestedResource()
podRequest := calculatePodResourceRequest(pod, resource)
switch resource {
case v1.ResourceCPU:
return allocatable.MilliCPU, nodeInfo.NonZeroRequest().MilliCPU + podRequest
case v1.ResourceMemory:
return allocatable.Memory, nodeInfo.NonZeroRequest().Memory + podRequest
case v1.ResourceEphemeralStorage:
return allocatable.EphemeralStorage, requested.EphemeralStorage + podRequest
default:
if v1helper.IsScalarResourceName(resource) {
return allocatable.ScalarResources[resource], requested.ScalarResources[resource] + podRequest
}
}
return 0, 0
}
func calculatePodResourceRequest(pod *v1.Pod, resource v1.ResourceName) int64 {
var podRequest int64
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
value := schedutil.GetNonzeroRequestForResource(resource, &container.Resources.Requests)
podRequest += value
}
for i := range pod.Spec.InitContainers {
initContainer := &pod.Spec.InitContainers[i]
value := schedutil.GetNonzeroRequestForResource(resource, &initContainer.Resources.Requests)
if podRequest < value {
podRequest = value
}
}
if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
if quantity, found := pod.Spec.Overhead[resource]; found {
podRequest += quantity.Value()
}
}
return podRequest
}
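Note GetNonzeroRequestForResource above: when a container declares no CPU or memory request, the scheduler substitutes a small default so that best-effort pods still consume scoring capacity. A minimal standalone sketch of that behavior (the constants mirror pkg/scheduler/util's DefaultMilliCPURequest/DefaultMemoryRequest in 1.18; treat the exact values as assumptions):

package main

import "fmt"

const (
	// Assumed defaults; treat the exact values as assumptions.
	defaultMilliCPURequest int64 = 100               // 0.1 core
	defaultMemoryRequest   int64 = 200 * 1024 * 1024 // 200 MiB
)

// nonzeroCPURequest returns the declared request, or the default when none is set.
func nonzeroCPURequest(declaredMilliCPU int64) int64 {
	if declaredMilliCPU == 0 {
		return defaultMilliCPURequest
	}
	return declaredMilliCPU
}

func main() {
	fmt.Println(nonzeroCPURequest(0))    // 100: a best-effort container still "costs" 0.1 core
	fmt.Println(nonzeroCPURequest(2000)) // 2000: an explicit request is used as-is
}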
1.2 NodeResourcesFit (PreFilter and Filter extension points)
1.2.1 What the algorithm does
This plugin checks whether a given node can satisfy the pod's resource requests; it only participates in the filtering (predicate) phase.
1.2.2 Code walkthrough
const (
FitName = "NodeResourcesFit"
preFilterStateKey = "PreFilter" + FitName
)
type Fit struct {
ignoredResources sets.String
}
type FitArgs struct {
// Resources whose checks should be skipped
IgnoredResources []string `json:"ignoredResources,omitempty"`
}
func NewFit(plArgs *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
args := &FitArgs{}
if err := framework.DecodeInto(plArgs, args); err != nil {
return nil, err
}
fit := &Fit{}
fit.ignoredResources = sets.NewString(args.IgnoredResources...)
return fit, nil
}
func (f *Fit) Name() string {
return FitName
}
// PreFilter writes the pod's resource request into CycleState for the Filter extension point to consume
func (f *Fit) PreFilter(_ context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
return nil
}
func (f *Fit) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
func (f *Fit) Filter(_ context.Context, cycleState *framework.CycleState, _ *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
// Fetch the pod resource data computed in PreFilter
s, err := getPreFilterState(cycleState)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
// Check whether the node's resources are sufficient
insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources)
// Collect the failure reasons, if any
if len(insufficientResources) != 0 {
failureReasons := make([]string, 0, len(insufficientResources))
for _, r := range insufficientResources {
failureReasons = append(failureReasons, r.Reason)
}
return framework.NewStatus(framework.Unschedulable, failureReasons...)
}
return nil
}
type preFilterState struct {
schedulernodeinfo.Resource
}
func (s *preFilterState) Clone() framework.StateData {
return s
}
// InsufficientResource describes a resource that caused the Filter to fail
type InsufficientResource struct {
ResourceName v1.ResourceName
Reason string
Requested int64
Used int64
Capacity int64
}
func Fits(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ignoredExtendedResources sets.String) []InsufficientResource {
return fitsRequest(computePodResourceRequest(pod), nodeInfo, ignoredExtendedResources)
}
func fitsRequest(podRequest *preFilterState, nodeInfo *schedulernodeinfo.NodeInfo, ignoredExtendedResources sets.String) []InsufficientResource {
// cap is set to 4 because pods usually request no extended resources,
// so the slice will almost never need to grow
insufficientResources := make([]InsufficientResource, 0, 4)
allowedPodNumber := nodeInfo.AllowedPodNumber()
// Too many pods already on the node
if len(nodeInfo.Pods())+1 > allowedPodNumber {
insufficientResources = append(insufficientResources, InsufficientResource{
v1.ResourcePods,
"Too many pods",
1,
int64(len(nodeInfo.Pods())),
int64(allowedPodNumber),
})
}
if ignoredExtendedResources == nil {
ignoredExtendedResources = sets.NewString()
}
// The pod requests no resources at all
if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.EphemeralStorage == 0 && len(podRequest.ScalarResources) == 0 {
return insufficientResources
}
allocatable := nodeInfo.AllocatableResource()
// Check whether allocatable CPU is sufficient
if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
insufficientResources = append(insufficientResources, InsufficientResource{
v1.ResourceCPU,
"Insufficient cpu",
podRequest.MilliCPU,
nodeInfo.RequestedResource().MilliCPU,
allocatable.MilliCPU,
})
}
// Check whether allocatable memory is sufficient
if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
insufficientResources = append(insufficientResources, InsufficientResource{
v1.ResourceMemory,
"Insufficient memory",
podRequest.Memory,
nodeInfo.RequestedResource().Memory,
allocatable.Memory,
})
}
// Check whether allocatable ephemeral storage is sufficient
if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
insufficientResources = append(insufficientResources, InsufficientResource{
v1.ResourceEphemeralStorage,
"Insufficient ephemeral-storage",
podRequest.EphemeralStorage,
nodeInfo.RequestedResource().EphemeralStorage,
allocatable.EphemeralStorage,
})
}
// Check extended resources, such as GPUs
for rName, rQuant := range podRequest.ScalarResources {
if v1helper.IsExtendedResourceName(rName) {
if ignoredExtendedResources.Has(string(rName)) {
continue
}
}
if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
insufficientResources = append(insufficientResources, InsufficientResource{
rName,
fmt.Sprintf("Insufficient %v", rName),
podRequest.ScalarResources[rName],
nodeInfo.RequestedResource().ScalarResources[rName],
allocatable.ScalarResources[rName],
})
}
}
return insufficientResources
}
// Compute the resources the pod needs:
// first sum the requests of all regular containers,
// then take the element-wise max against each init container's requests
// (init containers run sequentially, so only the largest matters),
// and finally add the pod overhead (e.g. the cost of the pause container/sandbox)
func computePodResourceRequest(pod *v1.Pod) *preFilterState {
result := &preFilterState{}
for _, container := range pod.Spec.Containers {
result.Add(container.Resources.Requests)
}
for _, container := range pod.Spec.InitContainers {
result.SetMaxResource(container.Resources.Requests)
}
if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
result.Add(pod.Spec.Overhead)
}
return result
}
// Read the state written by PreFilter
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
return nil, fmt.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
}
s, ok := c.(*preFilterState)
if !ok {
return nil, fmt.Errorf("%+v convert to NodeResourcesFit.preFilterState error", c)
}
return s, nil
}
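Putting the PreFilter accounting and the Filter check together with concrete numbers, here is a hedged standalone sketch of the CPU path (quantities in millicores; the pod and node shapes are hypothetical):

package main

import "fmt"

// effectiveCPURequest mirrors computePodResourceRequest for one resource:
// sum the regular containers, then take the max against each init container,
// since init containers run one at a time.
func effectiveCPURequest(containers, initContainers []int64) int64 {
	var sum int64
	for _, c := range containers {
		sum += c
	}
	for _, ic := range initContainers {
		if ic > sum {
			sum = ic
		}
	}
	return sum
}

// cpuFits mirrors the CPU branch of fitsRequest: the pod fits only if its
// request plus what is already requested stays within allocatable.
func cpuFits(podMilliCPU, nodeRequested, allocatable int64) bool {
	return allocatable >= podMilliCPU+nodeRequested
}

func main() {
	// Two 200m app containers and one 500m init container -> the pod needs 500m.
	pod := effectiveCPURequest([]int64{200, 200}, []int64{500})
	fmt.Println(pod) // 500
	// Node: 4000m allocatable, 3600m already requested -> 500m does not fit.
	fmt.Println(cpuFits(pod, 3600, 4000)) // false -> "Insufficient cpu"
	fmt.Println(cpuFits(pod, 3400, 4000)) // true: 3900m <= 4000m
}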
1.3 NodeResourcesLeastAllocated (Score extension point)
1.3.1 What the algorithm does
This plugin favors nodes with low current resource utilization; it is usually used together with NodeResourcesBalancedAllocation.
1.3.2 Code walkthrough
const LeastAllocatedName = "NodeResourcesLeastAllocated"
type LeastAllocated struct {
resourceAllocationScorer
handle framework.FrameworkHandle
}
func NewLeastAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
return &LeastAllocated{
handle: h,
resourceAllocationScorer: resourceAllocationScorer{
LeastAllocatedName,
// The core function that computes the node score
leastResourceScorer,
defaultRequestedRatioResources,
},
}, nil
}
func (la *LeastAllocated) Name() string {
return LeastAllocatedName
}
func (la *LeastAllocated) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := la.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
return la.score(pod, nodeInfo)
}
func (la *LeastAllocated) ScoreExtensions() framework.ScoreExtensions {
return nil
}
// Each resource's score is multiplied by its weight and summed,
// then divided by the total weight to get the final score.
// Example:
//   memory score 50, weight 1
//   cpu score 60, weight 1
//   weighted sum = 50 * 1 + 60 * 1 = 110
//   final score = 110 / 2 = 55
// i.e. a weighted average of the per-resource scores
func leastResourceScorer(requested, allocable resourceToValueMap, _ bool, _ int, _ int) int64 {
var nodeScore, weightSum int64
for resource, weight := range defaultRequestedRatioResources {
resourceScore := leastRequestedScore(requested[resource], allocable[resource])
nodeScore += resourceScore * weight
weightSum += weight
}
return nodeScore / weightSum
}
// Scoring rule: the more resources left over, the higher the score.
// Using memory as an example:
//   allocatable memory 8G, pod requests 4G -> the node scores 50
//   allocatable memory 8G, pod requests 2G -> the node scores 75
// Nodes with lower utilization are more likely to be selected
func leastRequestedScore(requested, capacity int64) int64 {
if capacity == 0 {
return 0
}
if requested > capacity {
return 0
}
return ((capacity - requested) * framework.MaxNodeScore) / capacity
}
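A quick sanity check of those numbers (a standalone sketch; the constant assumes framework.MaxNodeScore is 100, as it is in 1.18):

package main

import "fmt"

const maxNodeScore int64 = 100 // framework.MaxNodeScore in 1.18

// leastRequestedScore, condensed from the plugin code above.
func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return ((capacity - requested) * maxNodeScore) / capacity
}

func main() {
	fmt.Println(leastRequestedScore(4, 8)) // 50: half the node would be used
	fmt.Println(leastRequestedScore(2, 8)) // 75: the emptier fit scores higher
}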
1.4 NodeResourcesBalancedAllocation (Score extension point)
1.4.1 What the algorithm does
This plugin favors machines whose CPU and memory utilization are relatively balanced, and it is usually used together with NodeResourcesLeastAllocated.
Example:
machine A: CPU utilization 80%, memory utilization 20%
machine B: CPU utilization 60%, memory utilization 50%
Machine B is more likely to be selected.
1.4.2 Code walkthrough
const BalancedAllocationName = "NodeResourcesBalancedAllocation"
type BalancedAllocation struct {
resourceAllocationScorer
handle framework.FrameworkHandle
}
func NewBalancedAllocation(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
return &BalancedAllocation{
handle: h,
resourceAllocationScorer: resourceAllocationScorer{
BalancedAllocationName,
// The core function that computes the node score
balancedResourceScorer,
defaultRequestedRatioResources,
},
}, nil
}
func (ba *BalancedAllocation) Name() string {
return BalancedAllocationName
}
func (ba *BalancedAllocation) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := ba.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
return ba.score(pod, nodeInfo)
}
func (ba *BalancedAllocation) ScoreExtensions() framework.ScoreExtensions {
return nil
}
// The core scoring function
func balancedResourceScorer(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
// Compute the requested fraction of cpu and memory
cpuFraction := fractionOfCapacity(requested[v1.ResourceCPU], allocable[v1.ResourceCPU])
memoryFraction := fractionOfCapacity(requested[v1.ResourceMemory], allocable[v1.ResourceMemory])
// If either fraction is >= 1, the score is 0
if cpuFraction >= 1 || memoryFraction >= 1 {
return 0
}
// Volume-aware branch
if includeVolumes && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && allocatableVolumes > 0 {
// Compute the volume fraction
volumeFraction := float64(requestedVolumes) / float64(allocatableVolumes)
// >= 1 means no volume capacity left, score 0
if volumeFraction >= 1 {
return 0
}
// Measure how much the three fractions spread around their mean
mean := (cpuFraction + memoryFraction + volumeFraction) / float64(3)
// Essentially the variance is used as a measure of dispersion.
// Example:
//   machine A: cpu fraction 0.5, mem 0.6, volume 0.1 -> mean = 0.4
//   machine B: cpu fraction 0.8, mem 0.2, volume 0.5 -> mean = 0.5
//   machine A variance = (0.1*0.1 + 0.2*0.2 + 0.3*0.3)/3 ≈ 0.047
//   machine B variance = (0.3*0.3 + 0.3*0.3 + 0*0)/3 = 0.06
// Machine A is more likely to be selected because its fractions deviate
// from the mean relatively evenly, while machine B's deviations (0.3, 0.3, 0)
// include an extreme value, making its spread less balanced.
variance := (((cpuFraction - mean) * (cpuFraction - mean)) + ((memoryFraction - mean) * (memoryFraction - mean)) + ((volumeFraction - mean) * (volumeFraction - mean))) / float64(3)
return int64((1 - variance) * float64(framework.MaxNodeScore))
}
// The larger diff is, the more unbalanced cpu and memory usage would be after this request.
// Example:
//   machine A: cpu fraction 0.5, mem fraction 0.6 -> diff = 0.1
//   machine B: cpu fraction 0.8, mem fraction 0.2 -> diff = 0.6
//   machine A score: (1 - 0.1) * 100 = 90
//   machine B score: (1 - 0.6) * 100 = 40
// Clearly, the more balanced the remaining cpu and memory are after the request, the more likely the node is selected
diff := math.Abs(cpuFraction - memoryFraction)
return int64((1 - diff) * float64(framework.MaxNodeScore))
}
// Ratio of requested to allocatable capacity
func fractionOfCapacity(requested, capacity int64) float64 {
if capacity == 0 {
return 1
}
return float64(requested) / float64(capacity)
}
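The variance arithmetic can be checked with a small standalone sketch (machines A and B are the hypothetical examples from the comment above):

package main

import "fmt"

// variance of the fractions, computed the same way as balancedResourceScorer.
func variance(fractions ...float64) float64 {
	var mean float64
	for _, f := range fractions {
		mean += f
	}
	mean /= float64(len(fractions))
	var v float64
	for _, f := range fractions {
		v += (f - mean) * (f - mean)
	}
	return v / float64(len(fractions))
}

func main() {
	a := variance(0.5, 0.6, 0.1) // machine A
	b := variance(0.8, 0.2, 0.5) // machine B
	fmt.Printf("A: variance≈%.3f score≈%.1f\n", a, (1-a)*100) // ≈0.047 -> ≈95.3
	fmt.Printf("B: variance≈%.3f score≈%.1f\n", b, (1-b)*100) // 0.060 -> ≈94.0
	// Machine A wins: its fractions are less dispersed around their mean.
}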
1.5 NodeResourcesMostAllocated (Score extension point)
1.5.1 What the algorithm does
This plugin favors nodes with high current resource utilization, packing pods tightly in order to keep whole nodes free for large requests, e.g. AI workloads. If pods were spread evenly instead, the cluster's total free resources might satisfy a big request while no single node could fit it, leaving it unschedulable.
1.5.2 Code walkthrough
const MostAllocatedName = "NodeResourcesMostAllocated"
type MostAllocated struct {
resourceAllocationScorer
handle framework.FrameworkHandle
}
func NewMostAllocated(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
return &MostAllocated{
handle: h,
resourceAllocationScorer: resourceAllocationScorer{
MostAllocatedName,
// The core function that computes the node score
mostResourceScorer,
defaultRequestedRatioResources,
},
}, nil
}
func (ma *MostAllocated) Name() string {
return MostAllocatedName
}
func (ma *MostAllocated) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := ma.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil || nodeInfo.Node() == nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
}
return ma.score(pod, nodeInfo)
}
func (ma *MostAllocated) ScoreExtensions() framework.ScoreExtensions {
return nil
}
// Each resource's score is multiplied by its weight and summed,
// then divided by the total weight to get the final score.
// Example:
//   memory score 50, weight 1
//   cpu score 60, weight 1
//   weighted sum = 50 * 1 + 60 * 1 = 110
//   final score = 110 / 2 = 55
// i.e. a weighted average of the per-resource scores
func mostResourceScorer(requested, allocable resourceToValueMap, _ bool, _ int, _ int) int64 {
var nodeScore, weightSum int64
for resource, weight := range defaultRequestedRatioResources {
resourceScore := mostRequestedScore(requested[resource], allocable[resource])
nodeScore += resourceScore * weight
weightSum += weight
}
return nodeScore / weightSum
}
// Scoring rule: the fewer resources left over, the higher the score.
// Using memory as an example:
//   allocatable memory 8G, pod requests 4G -> the node scores 50
//   allocatable memory 8G, pod requests 2G -> the node scores 25
// Nodes with higher utilization are more likely to be selected
func mostRequestedScore(requested, capacity int64) int64 {
if capacity == 0 {
return 0
}
if requested > capacity {
return 0
}
return (requested * framework.MaxNodeScore) / capacity
}
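mostRequestedScore is the mirror image of leastRequestedScore: for requested <= capacity the two sum to MaxNodeScore, up to integer truncation. A standalone sketch:

package main

import "fmt"

const maxNodeScore int64 = 100 // framework.MaxNodeScore in 1.18

// mostRequestedScore, condensed from the plugin code above.
func mostRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return (requested * maxNodeScore) / capacity
}

func main() {
	fmt.Println(mostRequestedScore(4, 8)) // 50
	fmt.Println(mostRequestedScore(2, 8)) // 25: the lighter pod packs less, so it scores lower here
}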
1.6 NodeResourceLimits (PreScore and Score extension points)
1.6.1 What the algorithm does
This plugin favors nodes whose allocatable resources can cover the pod's configured limits.
1.6.2 Code walkthrough
const (
ResourceLimitsName = "NodeResourceLimits"
preScoreStateKey = "PreScore" + ResourceLimitsName
)
type ResourceLimits struct {
handle framework.FrameworkHandle
}
func NewResourceLimits(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
return &ResourceLimits{handle: h}, nil
}
func (rl *ResourceLimits) Name() string {
return ResourceLimitsName
}
func (rl *ResourceLimits) PreScore(
_ context.Context,
cycleState *framework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
) *framework.Status {
if len(nodes) == 0 {
return nil
}
if rl.handle.SnapshotSharedLister() == nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("empty shared lister"))
}
s := &preScoreState{
podResourceRequest: getResourceLimits(pod),
}
cycleState.Write(preScoreStateKey, s)
return nil
}
func (rl *ResourceLimits) Score(_ context.Context, state *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := rl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil || nodeInfo.Node() == nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
}
allocatableResources := nodeInfo.AllocatableResource()
podLimits, err := getPodResource(state)
if err != nil {
return 0, framework.NewStatus(framework.Error, err.Error())
}
cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
// One point if either the cpu or the memory limit fits
score := int64(0)
if cpuScore == 1 || memScore == 1 {
score = 1
}
return score, nil
}
func (rl *ResourceLimits) ScoreExtensions() framework.ScoreExtensions {
return nil
}
type preScoreState struct {
podResourceRequest *schedulernodeinfo.Resource
}
func (s *preScoreState) Clone() framework.StateData {
return s
}
func getPodResource(cycleState *framework.CycleState) (*schedulernodeinfo.Resource, error) {
c, err := cycleState.Read(preScoreStateKey)
if err != nil {
return nil, fmt.Errorf("Error reading %q from cycleState: %v", preScoreStateKey, err)
}
s, ok := c.(*preScoreState)
if !ok {
return nil, fmt.Errorf("%+v convert to ResourceLimits.preScoreState error", c)
}
return s.podResourceRequest, nil
}
// Sum the limits of the regular containers,
// then take the element-wise max against the init containers
func getResourceLimits(pod *v1.Pod) *schedulernodeinfo.Resource {
result := &schedulernodeinfo.Resource{}
for _, container := range pod.Spec.Containers {
result.Add(container.Resources.Limits)
}
for _, container := range pod.Spec.InitContainers {
result.SetMaxResource(container.Resources.Limits)
}
return result
}
// One point if the limit is set and fits within the allocatable amount
func computeScore(limit, allocatable int64) int64 {
if limit != 0 && allocatable != 0 && limit <= allocatable {
return 1
}
return 0
}
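The score is therefore binary (0 or 1): the plugin only acts as a gentle tie-breaker toward nodes that can actually honor the pod's limits. A standalone sketch of the end-to-end Score logic (the pod and node sizes are hypothetical):

package main

import "fmt"

// computeScore mirrors ResourceLimits: one point when a non-zero limit fits.
func computeScore(limit, allocatable int64) int64 {
	if limit != 0 && allocatable != 0 && limit <= allocatable {
		return 1
	}
	return 0
}

func main() {
	// Pod limits: cpu 2000m, memory 4Gi. Node allocatable: cpu 4000m, memory 2Gi.
	cpuScore := computeScore(2000, 4000)   // 1: the cpu limit fits
	memScore := computeScore(4<<30, 2<<30) // 0: the memory limit does not
	score := int64(0)
	if cpuScore == 1 || memScore == 1 {
		score = 1 // either resource fitting is enough for the point
	}
	fmt.Println(score) // 1
}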
1.7 RequestedToCapacityRatio (Score extension point)
1.7.1 What the algorithm does
This plugin scores nodes with a user-configurable function of the requested-to-capacity ratio, defined by shape points in the plugin arguments, so it can be tuned toward either bin-packing or spreading.
1.7.2 Code walkthrough
2. nodeaffinity (scheduling algorithms related to node affinity)
2.1 NodeAffinity (Filter and Score extension points)
2.1.1 What the algorithm does
This plugin implements node affinity: it binds pods to nodes that satisfy given conditions, enabling affinity-aware scheduling between pods and nodes. For example, pod1 must be scheduled onto a node with an SSD, while pod2 should land on a node with a better CPU.
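For reference, here is what such a constraint looks like when built with the core/v1 types (a hedged sketch; the label keys and values are hypothetical):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// nodeAffinityForSSD requires an SSD node (hard, checked by Filter)
// and prefers a given zone (soft, scored by Score).
func nodeAffinityForSSD() *v1.Affinity {
	return &v1.Affinity{
		NodeAffinity: &v1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
				NodeSelectorTerms: []v1.NodeSelectorTerm{{
					MatchExpressions: []v1.NodeSelectorRequirement{{
						Key:      "disktype", // hypothetical label key
						Operator: v1.NodeSelectorOpIn,
						Values:   []string{"ssd"},
					}},
				}},
			},
			PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{{
				Weight: 50, // added to the node's raw score when the term matches
				Preference: v1.NodeSelectorTerm{
					MatchExpressions: []v1.NodeSelectorRequirement{{
						Key:      "topology.kubernetes.io/zone",
						Operator: v1.NodeSelectorOpIn,
						Values:   []string{"zone-a"}, // hypothetical zone
					}},
				},
			}},
		},
	}
}

func main() {
	fmt.Printf("%+v\n", nodeAffinityForSSD())
}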
2.1.2 Code walkthrough
const (
Name = "NodeAffinity"
ErrReason = "node(s) didn't match node selector"
)
type NodeAffinity struct {
handle framework.FrameworkHandle
}
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
return &NodeAffinity{handle: h}, nil
}
func (pl *NodeAffinity) Name() string {
return Name
}
// Filter (predicate) extension point
func (pl *NodeAffinity) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)
}
return nil
}
// Score (priority) extension point
func (pl *NodeAffinity) Score(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
node := nodeInfo.Node()
if node == nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
affinity := pod.Spec.Affinity
var count int64
if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
// Iterate over all the preferred terms;
// when the node matches a term, add that term's weight;
// the accumulated weight is the score
for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
// Skip terms with zero weight
if preferredSchedulingTerm.Weight == 0 {
continue
}
nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
if err != nil {
return 0, framework.NewStatus(framework.Error, err.Error())
}
if nodeSelector.Matches(labels.Set(node.Labels)) {
count += int64(preferredSchedulingTerm.Weight)
}
}
}
return count, nil
}
func (pl *NodeAffinity) NormalizeScore(_ context.Context, _ *framework.CycleState, _ *v1.Pod, scores framework.NodeScoreList) *framework.Status {
return pluginhelper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)
}
func (pl *NodeAffinity) ScoreExtensions() framework.ScoreExtensions {
return pl
}
// Filter-phase node affinity check:
// 1. the node's labels must match the pod's NodeSelector
// 2. on top of 1, the node affinity terms are evaluated
func PodMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
// Check the plain NodeSelector labels
if len(pod.Spec.NodeSelector) > 0 {
selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
if !selector.Matches(labels.Set(node.Labels)) {
return false
}
}
// A single matching NodeSelectorTerm is enough to return true,
// and MatchFields currently only supports the node's metadata.name
nodeAffinityMatches := true
affinity := pod.Spec.Affinity
if affinity != nil && affinity.NodeAffinity != nil {
nodeAffinity := affinity.NodeAffinity
// If RequiredDuringSchedulingIgnoredDuringExecution is not set, return true immediately
if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
return true
}
// Check whether the node's labels satisfy the pod's affinity terms
nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
nodeAffinityMatches = nodeAffinityMatches && nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
}
return nodeAffinityMatches
}
func nodeMatchesNodeSelectorTerms(node *v1.Node, nodeSelectorTerms []v1.NodeSelectorTerm) bool {
return v1helper.MatchNodeSelectorTerms(nodeSelectorTerms, node.Labels, fields.Set{
"metadata.name": node.Name,
})
}
// DefaultNormalizeScore generates a Normalize Score function that can normalize the
// scores to [0, maxPriority]. If reverse is set to true, it reverses the scores by
// subtracting it from maxPriority.
func DefaultNormalizeScore(maxPriority int64, reverse bool, scores framework.NodeScoreList) *framework.Status {
var maxCount int64
for i := range scores {
if scores[i].Score > maxCount {
maxCount = scores[i].Score
}
}
if maxCount == 0 {
if reverse {
for i := range scores {
scores[i].Score = maxPriority
}
}
return nil
}
for i := range scores {
score := scores[i].Score
score = maxPriority * score / maxCount
if reverse {
score = maxPriority - score
}
scores[i].Score = score
}
return nil
}
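A worked example of that normalization (a standalone sketch over plain int64 scores, mirroring the loop above):

package main

import "fmt"

// normalize rescales raw scores into [0, maxPriority] by the max raw score.
func normalize(maxPriority int64, scores []int64) []int64 {
	var maxCount int64
	for _, s := range scores {
		if s > maxCount {
			maxCount = s
		}
	}
	if maxCount == 0 {
		return scores // all zero: nothing to rescale
	}
	out := make([]int64, len(scores))
	for i, s := range scores {
		out[i] = maxPriority * s / maxCount
	}
	return out
}

func main() {
	// Raw preferred-affinity weights 20, 40 and 0 become 50, 100 and 0:
	// the best node always ends up at MaxNodeScore.
	fmt.Println(normalize(100, []int64{20, 40, 0})) // [50 100 0]
}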