1.调度器的初始化与启动

1.1 调度器的入口 main

在“cmd/kube-scheduler/scheduler.go”中,scheduler使用 NewSchedulerCommand() 初始化命令并执行命令。

  1. func main() {
  2. ...
  3. command := app.NewSchedulerCommand()
  4. ...
  5. if err := command.Execute(); err != nil {
  6. os.Exit(1)
  7. }
  8. }

1.2 初始化调度器命令 NewSchedulerCommand

NewSchedulerCommand() 会读取配置文件和参数,初始化调度命令,这其中最主要的函数是 runCommand()。

func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    ...
    cmd := &cobra.Command{
        Use: "kube-scheduler",
        ...
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, opts, registryOptions...); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
        ...
    }
    ...
    return cmd
}

1.3 执行调度器命令 runCommand

runCommand() 函数主要分为两个重要步骤:

  1. Setup:读取配置文件以及参数,初始化调度器。这里的配置文件包括 Profiles 配置等。
  2. Run:运行调度器所需的组件,例如健康检查服务,Informer 等。然后使用 Setup 得到的调度器运行调度的主流程。

    func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
     ...
     cc, sched, err := Setup(ctx, opts, registryOptions...)
     if err != nil {
         return err
     }
     return Run(ctx, cc, sched)
    }
    

    1.4 创建调度器 Setup

    Setup() 函数会根据配置文件和参数创建 scheduler。这里最主要的应该是 Profiles,里面定义了调度器的名字,以及 scheduling framework 的插件配置。还有一些可以用来调优的参数,例如 PercentageOfNodesToScore, PodInitialBackoffSeconds , PodMaxBackoffSeconds 等。

    func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
     ...
     // Create the scheduler.
     sched, err := scheduler.New(cc.Client,
         cc.InformerFactory,
         cc.PodInformer,
         recorderFactory,
         ctx.Done(),
         scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
         scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
         scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
         scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
         scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
         scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
         scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
     )
     if err != nil {
         return nil, nil, err
     }
     return &cc, sched, nil
    }
    

    1.5 运行调度器 Run

    Run 主要是启动一些组件,然后调用 sched.Run(ctx) 进行调度的主流程。

    func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
     ...
     // Prepare the event broadcaster.
     cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
     // Setup healthz checks.
     ...
     // Start up the healthz server.
     ...
     // Start all informers.
     go cc.PodInformer.Informer().Run(ctx.Done())
     cc.InformerFactory.Start(ctx.Done())
     // Wait for all caches to sync before scheduling.
     cc.InformerFactory.WaitForCacheSync(ctx.Done())
    
     // If leader election is enabled, runCommand via LeaderElector until done and exit.
     // Leader election
     ...
     // Leader election is disabled, so runCommand inline until done.
     sched.Run(ctx)
     return fmt.Errorf("finished without leader elect")
    }
    

    2. 调度队列内部三种子队列的功能及其实现

    看看scheduler的结构:

    // 文件路径:pkg/scheduler/scheduler.go
    // Scheduler watches for new unscheduled pods. It attempts to find
    // nodes that they fit on and writes bindings back to the api server.
    type Scheduler struct {
     // It is expected that changes made via SchedulerCache will be observed
     // by NodeLister and Algorithm.
     SchedulerCache internalcache.Cache
    
     Algorithm core.ScheduleAlgorithm
    
     // NextPod should be a function that blocks until the next pod
     // is available. We don't use a channel for this, because scheduling
     // a pod may take some amount of time and we don't want pods to get
     // stale while they sit in a channel.
     NextPod func() *framework.QueuedPodInfo
    
     // Error is called if there is an error. It is passed the pod in
     // question, and the error
     Error func(*framework.QueuedPodInfo, error)
    
     // Close this to shut down the scheduler.
     StopEverything <-chan struct{}
    
     // SchedulingQueue holds pods to be scheduled
     SchedulingQueue internalqueue.SchedulingQueue
    
     // Profiles are the scheduling profiles.
     Profiles profile.Map
    
     client clientset.Interface
    }
    

    里面有一个调度队列,队列的接口和具体结构如下: ```go // 文件路径: pkg/scheduler/internal/queue/scheduling_queue.go // SchedulingQueue定义了一个队列接口,用于保存等待调度的pod信息, type SchedulingQueue interface { framework.PodNominator Add(pod v1.Pod) error // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue. // The podSchedulingCycle represents the current scheduling cycle number which can be // returned by calling SchedulingCycle(). AddUnschedulableIfNotPresent(pod framework.QueuedPodInfo, podSchedulingCycle int64) error // SchedulingCycle returns the current number of scheduling cycle which is // cached by scheduling queue. Normally, incrementing this number whenever // a pod is popped (e.g. called Pop()) is enough. SchedulingCycle() int64 // Pop removes the head of the queue and returns it. It blocks if the // queue is empty and waits until a new item is added to the queue. Pop() (framework.QueuedPodInfo, error) Update(oldPod, newPod v1.Pod) error Delete(pod v1.Pod) error MoveAllToActiveOrBackoffQueue(event string) AssignedPodAdded(pod v1.Pod) AssignedPodUpdated(pod v1.Pod) PendingPods() []v1.Pod // Close closes the SchedulingQueue so that the goroutine which is // waiting to pop items can exit gracefully. Close() // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue. NumUnschedulablePods() int // Run starts the goroutines managing the queue. Run() } // Pod提名信息的控制接口 type PodNominator interface { // AddNominatedPod adds the given pod to the nominated pod map or // updates it if it already exists. AddNominatedPod(pod v1.Pod, nodeName string) // DeleteNominatedPodIfExists deletes nominatedPod from internal cache. It's a no-op if it doesn't exist. DeleteNominatedPodIfExists(pod v1.Pod) // UpdateNominatedPod updates the <oldPod> with <newPod>. UpdateNominatedPod(oldPod, newPod v1.Pod) // NominatedPodsForNode returns nominatedPods on the given node. NominatedPodsForNode(nodeName string) []v1.Pod }

// PriorityQueue是具体的实现 // 队列的头是等待调度的pod里优先级最高的 // 包括三个子队列 type PriorityQueue struct { // 存储、设置调度的提名信息,其实就是调度的结果:pod和node的对应关系 framework.PodNominator // 外部控制队列的channel stop chan struct{} clock util.Clock

// backoff pod 初始的等待重新调度时间
podInitialBackoffDuration time.Duration
// backoff pod 最大的等待重新调度的时间
podMaxBackoffDuration time.Duration

lock sync.RWMutex
// 并发场景下,控制pop的阻塞
cond sync.Cond
// 阻塞队列
activeQ *heap.Heap
// backoff队列
podBackoffQ *heap.Heap
// 不可调度队列
unschedulableQ *UnschedulablePodsMap
// 一个计数器,每次pop一个pod,自增一次
schedulingCycle int64
// moveRequestCycle caches the sequence number of scheduling cycle when we
// received a move request. Unscheduable pods in and before this scheduling
// cycle will be put back to activeQueue if we were trying to schedule them
// when we received move request.
moveRequestCycle int64

// 控制队列的开关
closed bool

}

PriorityQueue作为实现SchedulingQueue的实现,其核心数据结构主要包含三个队列:activeQ、podBackoffQ、unscheduleQ。<br />内部通过cond来实现Pop操作的阻塞与通知,接下来先分析核心的调度流程。util.heap结构就是堆结构。
<a name="idJLd"></a>
## 2.1 activeQ
存储所有等待调度的Pod的队列,默认是基于堆来实现,其中元素的优先级则通过对比pod的创建时间和pod的优先级来进行排序。<br />既然是堆,那么必然需要定义一下堆结构的元素优先级比较方法;这块代码定义在framework路径下:
```go
// 文件路径: pkg/scheduler/framework/plugins/queuesort/priority_sort.go

// PrioritySort is a plugin that implements Priority based sorting.
type PrioritySort struct{}

// Less 是activeQ默认的排序方法
// 该方法根据pod的优先级来排序,如果优先级一样,则根据pod的timestamp来决定优先级
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
    p1 := pod.GetPodPriority(pInfo1.Pod)
    p2 := pod.GetPodPriority(pInfo2.Pod)
    return (p1 &gt; p2) || (p1 == p2 &amp;&amp; pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

这里的GetPodPriority方法做了啥呢?

// 文件路径:pkg/api/v1/pod/util.go
// 从pod的配置种获取优先等级的设置,没有则返回0
func GetPodPriority(pod *v1.Pod) int32 {
    if pod.Spec.Priority != nil {
        return *pod.Spec.Priority
    }
    return 0
}

priorityclass字段控制着pod调度的优先级,优先级高的pod,会被直接排到最前面。

2.2 podbackoffQ

一般执行失败的pod,会有一些backoff的event。
backoff机制是并发编程中常见的一种机制,即如果任务反复执行依旧失败,则会按次增长等待调度时间,降低重试效率,从而避免反复失败浪费调度资源。
针对调度失败的pod会优先存储在backoff队列中,等待后续重试。
podBackOffQ主要存储那些在多个schedulingCycle中依旧调度失败的情况下,则会通过之前说的backOff机制,延迟等待调度的时间。
backoffQ也是一个优先队列,那么它的默认比较方法如下所示,简单粗暴,根据失败的时间来排序:

// 文件路径: pkg/scheduler/internal/queue/scheduling_queue.go
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
    pInfo1 := podInfo1.(*framework.QueuedPodInfo)
    pInfo2 := podInfo2.(*framework.QueuedPodInfo)
    bo1 := p.getBackoffTime(pInfo1)
    bo2 := p.getBackoffTime(pInfo2)
    return bo1.Before(bo2)
}

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
    duration := p.calculateBackoffDuration(podInfo)
    backoffTime := podInfo.Timestamp.Add(duration)
    return backoffTime
}

// 计算backoff时间
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
    duration := p.podInitialBackoffDuration
    for i := 1; i &lt; podInfo.Attempts; i++ {
        duration = duration * 2
        if duration &gt; p.podMaxBackoffDuration {
            return p.podMaxBackoffDuration
        }
    }
    return duration
}

2.3 unschedulableQ

字面意思,不可调度队列。
虽说是个队列,实际的数据结构是一个map。

// 文件路径: pkg/scheduler/internal/queue/scheduling_queue.go
// 存储暂时无法被调度的pod信息
// 例如: 资源不足,无法被调度
type UnschedulablePodsMap struct {
    // podInfoMap 是一个map,他的key是pod的full-name,值是指向pod信息的指针,想不到吧 go里面也有指针,相当于引用
    podInfoMap map[string]*framework.QueuedPodInfo
    keyFunc    func(*v1.Pod) string
    // 注释很浅显,字面意思
    // metricRecorder updates the counter when elements of an unschedulablePodsMap
    // get added or removed, and it does nothing if it&#039;s nil
    metricRecorder metrics.MetricRecorder
}

// 不可调度队列的构造函数,看得出来,keyfunc里获取pod的全名
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
    return &amp;UnschedulablePodsMap{
        podInfoMap:     make(map[string]*framework.QueuedPodInfo),
        keyFunc:        util.GetPodFullName,
        metricRecorder: metricRecorder,
    }

2.4 三级队列如何使用呢?

// 文件路径:pkg/scheduler/scheduler.go
// scheduler的RUN方法
func (sched *Scheduler) Run(ctx context.Context) {
    sched.SchedulingQueue.Run()
    wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    sched.SchedulingQueue.Close()
}
// 文件路径: pkg/scheduler/internal/queue/scheduling_queue.go
// PriorityQueue的RUN方法
func (p *PriorityQueue) Run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

// 文件路径 pkg/scheduler/scheduler.go
// scheduleOne方法是调度的核心方法,该方法里,为一个pod的调度走了完整的调度流程
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // 方法太长,后面再详细分析
}

有三条工作流在同时运行:

  1. 每隔1秒,检测backoffQ里是否有pod可以被放进activeQ里
  2. 每隔30秒,检测unschedulepodQ里是否有pod可以被放进activeQ里(默认条件是等待时间超过60秒)
  3. 不停的调用scheduleOne方法,从activeQ里取出pod来进行调度

    2.5 失败与重试处理

    那么什么情况下要放进backoff里,什么情况下,进unschedule呢?进去unschedule后,只能等时间到了才重新拿出来调度吗?
    这里就要提一下优先队列里,还有两个重要的值属性了:

  4. schedulingCycle

  5. moveRequestCycle

以及几个对外暴露的方法:

  1. AddUnschedulableIfNotPresent
  2. AssignedPodAdded
  3. AssignedPodUpdated
  4. MoveAllToActiveOrBackoffQueue

设想一下,当前如果有一个pod调度失败了,只有一个可能,就是当前没有能满足它要求的资源。换言之,它属于unScheduleable,不可调度的。
如果集群内的状态一直不发生变化,理论上它会一直处于不可调度的状态,根据前面RUN方法里的定时器,这种情况下,每隔60秒,这些pod还是会被重新尝试调度一次。
但是一旦集群的状态发生了变化,很有可能,这些不可调度的pod,就能获取自己想要的资源了,换言之,集群状态变化后,unscheduleableQ的pod应该放进backoffQ里。等待安排重新调度。backoffQ里的pod会根据重试的次数设定等待重试的时间,重试的次数越少,等待重新调度的时间也就越少。换言之,backOffQ里的pod调度的速度会比unscheduleableQ里的pod快得多,肯定不需要等60秒那么久。
而AssignedPodAdded、AssignedPodUpdated、MoveAllToActiveOrBackoffQueue主要被用来设置资源更新时的回调方法。也就是在新的pod被创建、更新,或者别的资源更新时,k8s会认为之前无法被调度的pod,有了重试的机会。这三个方法在底层都会调用movePodsToActiveOrBackoffQueue方法:

// 文件路径: pkg/scheduler/internal/queue/scheduling_queue.go
// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
    for _, pInfo := range podInfoList {
        pod := pInfo.Pod
        if p.isPodBackingoff(pInfo) {
            if err := p.podBackoffQ.Add(pInfo); err != nil {
                klog.Errorf(&quot;Error adding pod %v to the backoff queue: %v&quot;, pod.Name, err)
            } else {
                metrics.SchedulerQueueIncomingPods.WithLabelValues(&quot;backoff&quot;, event).Inc()
                p.unschedulableQ.delete(pod)
            }
        } else {
            if err := p.activeQ.Add(pInfo); err != nil {
                klog.Errorf(&quot;Error adding pod %v to the scheduling queue: %v&quot;, pod.Name, err)
            } else {
                metrics.SchedulerQueueIncomingPods.WithLabelValues(&quot;active&quot;, event).Inc()
                p.unschedulableQ.delete(pod)
            }
        }
    }
    p.moveRequestCycle = p.schedulingCycle
    p.cond.Broadcast()
}

资源更新后,不可调度的pod会被重新放进activeQ或者backoffQ。同时,moveRequestCycle会设置为当前的schedulingCycle。
最后看看AddUnschedulableIfNotPresent方法,以及这个方法被使用的场景:

// 文件路径: pkg/scheduler/internal/queue/scheduling_queue.go
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pod := pInfo.Pod
    if p.unschedulableQ.get(pod) != nil {
        return fmt.Errorf(&quot;pod: %v is already present in unschedulable queue&quot;, nsNameForPod(pod))
    }

    // Refresh the timestamp since the pod is re-added.
    pInfo.Timestamp = p.clock.Now()
    if _, exists, _ := p.activeQ.Get(pInfo); exists {
        return fmt.Errorf(&quot;pod: %v is already present in the active queue&quot;, nsNameForPod(pod))
    }
    if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
        return fmt.Errorf(&quot;pod %v is already present in the backoff queue&quot;, nsNameForPod(pod))
    }

    // If a move request has been received, move it to the BackoffQ, otherwise move
    // it to unschedulableQ.
    if p.moveRequestCycle &gt;= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf(&quot;error adding pod %v to the backoff queue: %v&quot;, pod.Name, err)
        }
        metrics.SchedulerQueueIncomingPods.WithLabelValues(&quot;backoff&quot;, ScheduleAttemptFailure).Inc()
    } else {
        p.unschedulableQ.addOrUpdate(pInfo)
        metrics.SchedulerQueueIncomingPods.WithLabelValues(&quot;unschedulable&quot;, ScheduleAttemptFailure).Inc()
    }

    p.PodNominator.AddNominatedPod(pod, &quot;&quot;)
    return nil
}

// 文件路径: pkg/scheduler/factory.go
func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
    return func(podInfo *framework.QueuedPodInfo, err error) {
        pod := podInfo.Pod
        if err == core.ErrNoNodesAvailable {
            klog.V(2).InfoS(&quot;Unable to schedule pod; no nodes are registered to the cluster; waiting&quot;, &quot;pod&quot;, klog.KObj(pod))
        } else if _, ok := err.(*core.FitError); ok {
            klog.V(2).InfoS(&quot;Unable to schedule pod; no fit; waiting&quot;, &quot;pod&quot;, klog.KObj(pod), &quot;err&quot;, err)
        } else if apierrors.IsNotFound(err) {
            klog.V(2).Infof(&quot;Unable to schedule %v/%v: possibly due to node not found: %v; waiting&quot;, pod.Namespace, pod.Name, err)
            if errStatus, ok := err.(apierrors.APIStatus); ok &amp;&amp; errStatus.Status().Details.Kind == &quot;node&quot; {
                nodeName := errStatus.Status().Details.Name
                // when node is not found, We do not remove the node right away. Trying again to get
                // the node and if the node is still not found, then remove it from the scheduler cache.
                _, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
                if err != nil &amp;&amp; apierrors.IsNotFound(err) {
                    node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
                    if err := schedulerCache.RemoveNode(&amp;node); err != nil {
                        klog.V(4).Infof(&quot;Node %q is not found; failed to remove it from the cache.&quot;, node.Name)
                    }
                }
            }
        } else {
            klog.ErrorS(err, &quot;Error scheduling pod; retrying&quot;, &quot;pod&quot;, klog.KObj(pod))
        }

        // Check if the Pod exists in informer cache.
        cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
        if err != nil {
            klog.Warningf(&quot;Pod %v/%v doesn&#039;t exist in informer cache: %v&quot;, pod.Namespace, pod.Name, err)
            return
        }
        // As &lt;cachedPod&gt; is from SharedInformer, we need to do a DeepCopy() here.
        podInfo.Pod = cachedPod.DeepCopy()
        if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
            klog.Error(err)
        }
    }
}

MakeDefaultErrorFunc方法会在构建调度器实例时被传入,作为默认的错误处理方法,在pod调度失败时,默认执行此方法,在此方法里,会调用AddUnschedulableIfNotPresent,会有一个判断:

  1. 如果moveRequestCycle大于等于当前的podSchedulingCycle,则当前应该对之前已经失败的pod进行重试,也就是放进backoffQ里
  2. 如果不满足,则放进unscheduleableQ里

结合moveRequestCycle的变更机制,可以知道,只有集群资源发生过变更,moveRequestCycle才会等于podSchedulingCycle。结合起来,可以理解一下这里错误处理的细节。在pod调度失败时,正常情况下,会被放进unscheduleableQ队列,但是在某些情况下,pod刚刚调度失败,在错误处理之前,忽然发生了资源变更,紧接着再调用错误处理回调,这个时候,由于在这个错误处理的间隙,集群的状态已经发生了变化,所以可以认为这个pod应该有了被调度成功的可能性,所以就被放进了backoffQ重试队列种,等待快速重试。
这样的一种资源更新回调、错误处理机制,本质上就是为了尽可能快的把调度失败的pod重新调取起来,缩短等待调度的时间。
最终队列的工作方式如下:
K8S调度源码 - 图1

3. 调度器的运行

3.1 kube-scheduler 的设计

kube-scheduler 的目的就是为每一个 pod 选择一个合适的 node,整体流程可以概括为三步,获取未调度的 podList,通过执行一系列调度算法为 pod 选择一个合适的 node,提交数据到 apiserver,其核心则是一系列调度算法的设计与执行。
官方对 kube-scheduler 的调度流程描述 The Kubernetes Scheduler:

For given pod:

+---------------------------------------------+
|               Schedulable nodes:            |
|                                             |
| +--------+    +--------+      +--------+    |
| | node 1 |    | node 2 |      | node 3 |    |
| +--------+    +--------+      +--------+    |
|                                             |
+-------------------+-------------------------+
                    |
                    |
                    v
+-------------------+-------------------------+

Pred. filters: node 3 doesn't have enough resource

+-------------------+-------------------------+
                    |
                    |
                    v
+-------------------+-------------------------+
|             remaining nodes:                |
|   +--------+                 +--------+     |
|   | node 1 |                 | node 2 |     |
|   +--------+                 +--------+     |
|                                             |
+-------------------+-------------------------+
                    |
                    |
                    v
+-------------------+-------------------------+

Priority function:    node 1: p=2
                      node 2: p=5

+-------------------+-------------------------+
                    |
                    |
                    v
        select max{node priority} = node 2

kube-scheduler 目前包含两部分调度算法 predicates 和 priorities,首先执行 predicates 算法过滤部分 node 然后执行 priorities 算法为所有 node 打分,最后从所有 node 中选出分数最高的最为最佳的 node。

3.2 kube-scheduler 源码分析

kubernetes 中所有组件的启动流程都是类似的,首先会解析命令行参数、添加默认值,kube-scheduler 的默认参数在 k8s.io/kubernetes/pkg/scheduler/apis/config/v1alpha1/defaults.go 中定义的。然后会执行 run 方法启动主逻辑,下面直接看 kube-scheduler 的主逻辑 run 方法执行过程。
Run() 方法主要做了以下工作:

  • 初始化 scheduler 对象
  • 启动 kube-scheduler server,kube-scheduler 监听 10251 和 10259 端口,10251 端口不需要认证,可以获取 healthz metrics 等信息,10259 为安全端口,需要认证
  • 启动所有的 informer
  • 执行 sched.Run() 方法,执行主调度逻辑

k8s.io/kubernetes/cmd/kube-scheduler/app/server.go:160

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, registryOptions ...Option) error {
    ......
    // 1、初始化 scheduler 对象
    sched, err := scheduler.New(......)
    if err != nil {
        return err
    }

    // 2、启动事件广播
    if cc.Broadcaster != nil && cc.EventClient != nil {
        cc.Broadcaster.StartRecordingToSink(stopCh)
    }
    if cc.LeaderElectionBroadcaster != nil && cc.CoreEventClient != nil {
        cc.LeaderElectionBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
    }

    ......
    // 3、启动 http server
    if cc.InsecureServing != nil {
        separateMetrics := cc.InsecureMetricsServing != nil
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
        if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil {
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }
    ......
    // 4、启动所有 informer
    go cc.PodInformer.Informer().Run(stopCh)
    cc.InformerFactory.Start(stopCh)

    cc.InformerFactory.WaitForCacheSync(stopCh)

    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    defer cancel()
    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // 5、选举 leader
    if cc.LeaderElection != nil {
        ......
    }
    // 6、执行 sched.Run() 方法
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

下面是 pod informer 的启动逻辑,只监听 status.phase 不为 succeeded 以及 failed 状态的 pod,即非 terminating 的 pod。
k8s.io/kubernetes/pkg/scheduler/factory/factory.go:527

func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
    selector := fields.ParseSelectorOrDie(
        "status.phase!=" + string(v1.PodSucceeded) +
            ",status.phase!=" + string(v1.PodFailed))
    lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
    return &podInformer{
        informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
    }
}

然后继续看 Run() 方法中最后执行的 sched.Run() 调度循环逻辑,若 informer 中的 cache 同步完成后会启动一个循环逻辑执行 sched.scheduleOne 方法。
k8s.io/kubernetes/pkg/scheduler/scheduler.go:313

func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

scheduleOne() 每次对一个 pod 进行调度,主要有以下步骤:

  • 从 scheduler 调度队列中取出一个 pod,如果该 pod 处于删除状态则跳过
  • 执行调度逻辑 sched.schedule() 返回通过预算及优选算法过滤后选出的最佳 node
  • 如果过滤算法没有选出合适的 node,则返回 core.FitError
  • 若没有合适的 node 会判断是否启用了抢占策略,若启用了则执行抢占机制
  • 判断是否需要 VolumeScheduling 特性
  • 执行 reserve plugin
  • pod 对应的 spec.NodeName 写上 scheduler 最终选择的 node,更新 scheduler cache
  • 请求 apiserver 异步处理最终的绑定操作,写入到 etcd
  • 执行 permit plugin
  • 执行 prebind plugin
  • 执行 postbind plugin

k8s.io/kubernetes/pkg/scheduler/scheduler.go:515

func (sched *Scheduler) scheduleOne() {
    fwk := sched.Framework

    pod := sched.NextPod()
    if pod == nil {
        return
    }
    // 1.判断 pod 是否处于删除状态
    if pod.DeletionTimestamp != nil {
        ......
    }

    // 2.执行调度策略选择 node
    start := time.Now()
    pluginContext := framework.NewPluginContext()
    scheduleResult, err := sched.schedule(pod, pluginContext)
    if err != nil {
        if fitError, ok := err.(*core.FitError); ok {
            // 3.若启用抢占机制则执行
            if sched.DisablePreemption {
                ......
            } else {
                preemptionStartTime := time.Now()
                sched.preempt(pluginContext, fwk, pod, fitError)
                ......
            }
            ......
            metrics.PodScheduleFailures.Inc()
        } else {
            klog.Errorf("error selecting node for pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        }
        return
    }
    ......
    assumedPod := pod.DeepCopy()

    // 4.判断是否需要 VolumeScheduling 特性
    allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming volumes: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }

    // 5.执行 "reserve" plugins
    if sts := fwk.RunReservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        .....
    }

    // 6.为 pod 设置 NodeName 字段,更新 scheduler 缓存
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        ......
    }

    // 7.异步请求 apiserver
    go func() {
        // Bind volumes first before Pod
        if !allBound {
            err := sched.bindVolumes(assumedPod)
            if err != nil {
                ......
                return
            }
        }

        // 8.执行 "permit" plugins
        permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
        if !permitStatus.IsSuccess() {
            ......
        }
        // 9.执行 "prebind" plugins
        preBindStatus := fwk.RunPreBindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
        if !preBindStatus.IsSuccess() {
            ......
        }
        err := sched.bind(assumedPod, scheduleResult.SuggestedHost, pluginContext)
        ......
        if err != nil {
            ......
        } else {
            ......
            // 10.执行 "postbind" plugins
            fwk.RunPostBindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
        }
    }()
}

scheduleOne() 中通过调用 sched.schedule() 来执行预选与优选算法处理:
k8s.io/kubernetes/pkg/scheduler/scheduler.go:337

func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
    result, err := sched.Algorithm.Schedule(pod, pluginContext)
    if err != nil {
    ......
    }
    return result, err
}

sched.Algorithm 是一个 interface,主要包含四个方法,GenericScheduler 是其具体的实现:
k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:131

type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
    Preempt(*framework.PluginContext, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
    Predicates() map[string]predicates.FitPredicate
    Prioritizers() []priorities.PriorityConfig
}
  • Schedule():正常调度逻辑,包含预算与优选算法的执行
  • Preempt():抢占策略,在 pod 调度发生失败的时候尝试抢占低优先级的 pod,函数返回发生抢占的 node,被 抢占的 pods 列表,nominated node name 需要被移除的 pods 列表以及 error
  • Predicates():predicates 算法列表
  • Prioritizers():prioritizers 算法列表

kube-scheduler 提供的默认调度为 DefaultProvider,DefaultProvider 配置的 predicates 和 priorities policies 在 k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go 中定义,算法具体实现是在 k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/ 和k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/ 中,默认的算法如下所示:
pkg/scheduler/algorithmprovider/defaults/defaults.go

func defaultPredicates() sets.String {
    return sets.NewString(
        predicates.NoVolumeZoneConflictPred,
        predicates.MaxEBSVolumeCountPred,
        predicates.MaxGCEPDVolumeCountPred,
        predicates.MaxAzureDiskVolumeCountPred,
        predicates.MaxCSIVolumeCountPred,
        predicates.MatchInterPodAffinityPred,
        predicates.NoDiskConflictPred,
        predicates.GeneralPred,
        predicates.CheckNodeMemoryPressurePred,
        predicates.CheckNodeDiskPressurePred,
        predicates.CheckNodePIDPressurePred,
        predicates.CheckNodeConditionPred,
        predicates.PodToleratesNodeTaintsPred,
        predicates.CheckVolumeBindingPred,
    )
}

func defaultPriorities() sets.String {
    return sets.NewString(
        priorities.SelectorSpreadPriority,
        priorities.InterPodAffinityPriority,
        priorities.LeastRequestedPriority,
        priorities.BalancedResourceAllocation,
        priorities.NodePreferAvoidPodsPriority,
        priorities.NodeAffinityPriority,
        priorities.TaintTolerationPriority,
        priorities.ImageLocalityPriority,
    )
}

下面继续看 sched.Algorithm.Schedule() 调用具体调度算法的过程:

  • 检查 pod pvc 信息
  • 执行 prefilter plugins
  • 获取 scheduler cache 的快照,每次调度 pod 时都会获取一次快照
  • 执行 g.findNodesThatFit() 预选算法
  • 执行 postfilter plugin
  • 若 node 为 0 直接返回失败的 error,若 node 数为1 直接返回该 node
  • 执行 g.priorityMetaProducer() 获取 metaPrioritiesInterface,计算 pod 的metadata,检查该 node 上是否有相同 meta 的 pod
  • 执行 PrioritizeNodes() 算法
  • 执行 g.selectHost() 通过得分选择一个最佳的 node

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:186

func (g *genericScheduler) Schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
    ......
    // 1.检查 pod pvc 
    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return result, err
    }

    // 2.执行 "prefilter" plugins
    preFilterStatus := g.framework.RunPreFilterPlugins(pluginContext, pod)
    if !preFilterStatus.IsSuccess() {
        return result, preFilterStatus.AsError()
    }

    // 3.获取 node 数量
    numNodes := g.cache.NodeTree().NumNodes()
    if numNodes == 0 {
        return result, ErrNoNodesAvailable
    }

    // 4.快照 node 信息
    if err := g.snapshot(); err != nil {
        return result, err
    }

    // 5.执行预选算法
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod)
    if err != nil {
        return result, err
    }
    // 6.执行 "postfilter" plugins
    postfilterStatus := g.framework.RunPostFilterPlugins(pluginContext, pod, filteredNodes, filteredNodesStatuses)
    if !postfilterStatus.IsSuccess() {
        return result, postfilterStatus.AsError()
    }

    // 7.预选后没有合适的 node 直接返回
    if len(filteredNodes) == 0 {
        ......
    }

    startPriorityEvalTime := time.Now()
    // 8.若只有一个 node 则直接返回该 node
    if len(filteredNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  filteredNodes[0].Name,
            EvaluatedNodes: 1 + len(failedPredicateMap),
            FeasibleNodes:  1,
        }, nil
    }

    // 9.获取 pod meta 信息,执行优选算法
    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework,      pluginContext)
    if err != nil {
        return result, err
    }

    // 10.根据打分选择最佳的 node
    host, err := g.selectHost(priorityList)
    trace.Step("Selecting host done")
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
        FeasibleNodes:  len(filteredNodes),
    }, err
}

4 调度器构造

Scheduler最外层调度器的类型定义位于文件pkg/scheduler/scheduler.go。
Scheduler struct源码分析:

// 文件路径 pkg/scheduler/scheduler.go
// Scheduler
type Scheduler struct{
  SchedulerCache internalcache.Cache
  Algorithm core.ScheduleAlgorithm
  // NextPod获取下一个待调度的pod
  NextPod func()*framework.QueuedPodInfo
  // Error是默认的调度失败处理方法
  Error func(*framework.QueuedPodInfo,error)
  // Close this to shut down the scheduler.
  StopEverything &lt;-chanstruct{}
  // 上面提到的三级队列
  SchedulingQueue internalqueue.SchedulingQueue
  // 调度配置很重要,控制整个调度过程的核心
  Profiles profile.Map
  client clientset.Interface
}

scheduler.New() 方法是如何初始化 scheduler 结构体的,该方法主要的功能是初始化默认的调度算法以及默认的调度器 GenericScheduler。

  • 创建 scheduler 配置文件
  • 根据默认的 DefaultProvider 初始化schedulerAlgorithmSource 然后加载默认的预选及优选算法,然后初始化 GenericScheduler
  • 若启动参数提供了 policy config 则使用其覆盖默认的预选及优选算法并初始化 GenericScheduler,不过该参数现已被弃用

scheduler.New() 源码分析:
k8s.io/kubernetes/pkg/scheduler/scheduler.go:166

// 文件路径 pkg/scheduler/scheduler.go
// New returns a Scheduler
func New(client clientset.Interface,
    informerFactory informers.SharedInformerFactory,
    recorderFactory profile.RecorderFactory,
    stopCh &lt;-chan struct{},
    opts ...Option) (*Scheduler, error) {
    stopEverything := stopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }
    // 获取默认的调度器选项
    // 1. 里面会设定一些默认的组件参数
    // 2. 里面会给定默认的algorithmSourceProvider,这个很关键,后续的调度全部依赖这里提供的算法
    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&amp;options)
    }
    // 初始化调度缓存
    schedulerCache := internalcache.New(30*time.Second, stopEverything)
    // registry是一个字典,里面存放了插件名与插件的工厂方法
    // 默认有接近30个插件
    registry := frameworkplugins.NewInTreeRegistry()
    if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
        return nil, err
    }
    snapshot := internalcache.NewEmptySnapshot()
    // 基于配置 创建configurator实例
    configurator := &amp;Configurator{
        client:                   client,
        recorderFactory:          recorderFactory,
        informerFactory:          informerFactory,
        schedulerCache:           schedulerCache,
        StopEverything:           stopEverything,
        percentageOfNodesToScore: options.percentageOfNodesToScore,
        podInitialBackoffSeconds: options.podInitialBackoffSeconds,
        podMaxBackoffSeconds:     options.podMaxBackoffSeconds,
        profiles:                 append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
        registry:                 registry,
        nodeInfoSnapshot:         snapshot,
        extenders:                options.extenders,
        frameworkCapturer:        options.frameworkCapturer,
    }
    metrics.Register()
    var sched *Scheduler
    source := options.schedulerAlgorithmSource
    switch {
        // 这里Provider默认不为空,会走这个分支
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.createFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf(&quot;couldn&#039;t create scheduler using provider %q: %v&quot;, *source.Provider, err)
        }
        sched = sc
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &amp;schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        configurator.extenders = policy.Extenders
        sc, err := configurator.createFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf(&quot;couldn&#039;t create scheduler from policy: %v&quot;, err)
        }
        sched = sc
    default:
        return nil, fmt.Errorf(&quot;unsupported algorithm source: %v&quot;, source)
    }
    // Additional tweaks to the config produced by the configurator.
    sched.StopEverything = stopEverything
    sched.client = client
    // 这一步启动所有的事件监听
    addAllEventHandlers(sched, informerFactory)
    return sched, nil
}

上述的初始化过程中,有两个关键步骤,一个是默认的algorithmSourceProvider,一个是addAllEventHandlers:

// 文件路径 pkg/scheduler/algorithmprovider/registry.go
// 默认的调度算法配置就是由该方法提供的
// 可以看到,各个阶段的名字以及对应的算法名;
// 部分插件还设置了权重,以供计算后进一步的筛选使用
func getDefaultConfig() *schedulerapi.Plugins {
    return &amp;schedulerapi.Plugins{
        QueueSort: &amp;schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: queuesort.Name},
            },
        },
        PreFilter: &amp;schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.FitName},
                {Name: nodeports.Name},
                {Name: podtopologyspread.Name},
                {Name: interpodaffinity.Name},
                {Name: volumebinding.Name},
            },
        },
        Filter: &amp;schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: nodeunschedulable.Name},
                {Name: noderesources.FitName},
                {Name: nodename.Name},
                {Name: nodeports.Name},
                {Name: nodeaffinity.Name},
                {Name: volumerestrictions.Name},
                {Name: tainttoleration.Name},
                {Name: nodevolumelimits.EBSName},
                {Name: nodevolumelimits.GCEPDName},
                {Name: nodevolumelimits.CSIName},
                {Name: nodevolumelimits.AzureDiskName},
                {Name: volumebinding.Name},
                {Name: volumezone.Name},
                {Name: podtopologyspread.Name},
                {Name: interpodaffinity.Name},
            },
        },
        PostFilter: &amp;schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: defaultpreemption.Name},
            },
        },
        PreScore: &amp;schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: interpodaffinity.Name},
                {Name: podtopologyspread.Name},
                {Name: tainttoleration.Name},
            },
        },
        Score: &amp;schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.BalancedAllocationName, Weight: 1},
                {Name: imagelocality.Name, Weight: 1},
                {Name: interpodaffinity.Name, Weight: 1},
                {Name: noderesources.LeastAllocatedName, Weight: 1},
                {Name: nodeaffinity.Name, Weight: 1},
                {Name: nodepreferavoidpods.Name, Weight: 10000},
                // Weight is doubled because:
                // - This is a score coming from user preference.
                // - It makes its signal comparable to NodeResourcesLeastAllocated.
                {Name: podtopologyspread.Name, Weight: 2},
                {Name: tainttoleration.Name, Weight: 1},
            },
        },
        Reserve: &amp;schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: volumebinding.Name},
            },
        },
        PreBind: &amp;schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: volumebinding.Name},
            },
        },
        Bind: &amp;schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: defaultbinder.Name},
            },
        },
    }
}
// 文件路径 pkg/scheduler/eventhandlers.go
// addAllEventHandlers 
// 其实就是加回调,省略了部分代码
func addAllEventHandlers(
    sched *Scheduler,
    informerFactory informers.SharedInformerFactory,
) {
    informerFactory.Core().V1().Pods().Informer().AddEventHandler(
        ...
    )
    informerFactory.Core().V1().Pods().Informer().AddEventHandler(
        ...
    )
    informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
        ...
    )
    if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
        informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
            ...
        )
    }
    informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
        ...
    )
    informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
        ...
    )

    informerFactory.Core().V1().Services().Informer().AddEventHandler(
        ...
    )

    informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
        ...
    )
}

addAllEventHandlers方法里把client-go的内容和调度队列的内容衔接起来了,调度器通过这些给这些资源加回调方法更新本地状态同时更新调度队列里的unscheduleableQ。

5. 调度器的predicates 和 priorities调度算法源码分析

5.1 predicates 调度算法源码分析

predicates 算法主要是对集群中的 node 进行过滤,选出符合当前 pod 运行的 nodes。

5.1.1 调度算法说明

上节已经提到默认的调度算法在pkg/scheduler/algorithmprovider/defaults/defaults.go中定义了:

func defaultPredicates() sets.String {
    return sets.NewString(
        predicates.NoVolumeZoneConflictPred,
        predicates.MaxEBSVolumeCountPred,
        predicates.MaxGCEPDVolumeCountPred,
        predicates.MaxAzureDiskVolumeCountPred,
        predicates.MaxCSIVolumeCountPred,
        predicates.MatchInterPodAffinityPred,
        predicates.NoDiskConflictPred,
        predicates.GeneralPred,
        predicates.CheckNodeMemoryPressurePred,
        predicates.CheckNodeDiskPressurePred,
        predicates.CheckNodePIDPressurePred,
        predicates.CheckNodeConditionPred,
        predicates.PodToleratesNodeTaintsPred,
        predicates.CheckVolumeBindingPred,
    )
}

下面是对默认调度算法的一些说明:

predicates 算法 说明
GeneralPred GeneralPred 包含 PodFitsResources、PodFitsHost,、PodFitsHostPorts、PodMatchNodeSelector 四种算法
NoDiskConflictPred 检查多个 Pod 声明挂载的持久化 Volume 是否有冲突
MaxGCEPDVolumeCountPred 检查 GCE 持久化 Volume 是否超过了一定数目
MaxAzureDiskVolumeCountPred 检查 Azure 持久化 Volume 是否超过了一定数目
MaxCSIVolumeCountPred 检查 CSI 持久化 Volume 是否超过了一定数目(已废弃)
MaxEBSVolumeCountPred 检查 EBS 持久化 Volume 是否超过了一定数目
NoVolumeZoneConflictPred 检查持久化 Volume 的 Zone(高可用域)标签是否与节点的 Zone 标签相匹配
CheckVolumeBindingPred 检查该 Pod 对应 PV 的 nodeAffinity 字段是否跟某个节点的标签相匹配,Local Persistent Volume(本地持久化卷)必须使用 nodeAffinity 来跟某个具体的节点绑定
PodToleratesNodeTaintsPred 检查 Node 的 Taint 机制,只有当 Pod 的 Toleration 字段与 Node 的 Taint 字段能够匹配时,这个 Pod 才能被调度到该节点上
MatchInterPodAffinityPred 检查待调度 Pod 与 Node 上的已有 Pod 之间的亲密(affinity)和反亲密(anti-affinity)关系
CheckNodeConditionPred 检查 NodeCondition
CheckNodePIDPressurePred 检查 NodePIDPressure
CheckNodeDiskPressurePred 检查 NodeDiskPressure
CheckNodeMemoryPressurePred 检查 NodeMemoryPressure

默认的 predicates 调度算法主要分为五种类型:
1、第一种类型叫作 GeneralPredicates,包含 PodFitsResources、PodFitsHost、PodFitsHostPorts、PodMatchNodeSelector 四种策略,其具体含义如下所示:

  • PodFitsHost:检查宿主机的名字是否跟 Pod 的 spec.nodeName 一致
  • PodFitsHostPorts:检查 Pod 申请的宿主机端口(spec.nodePort)是不是跟已经被使用的端口有冲突
  • PodMatchNodeSelector:检查 Pod 的 nodeSelector 或者 nodeAffinity 指定的节点是否与节点匹配等
  • PodFitsResources:检查主机的资源是否满足 Pod 的需求,根据实际已经分配(Request)的资源量做调度

kubelet 在启动 Pod 前,会执行一个 Admit 操作来进行二次确认,这里二次确认的规则就是执行一遍 GeneralPredicates。
2、第二种类型是与 Volume 相关的过滤规则,主要有NoDiskConflictPred、MaxGCEPDVolumeCountPred、MaxAzureDiskVolumeCountPred、MaxCSIVolumeCountPred、MaxEBSVolumeCountPred、NoVolumeZoneConflictPred、CheckVolumeBindingPred。
3、第三种类型是宿主机相关的过滤规则,主要是 PodToleratesNodeTaintsPred。
4、第四种类型是 Pod 相关的过滤规则,主要是 MatchInterPodAffinityPred。
5、第五种类型是新增的过滤规则,与宿主机的运行状况有关,主要有 CheckNodeCondition、 CheckNodeMemoryPressure、CheckNodePIDPressure、CheckNodeDiskPressure 四种。若启用了 TaintNodesByCondition FeatureGates 则在 predicates 算法中会将该四种算法移除,TaintNodesByCondition 基于 node conditions 当 node 出现 pressure 时自动为 node 打上 taints 标签,该功能在 v1.8 引入,v1.12 成为 beta 版本,目前 v1.16 中也是 beta 版本,但在 v1.13 中该功能已默认启用。
predicates 调度算法也有一个顺序,要不然在一台资源已经严重不足的宿主机上,上来就开始计算 PodAffinityPredicate 是没有实际意义的,其默认顺序如下所示:
k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go:146

var (
    predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
        GeneralPred, HostNamePred, PodFitsHostPortsPred,
        MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
        PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
        CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
        MaxAzureDiskVolumeCountPred, MaxCinderVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
        CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, EvenPodsSpreadPred, MatchInterPodAffinityPred}
)

5.1.2 源码分析

上节中已经说到调用预选以及优选算法的逻辑在 k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:189中,

func (g *genericScheduler) Schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
    ......
    // 执行 predicates 策略
    filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod)
    ......
    // 执行 priorities 策略
    priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework,        pluginContext)
    ......
    return
}

findNodesThatFit() 是 predicates 策略的实际调用方法,其基本流程如下:

  • 设定最多需要检查的节点数,作为预选节点数组的容量,避免总节点过多影响调度效率
  • 通过NodeTree()不断获取下一个节点来判断该节点是否满足 pod 的调度条件
  • 通过之前注册的各种 predicates 函数来判断当前节点是否符合 pod 的调度条件
  • 最后返回满足调度条件的 node 列表,供下一步的优选操作

checkNode()是一个校验 node 是否符合要求的函数,其实际调用到的核心函数是podFitsOnNode(),再通过workqueue() 并发执行checkNode() 函数,workqueue() 会启动 16 个 goroutine 来并行计算需要筛选的 node 列表,其主要流程如下:

  • 通过 cache 中的 NodeTree() 不断获取下一个 node
  • 将当前 node 和 pod 传入podFitsOnNode() 方法中来判断当前 node 是否符合要求
  • 如果当前 node 符合要求就将当前 node 加入预选节点的数组中filtered
  • 如果当前 node 不满足要求,则加入到失败的数组中,并记录原因
  • 通过workqueue.ParallelizeUntil()并发执行checkNode()函数,一旦找到足够的可行节点数后就停止筛选更多节点
  • 若配置了 extender 则再次进行过滤已筛选出的 node

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:464

func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}
    filteredNodesStatuses := framework.NodeToStatusMap{}

    if len(g.predicates) == 0 {
        filtered = g.cache.ListNodes()
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes())
        // 1.设定最多需要检查的节点数
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

        filtered = make([]*v1.Node, numNodesToFind)
        ......

        // 2.获取该 pod 的 meta 值 
        meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)

        // 3.checkNode 为执行预选算法的函数
        checkNode := func(i int) {
            nodeName := g.cache.NodeTree().Next()

            // 4.podFitsOnNode 最终执行预选算法的函数 
            fits, failedPredicates, status, err := g.podFitsOnNode(
                ......
            )
            if err != nil {
                ......
            }
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
                }
            } else {
                ......
            }
        }

        // 5.启动 16 个 goroutine 并发执行 checkNode 函数
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            ......
        }
    }

    // 6.若配置了 extender 则再次进行过滤
    if len(filtered) > 0 && len(g.extenders) != 0 {
        ......
    }
    return filtered, failedPredicateMap, filteredNodesStatuses, nil
}

然后继续看如何设定最多需要检查的节点数,此过程由numFeasibleNodesToFind()进行处理,基本流程如下:

  • 如果总的 node 节点小于minFeasibleNodesToFind(默认为100)则直接返回总节点数
  • 如果节点数超过 100,则取指定百分比 percentageOfNodesToScore(默认值为 50)的节点数 ,当该百分比后的数目仍小于minFeasibleNodesToFind,则返回minFeasibleNodesToFind
  • 如果百分比后的数目大于minFeasibleNodesToFind,则返回该百分比的节点数

所以当节点数小于 100 时直接返回,大于 100 时只返回其总数的 50%。percentageOfNodesToScore 参数在 v1.12 引入,默认值为 50,kube-scheduler 在启动时可以设定该参数的值。
k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:441

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
    if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
        return numAllNodes
    }

    adaptivePercentage := g.percentageOfNodesToScore
    if adaptivePercentage <= 0 {
        adaptivePercentage = schedulerapi.DefaultPercentageOfNodesToScore - numAllNodes/125
        if adaptivePercentage < minFeasibleNodesPercentageToFind {
            adaptivePercentage = minFeasibleNodesPercentageToFind
        }
    }

    numNodes = numAllNodes * adaptivePercentage / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }

    return numNodes
}

pridicates 调度算法的核心是 podFitsOnNode() ,scheduler 的抢占机制也会执行该函数,podFitsOnNode()基本流程如下:

  • 遍历已经注册好的预选策略predicates.Ordering(),按顺序执行对应的策略函数
  • 遍历执行每个策略函数,并返回是否合适,预选失败的原因和错误
  • 如果预选函数执行失败,则加入预选失败的数组中,直接返回,后面的预选函数不会再执行
  • 如果该 node 上存在 nominated pod 则执行两次预选函数

因为引入了抢占机制,此处主要说明一下执行两次预选函数的原因:
第一次循环,若该 pod 为抢占者(nominatedPods),调度器会假设该 pod 已经运行在这个节点上,然后更新meta和nodeInfo,nominatedPods是指执行了抢占机制且已经分配到了 node(pod.Status.NominatedNodeName 已被设定) 但是还没有真正运行起来的 pod,然后再执行所有的预选函数。
第二次循环,不将nominatedPods加入到 node 内。
而只有这两遍 predicates 算法都能通过时,这个 pod 和 node 才会被认为是可以绑定(bind)的。这样做是因为考虑到 pod affinity 等策略的执行,如果当前的 pod 与nominatedPods有依赖关系就会有问题,因为nominatedPods不能保证一定可以调度且在已指定的 node 运行成功,也可能出现被其他高优先级的 pod 抢占等问题,关于抢占问题下篇会详细介绍。
k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:610

func (g *genericScheduler) podFitsOnNode(......) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
    var failedPredicates []predicates.PredicateFailureReason
    var status *framework.Status

    podsAdded := false

    for i := 0; i < 2; i++ {
        metaToUse := meta
        nodeInfoToUse := info
        if i == 0 {
            // 1.第一次循环加入 NominatedPods,计算 meta, nodeInfo
            podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
        } else if !podsAdded || len(failedPredicates) != 0 {
            break
        }
        // 2.按顺序执行所有预选函数
        for _, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []predicates.PredicateFailureReason
                err     error
            )
            if predicate, exist := predicateFuncs[predicateKey]; exist {
                fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                if err != nil {
                    return false, []predicates.PredicateFailureReason{}, nil, err
                }

                // 3.任何一个预选函数执行失败则直接返回
                if !fit {
                    failedPredicates = append(failedPredicates, reasons...)                   
                    if !alwaysCheckAllPredicates {
                        klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                            "evaluation is short circuited and there are chances " +
                            "of other predicates failing as well.")
                        break
                    }
                }
            }
        }
        // 4.执行 Filter Plugin
        status = g.framework.RunFilterPlugins(pluginContext, pod, info.Node().Name)
        if !status.IsSuccess() && !status.IsUnschedulable() {
            return false, failedPredicates, status, status.AsError()
        }
    }

    return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
}

至此,关于 predicates 调度算法的执行过程已经分析完。

5.2 priorities 调度算法源码分析

priorities 调度算法是在 pridicates 算法后执行的,主要功能是对已经过滤出的 nodes 进行打分并选出最佳的一个 node。

5.2.1 调度算法说明

默认的调度算法在pkg/scheduler/algorithmprovider/defaults/defaults.go中定义了:

func defaultPriorities() sets.String {
    return sets.NewString(
        priorities.SelectorSpreadPriority,
        priorities.InterPodAffinityPriority,
        priorities.LeastRequestedPriority,
        priorities.BalancedResourceAllocation,
        priorities.NodePreferAvoidPodsPriority,
        priorities.NodeAffinityPriority,
        priorities.TaintTolerationPriority,
        priorities.ImageLocalityPriority,
    )
}

默认调度算法的一些说明:

priorities 算法 说明
SelectorSpreadPriority 按 service,rs,statefulset 归属计算 Node 上分布最少的同类 Pod数量,数量越少得分越高,默认权重为1
InterPodAffinityPriority pod 亲和性选择策略,默认权重为1
LeastRequestedPriority 选择空闲资源(CPU 和 Memory)最多的节点,默认权重为1,其计算方式为:score = (cpu((capacity-sum(requested))10/capacity) + memory((capacity-sum(requested))10/capacity))/2
BalancedResourceAllocation CPU、Memory 以及 Volume 资源分配最均衡的节点,默认权重为1,其计算方式为:score = 10 - variance(cpuFraction,memoryFraction,volumeFraction)*10
NodePreferAvoidPodsPriority 判断 node annotation 是否有scheduler.alpha.kubernetes.io/preferAvoidPods 标签,类似于 taints 机制,过滤标签中定义类型的 pod,默认权重为10000
NodeAffinityPriority 节点亲和性选择策略,默认权重为1
TaintTolerationPriority Pod 是否容忍节点上的 Taint,优先调度到标记了 Taint 的节点,默认权重为1
ImageLocalityPriority 待调度 Pod 需要使用的镜像是否存在于该节点,默认权重为1

5.2.2 源码分析

执行 priorities 调度算法的逻辑是在 PrioritizeNodes()函数中,其目的是执行每个 priority 函数为 node 打分,分数为 0-10,其功能主要有:

  • PrioritizeNodes() 通过并行运行各个优先级函数来对节点进行打分
  • 每个优先级函数会给节点打分,打分范围为 0-10 分,0 表示优先级最低的节点,10表示优先级最高的节点
  • 每个优先级函数有各自的权重
  • 优先级函数返回的节点分数乘以权重以获得加权分数
  • 最后计算所有节点的总加权分数

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:691

func PrioritizeNodes(......) (schedulerapi.HostPriorityList, error) {
    // 1.检查是否有自定义配置
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }
    ......

    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

    ......
    // 2.使用 workqueue 启动 16 个 goroutine 并发为 node 打分
    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }

            var err error
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                results[i][index].Host = nodes[index].Name
            }
        }
    })

    // 3.执行自定义配置
    for i := range priorityConfigs {
        ......
    }

    wg.Wait()
    if len(errs) != 0 {
        return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }

    // 4.运行 Score plugins
    scoresMap, scoreStatus := framework.RunScorePlugins(pluginContext, pod, nodes)
    if !scoreStatus.IsSuccess() {
        return schedulerapi.HostPriorityList{}, scoreStatus.AsError()
    }

    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    // 5.为每个 node 汇总分数
    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }

        for j := range scoresMap {
            result[i].Score += scoresMap[j][i].Score
        }
    }

    // 6.执行 extender 
    if len(extenders) != 0 && nodes != nil {
        ......
    }
    ......
    return result, nil
}

5.3 总结

本节主要讲述了 kube-scheduler 中的 predicates 调度算法与 priorities 调度算法的执行流程,可以看到 kube-scheduler 中有许多的调度策略,但是想要添加自己的策略并不容易,scheduler 目前已经朝着提升性能与扩展性的方向演进了,其调度部分进行性能优化的一个最根本原则就是尽最大可能将集群信息 cache 化,以便从根本上提高 predicates 和 priorities 调度算法的执行效率。第二个就是在 bind 阶段进行异步处理,只会更新其 cache 里的 pod 和 node 的信息,这种基于“乐观”假设的 API 对象更新方式,在 kubernetes 里被称作 assume,如果这次异步的 bind 过程失败了,其实也没有太大关系,等 scheduler cache 同步之后一切又恢复正常了。除了上述的“cache 化”和“乐观绑定”,还有一个重要的设计,那就是“无锁化”,predicates 调度算法与 priorities 调度算法的执行都是并行的,只有在调度队列和 scheduler cache 进行操作时,才需要加锁,而对调度队列的操作并不影响主流程。

6 优先级与抢占机制源码分析

上面已经分析了 kube-scheduler 的代码逻辑以及 predicates 与 priorities 算法,本节会继续讲 scheduler 中的一个重要机制,pod 优先级与抢占机制(Pod Priority and Preemption),该功能是在 v1.8 中引入的,v1.11 中该功能为 beta 版本且默认启用了,v1.14 为 stable 版本。

6.1 为什么要有优先级与抢占机制

正常情况下,当一个 pod 调度失败后,就会被暂时 “搁置” 处于 pending 状态,直到 pod 被更新或者集群状态发生变化,调度器才会对这个 pod 进行重新调度。但在实际的业务场景中会存在在线与离线业务之分,若在线业务的 pod 因资源不足而调度失败时,此时就需要离线业务下掉一部分为在线业务提供资源,即在线业务要抢占离线业务的资源,此时就需要 scheduler 的优先级和抢占机制了,该机制解决的是 pod 调度失败时该怎么办的问题,若该 pod 的优先级比较高此时并不会被”搁置”,而是会”挤走”某个 node 上的一些低优先级的 pod,这样就可以保证高优先级的 pod 调度成功。

6.2 优先级与抢占机制源码分析

抢占发生的原因,一定是一个高优先级的 pod 调度失败,我们称这个 pod 为“抢占者”,称被抢占的 pod 为“牺牲者”(victims)。而 kubernetes 调度器实现抢占算法的一个最重要的设计,就是在调度队列的实现里,使用了两个不同的队列。
第一个队列叫作 activeQ,凡是在 activeQ 里的 pod,都是下一个调度周期需要调度的对象。所以,当你在 kubernetes 集群里新创建一个 pod 的时候,调度器会将这个 pod 入队到 activeQ 里面,调度器不断从队列里出队(pop)一个 pod 进行调度,实际上都是从 activeQ 里出队的。
第二个队列叫作 unschedulableQ,专门用来存放调度失败的 pod,当一个 unschedulableQ 里的 pod 被更新之后,调度器会自动把这个 pod 移动到 activeQ 里,从而给这些调度失败的 pod “重新做人”的机会。
当 pod 拥有了优先级之后,高优先级的 pod 就可能会比低优先级的 pod 提前出队,从而尽早完成调度过程。
k8s.io/kubernetes/pkg/scheduler/internal/queue/scheduling_queue.go

// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(stop <-chan struct{}, fwk framework.Framework) SchedulingQueue {
    return NewPriorityQueue(stop, fwk)
}
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(stop <-chan struct{}, fwk framework.Framework) *PriorityQueue {
    return NewPriorityQueueWithClock(stop, util.RealClock{}, fwk)
}

// NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time.
func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock, fwk framework.Framework) *PriorityQueue {
    comp := activeQComp
    if fwk != nil {
        if queueSortFunc := fwk.QueueSortFunc(); queueSortFunc != nil {
            comp = func(podInfo1, podInfo2 interface{}) bool {
                pInfo1 := podInfo1.(*framework.PodInfo)
                pInfo2 := podInfo2.(*framework.PodInfo)

                return queueSortFunc(pInfo1, pInfo2)
            }
        }
    }

    pq := &PriorityQueue{
        clock:            clock,
        stop:             stop,
        podBackoff:       NewPodBackoffMap(1*time.Second, 10*time.Second),
        activeQ:          util.NewHeapWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
        unschedulableQ:   newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
        nominatedPods:    newNominatedPodMap(),
        moveRequestCycle: -1,
    }
    pq.cond.L = &pq.lock
    pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())

    pq.run()

    return pq
}

前面的文章已经说了 scheduleOne() 是执行调度算法的主逻辑,其主要功能有:

  • 调用 sched.schedule(),即执行 predicates 算法和 priorities 算法
  • 若执行失败,会返回 core.FitError
  • 若开启了抢占机制,则执行抢占机制
    k8s.io/kubernetes/pkg/scheduler/scheduler.go:516

    func (sched *Scheduler) scheduleOne() {
      ......
      scheduleResult, err := sched.schedule(pod, pluginContext)
      // predicates 算法和 priorities 算法执行失败
      if err != nil {
          if fitError, ok := err.(*core.FitError); ok {
              // 是否开启抢占机制
              if sched.DisablePreemption {
                  .......
              } else {
                  // 执行抢占机制
                  preemptionStartTime := time.Now()
                  sched.preempt(pluginContext, fwk, pod, fitError)
                  ......
              }
              ......
          } else {
              ......
          }
          return
      }
      ......
    }
    


    我们主要来看其中的抢占机制,sched.preempt() 是执行抢占机制的主逻辑,主要功能有:

  • 从 apiserver 获取 pod info

  • 调用 sched.Algorithm.Preempt()执行抢占逻辑,该函数会返回抢占成功的 node、被抢占的 pods(victims) 以及需要被移除已提名的 pods
  • 更新 scheduler 缓存,为抢占者绑定 nodeName,即设定 pod.Status.NominatedNodeName
  • 将 pod info 提交到 apiserver
  • 删除被抢占的 pods
  • 删除被抢占 pods 的 NominatedNodeName 字段


可以看到当上述抢占过程发生时,抢占者并不会立刻被调度到被抢占的 node 上,调度器只会将抢占者的 status.nominatedNodeName 字段设置为被抢占的 node 的名字。然后,抢占者会重新进入下一个调度周期,在新的调度周期里来决定是不是要运行在被抢占的节点上,当然,即使在下一个调度周期,调度器也不会保证抢占者一定会运行在被抢占的节点上。

这样设计的一个重要原因是调度器只会通过标准的 DELETE API 来删除被抢占的 pod,所以,这些 pod 必然是有一定的“优雅退出”时间(默认是 30s)的。而在这段时间里,其他的节点也是有可能变成可调度的,或者直接有新的节点被添加到这个集群中来。所以,鉴于优雅退出期间集群的可调度性可能会发生的变化,把抢占者交给下一个调度周期再处理,是一个非常合理的选择。而在抢占者等待被调度的过程中,如果有其他更高优先级的 pod 也要抢占同一个节点,那么调度器就会清空原抢占者的 status.nominatedNodeName 字段,从而允许更高优先级的抢占者执行抢占,并且,这也使得原抢占者本身也有机会去重新抢占其他节点。以上这些都是设置 nominatedNodeName 字段的主要目的。
k8s.io/kubernetes/pkg/scheduler/scheduler.go:352

func (sched Scheduler) preempt(pluginContext framework.PluginContext, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
// 获取 pod info
preemptor, err := sched.PodPreemptor.GetUpdatedPod(preemptor)
if err != nil {
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}

// 执行抢占算法
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(pluginContext, preemptor, scheduleErr)
if err != nil {
    ......
}
var nodeName = ""
if node != nil {
    nodeName = node.Name
    // 更新 scheduler 缓存,为抢占者绑定 nodename,即设定 pod.Status.NominatedNodeName
    sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)

    // 将 pod info 提交到 apiserver
    err = sched.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
    if err != nil {
        sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
        return "", err
    }
    // 删除被抢占的 pods
    for _, victim := range victims {
        if err := sched.PodPreemptor.DeletePod(victim); err != nil {
            return "", err
        }
        ......
    }
}

// 删除被抢占 pods 的 NominatedNodeName 字段
for _, p := range nominatedPodsToClear {
    rErr := sched.PodPreemptor.RemoveNominatedNodeName(p)
    if rErr != nil {
        ......
    }
}
return nodeName, err
}


preempt()中会调用 sched.Algorithm.Preempt()来执行实际抢占的算法,其主要功能有:

  • 判断 err 是否为 FitError
  • 调用podEligibleToPreemptOthers()确认 pod 是否有抢占其他 pod 的资格,若 pod 已经抢占了低优先级的 pod,被抢占的 pod 处于 terminating 状态中,则不会继续进行抢占
  • 如果确定抢占可以发生,调度器会把自己缓存的所有节点信息复制一份,然后使用这个副本来模拟抢占过程
  • 过滤预选失败的 node 列表,此处会检查 predicates 失败的原因,若存在 NodeSelectorNotMatch、PodNotMatchHostName 这些 error 则不能成为抢占者,如果过滤出的候选 node 为空则返回抢占者作为 nominatedPodsToClear
  • 获取 PodDisruptionBudget 对象
  • 从预选失败的 node 列表中并发计算可以被抢占的 nodes,得到 nodeToVictims
  • 若声明了 extenders 则调用 extenders 再次过滤 nodeToVictims
  • 调用 pickOneNodeForPreemption() 从 nodeToVictims 中选出一个节点作为最佳候选人
  • 移除低优先级 pod 的 Nominated,更新这些 pod,移动到 activeQ 队列中,让调度器为这些 pod 重新 bind node


k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:320

func (g genericScheduler) Preempt(pluginContext framework.PluginContext, pod v1.Pod, scheduleErr error) (v1.Node, []v1.Pod, []v1.Pod, error) {
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil, nil
}
// 判断 pod 是否支持抢占,若 pod 已经抢占了低优先级的 pod,被抢占的 pod 处于 terminating 状态中,则不会继续进行抢占
if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
return nil, nil, nil, nil
}
// 从缓存中获取 node list
allNodes := g.cache.ListNodes()
if len(allNodes) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}
// 过滤 predicates 算法执行失败的 node 作为抢占的候选 node
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
// 如果过滤出的候选 node 为空则返回抢占者作为 nominatedPodsToClear
if len(potentialNodes) == 0 {
return nil, nil, []*v1.Pod{pod}, nil
}
// 获取 PodDisruptionBudget objects
pdbs, err := g.pdbLister.List(labels.Everything())
if err != nil {
return nil, nil, nil, err
}
// 过滤出可以抢占的 node 列表
nodeToVictims, err := g.selectNodesForPreemption(pluginContext, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
g.predicateMetaProducer, g.schedulingQueue, pdbs)
if err != nil {
return nil, nil, nil, err
}

// 若有 extender 则执行
nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
if err != nil {
    return nil, nil, nil, err
}

// 选出最佳的 node
candidateNode := pickOneNodeForPreemption(nodeToVictims)
if candidateNode == nil {
    return nil, nil, nil, nil
}

// 移除低优先级 pod 的 Nominated,更新这些 pod,移动到 activeQ 队列中,让调度器
// 为这些 pod 重新 bind node
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
    return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
}

return nil, nil, nil, fmt.Errorf(
    "preemption failed: the target node %s has been deleted from scheduler cache",
    candidateNode.Name)
 }


该函数中调用了多个函数:

  • nodesWherePreemptionMightHelp():过滤 predicates 算法执行失败的 node
  • selectNodesForPreemption():过滤出可以抢占的 node 列表
  • pickOneNodeForPreemption():选出最佳的 node
  • getLowerPriorityNominatedPods():移除低优先级 pod 的 Nominated


selectNodesForPreemption() 从 prediacates 算法执行失败的 node 列表中来寻找可以被抢占的 node,通过workqueue.ParallelizeUntil()并发执行checkNode()函数检查 node。

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:996

func (g *genericScheduler) selectNodesForPreemption(
......
) (map[v1.Node]schedulerapi.Victims, error) {
nodeToVictims := map[v1.Node]schedulerapi.Victims{}
var resultLock sync.Mutex
meta := metadataProducer(pod, nodeNameToInfo)
// checkNode 函数
checkNode := func(i int) {
    nodeName := potentialNodes[i].Name
    var metaCopy predicates.PredicateMetadata
    if meta != nil {
        metaCopy = meta.ShallowCopy()
    }
    // 调用 selectVictimsOnNode 函数进行检查
    pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
    if fits {
        resultLock.Lock()
        victims := schedulerapi.Victims{
            Pods:             pods,
            NumPDBViolations: numPDBViolations,
        }
        nodeToVictims[potentialNodes[i]] = &victims
        resultLock.Unlock()
    }
}
// 启动 16 个 goroutine 并发执行
workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
return nodeToVictims, nil
}


其中调用的selectVictimsOnNode()是来获取每个 node 上 victims pod 的,首先移除所有低优先级的 pod 尝试抢占者是否可以调度成功,如果能够调度成功,然后基于 pod 是否有 PDB 被分为两组 violatingVictims 和 nonViolatingVictims,再对每一组的 pod 按优先级进行排序。PDB(pod 中断预算)是 kubernetes 保证副本高可用的一个对象。
然后开始逐一”删除“ pod 即要删掉最少的 pod 数来完成这次抢占即可,先从 violatingVictims(有PDB)的一组中进行”删除“ pod,并且记录删除有 PDB pod 的数量,然后再“删除” nonViolatingVictims 组中的 pod,每次”删除“一个 pod 都要检查一下抢占者是否能够运行在该 node 上即执行一次预选策略,若执行预选策略失败则该 node 当前不满足抢占需要继续”删除“ pod 并将该 pod 加入到 victims 中,直到”删除“足够多的 pod 可以满足抢占,最后返回 victims 以及删除有 PDB pod 的数量。
k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:1086

func (g *genericScheduler) selectVictimsOnNode(
......
) ([]*v1.Pod, int, bool) {
if nodeInfo == nil {
return nil, 0, false
}
potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod}
nodeInfoCopy := nodeInfo.Clone()

removePod := func(rp *v1.Pod) {
    nodeInfoCopy.RemovePod(rp)
    if meta != nil {
        meta.RemovePod(rp, nodeInfoCopy.Node())
    }
}
addPod := func(ap *v1.Pod) {
    nodeInfoCopy.AddPod(ap)
    if meta != nil {
        meta.AddPod(ap, nodeInfoCopy)
    }
}
// 先删除所有的低优先级 pod 检查是否能满足抢占 pod 的调度需求
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
    if util.GetPodPriority(p) < podPriority {
        potentialVictims.Items = append(potentialVictims.Items, p)
        removePod(p)
    }
}
// 如果删除所有低优先级的 pod 不符合要求则直接过滤掉该 node
// podFitsOnNode 就是前文讲过用来执行预选函数的
if fits, _, _, err := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
    if err != nil {
                    ......
    }
    return nil, 0, false
}
var victims []*v1.Pod
numViolatingVictim := 0
potentialVictims.Sort()

// 尝试尽量多地“删除”这些 pods,先从 PDB violating victims 中“删除”,再从 PDB non-violating victims 中“删除”
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)

// reprievePod 是“删除” pods 的函数
reprievePod := func(p *v1.Pod) bool {
    addPod(p)
    // 同样也会调用 podFitsOnNode 再次执行 predicates 算法
    fits, _, _, _ := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false)
    if !fits {
        removePod(p)
        // 加入到 victims 中
        victims = append(victims, p)
    }
    return fits
}
 // 删除 violatingVictims 中的 pod,同时也记录删除了多少个
for _, p := range violatingVictims {
    if !reprievePod(p) {
        numViolatingVictim++
    }
}
// 删除 nonViolatingVictims 中的 pod
for _, p := range nonViolatingVictims {
    reprievePod(p)
}
return victims, numViolatingVictim, true
}


pickOneNodeForPreemption() 用来选出最佳的 node 作为抢占者的 node,该函数主要基于 6 个原则:

  • PDB violations 值最小的 node
  • 挑选具有高优先级较少的 node
  • 对每个 node 上所有 victims 的优先级进项累加,选取最小的
  • 如果多个 node 优先级总和相等,选择具有最小 victims 数量的 node
  • 如果多个 node 优先级总和相等,选择具有高优先级且 pod 运行时间最短的
  • 如果依据以上策略仍然选出了多个 node 则直接返回第一个 node

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:867

func pickOneNodeForPreemption(nodesToVictims map[v1.Node]schedulerapi.Victims) *v1.Node {
if len(nodesToVictims) == 0 {
return nil
}
minNumPDBViolatingPods := math.MaxInt32
var minNodes1 []*v1.Node
lenNodes1 := 0
for node, victims := range nodesToVictims {
if len(victims.Pods) == 0 {
// 若该 node 没有 victims 则返回
return node
}
numPDBViolatingPods := victims.NumPDBViolations
if numPDBViolatingPods < minNumPDBViolatingPods {
minNumPDBViolatingPods = numPDBViolatingPods
minNodes1 = nil
lenNodes1 = 0
}
if numPDBViolatingPods == minNumPDBViolatingPods {
minNodes1 = append(minNodes1, node)
lenNodes1++
}
}
if lenNodes1 == 1 {
return minNodes1[0]
}
// 选出 PDB violating pods 数量最少的或者高优先级 victim 数量少的
minHighestPriority := int32(math.MaxInt32)
var minNodes2 = make([]*v1.Node, lenNodes1)
lenNodes2 := 0
for i := 0; i < lenNodes1; i++ {
    node := minNodes1[i]
    victims := nodesToVictims[node]
    highestPodPriority := util.GetPodPriority(victims.Pods[0])
    if highestPodPriority < minHighestPriority {
        minHighestPriority = highestPodPriority
        lenNodes2 = 0
    }
    if highestPodPriority == minHighestPriority {
        minNodes2[lenNodes2] = node
        lenNodes2++
    }
}
if lenNodes2 == 1 {
    return minNodes2[0]
}
// 若多个 node 高优先级的 pod 同样少,则选出加权得分最小的
minSumPriorities := int64(math.MaxInt64)
lenNodes1 = 0
for i := 0; i < lenNodes2; i++ {
    var sumPriorities int64
    node := minNodes2[i]
    for _, pod := range nodesToVictims[node].Pods {
        sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
    }
    if sumPriorities < minSumPriorities {
        minSumPriorities = sumPriorities
        lenNodes1 = 0
    }
    if sumPriorities == minSumPriorities {
        minNodes1[lenNodes1] = node
        lenNodes1++
    }
}
if lenNodes1 == 1 {
    return minNodes1[0]
}
// 若多个 node 高优先级的 pod 数量同等且加权分数相等,则选出 pod 数量最少的
minNumPods := math.MaxInt32
lenNodes2 = 0
for i := 0; i < lenNodes1; i++ {
    node := minNodes1[i]
    numPods := len(nodesToVictims[node].Pods)
    if numPods < minNumPods {
        minNumPods = numPods
        lenNodes2 = 0
    }
    if numPods == minNumPods {
        minNodes2[lenNodes2] = node
        lenNodes2++
    }
}
if lenNodes2 == 1 {
    return minNodes2[0]
}
// 若多个 node 的 pod 数量相等,则选出高优先级 pod 启动时间最短的
latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
if latestStartTime == nil {
    return minNodes2[0]
}
nodeToReturn := minNodes2[0]
for i := 1; i < lenNodes2; i++ {
    node := minNodes2[i]
    earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
    if earliestStartTimeOnNode == nil {
        klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
        continue
    }
    if earliestStartTimeOnNode.After(latestStartTime.Time) {
        latestStartTime = earliestStartTimeOnNode
        nodeToReturn = node
    }
}

return nodeToReturn
}


以上就是对抢占机制代码的一个通读。

6.3 优先级与抢占机制的使用


1、创建 PriorityClass 对象:

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: high-priority
value: 1000000
globalDefault: false
description: "This priority class should be used for XYZ service pods only."

2、在 deployment、statefulset 或者 pod 中声明使用已有的 priorityClass 对象即可
在 pod 中使用:

apiVersion: v1
kind: Pod
metadata:
  labels:
    app: nginx-a
  name: nginx-a
spec:
  containers:
  - image: nginx:1.7.9
    imagePullPolicy: IfNotPresent
    name: nginx-a
    ports:
    - containerPort: 80
      protocol: TCP
    resources:
      requests:
        memory: "64Mi"
        cpu: 5
      limits:
        memory: "128Mi"
        cpu: 5
  priorityClassName: high-priority


在 deployment 中使用:

template:
spec:
containers:
- image: nginx
name: nginx-deployment
priorityClassName: high-priority


3、测试过程中可以看到高优先级的 nginx-a 会抢占 nginx-5754944d6c 的资源:

$ kubectl get pod -o  wide -w
NAME                     READY   STATUS    RESTARTS   AGE   IP           NODE          NOMINATED NODE   READINESS GATES
nginx-5754944d6c-9mnxa   1/1     Running   0          37s   10.244.1.4   test-worker   <none>           <none>
nginx-a                  0/1     Pending   0          0s    <none>       <none>        <none>           <none>
nginx-a                  0/1     Pending   0          0s    <none>       <none>        <none>           <none>
nginx-a                  0/1     Pending   0          0s    <none>       <none>        test-worker      <none>
nginx-5754944d6c-9mnxa   1/1     Terminating   0          45s   10.244.1.4   test-worker   <none>           <none>
nginx-5754944d6c-9mnxa   0/1     Terminating   0          46s   10.244.1.4   test-worker   <none>           <none>
nginx-5754944d6c-9mnxa   0/1     Terminating   0          47s   10.244.1.4   test-worker   <none>           <none>
nginx-5754944d6c-9mnxa   0/1     Terminating   0          47s   10.244.1.4   test-worker   <none>           <none>
nginx-a                  0/1     Pending       0          2s    <none>       test-worker   test-worker      <none>
nginx-a                  0/1     ContainerCreating   0          2s    <none>       test-worker   <none>           <none>
nginx-a                  1/1     Running             0          4s    10.244.1.5   test-worker   <none>           <none>

6.4 总结

这里主要讲述 kube-scheduler 中的优先级与抢占机制,可以看到抢占机制比 predicates 与 priorities 算法都要复杂。