kube-scheduler Design

kube-scheduler is one of the core components of Kubernetes. Its purpose is to pick a suitable node for every pod. The overall flow can be summarized in three steps: fetch the list of unscheduled pods, run a series of scheduling algorithms to select a suitable node for each pod, and submit the result to the apiserver. The heart of the component is the design and execution of those scheduling algorithms.
The official description of the kube-scheduler scheduling flow comes from the design document The Kubernetes Scheduler:

For given pod:

    +---------------------------------------------+
    |             Schedulable nodes:              |
    |                                             |
    |  +--------+    +--------+    +--------+     |
    |  | node 1 |    | node 2 |    | node 3 |     |
    |  +--------+    +--------+    +--------+     |
    |                                             |
    +-------------------+-------------------------+
                        |
                        |
                        v
    +-------------------+-------------------------+
    |                                             |
    |  Pred. filters: node 3 doesn't have enough  |
    |                 resource                    |
    |                                             |
    +-------------------+-------------------------+
                        |
                        |
                        v
    +-------------------+-------------------------+
    |             remaining nodes:                |
    |                                             |
    |  +--------+                  +--------+     |
    |  | node 1 |                  | node 2 |     |
    |  +--------+                  +--------+     |
    |                                             |
    +-------------------+-------------------------+
                        |
                        |
                        v
    +-------------------+-------------------------+
    |  Priority function:    node 1: p=2          |
    |                        node 2: p=5          |
    +-------------------+-------------------------+
                        |
                        |
                        v
           select max{node priority} = node 2

kube-scheduler currently includes two families of scheduling algorithms, predicates and priorities: it first runs the predicates to filter out part of the nodes, then runs the priorities to score all the remaining nodes, and finally selects the node with the highest score as the best one.
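The two-phase flow above can be sketched with a toy example; the node struct, the CPU predicate, and the free-CPU score below are hypothetical simplifications rather than the scheduler's real data structures:

```go
package main

import "fmt"

// node is a hypothetical, simplified stand-in for a cluster node.
type node struct {
	name    string
	freeCPU int // free CPU in millicores
}

// filter mimics the predicates phase: keep only nodes with enough free CPU.
func filter(nodes []node, requestCPU int) []node {
	var feasible []node
	for _, n := range nodes {
		if n.freeCPU >= requestCPU {
			feasible = append(feasible, n)
		}
	}
	return feasible
}

// score mimics the priorities phase: more free CPU means a higher score.
func score(n node) int { return n.freeCPU }

func main() {
	nodes := []node{{"node1", 500}, {"node2", 2000}, {"node3", 100}}
	// Phase 1: predicates filter out node3 (not enough free CPU for 300m).
	feasible := filter(nodes, 300)
	// Phase 2: priorities score the rest; pick the max, as in the diagram above.
	best := feasible[0]
	for _, n := range feasible[1:] {
		if score(n) > score(best) {
			best = n
		}
	}
	fmt.Println(best.name) // node2
}
```

The real predicates and priorities are far richer (ports, affinity, taints, spreading, and so on), but they all reduce to this filter-then-score-then-argmax shape.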

Parameter Configuration

The kube-scheduler entry point lives in the cmd/kube-scheduler/scheduler.go file, which contains the main function:

// cmd/kube-scheduler/scheduler.go

func main() {
    rand.Seed(time.Now().UnixNano())

    // 1. Build the cobra.Command object
    command := app.NewSchedulerCommand()

    // Normalize command-line flag names (replace _ with -)
    pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)

    // 2. Initialize logging
    logs.InitLogs()
    defer logs.FlushLogs()

    // 3. Execute the command
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

The key piece here is that app.NewSchedulerCommand() builds a Cobra Command object, which command.Execute() at the bottom then runs, so the core is the implementation of the NewSchedulerCommand function:

// cmd/kube-scheduler/app/server.go

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {

    // Build the default scheduler options
    opts, err := options.NewOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        ......
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, opts, registryOptions...); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }

    fs := cmd.Flags()
    // Call the Flags method on Options
    namedFlagSets := opts.Flags()
    verflag.AddFlags(namedFlagSets.FlagSet("global"))
    globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())

    // Add all the default flags to cmd.Flags
    for _, f := range namedFlagSets.FlagSets {
        fs.AddFlagSet(f)
    }
    ......
    return cmd
}

The opts.Flags() method converts the default Options into command-line flags:

// cmd/kube-scheduler/app/options/options.go

func (o *Options) Flags() (nfs cliflag.NamedFlagSets) {
    fs := nfs.FlagSet("misc")
    fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, `The path to the configuration file. The following flags can overwrite fields in this file:
  --address
  --port
  --use-legacy-policy-config
  --policy-configmap
  --policy-config-file
  --algorithm-provider`)
    fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
    fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")

    o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
    o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
    o.Authentication.AddFlags(nfs.FlagSet("authentication"))
    o.Authorization.AddFlags(nfs.FlagSet("authorization"))
    o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)

    options.BindLeaderElectionFlags(&o.ComponentConfig.LeaderElection, nfs.FlagSet("leader election"))
    utilfeature.DefaultMutableFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
    o.Metrics.AddFlags(nfs.FlagSet("metrics"))
    o.Logs.AddFlags(nfs.FlagSet("logs"))

    return nfs
}

The first of these flags, --config, specifies a configuration file. At this point we have obtained all of the scheduler's default configuration parameters.
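For reference, a minimal configuration file that could be passed via --config might look like the sketch below (the apiVersion of the kubescheduler.config.k8s.io API group varies between Kubernetes releases, so treat the exact version string as an assumption):

```yaml
apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
clientConnection:
  kubeconfig: /etc/kubernetes/scheduler.conf
leaderElection:
  leaderElect: true
profiles:
  - schedulerName: default-scheduler
```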

Starting the Scheduler

Next we analyze the implementation of runCommand, the function that actually runs the scheduler.

// cmd/kube-scheduler/app/server.go

// runCommand actually runs the scheduler
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    verflag.PrintAndExitIfRequested()
    cliflag.PrintFlags(cmd.Flags())

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Build the completed config and the scheduler from the command-line args and options
    cc, sched, err := Setup(ctx, opts, registryOptions...)
    if err != nil {
        return err
    }

    // If the WriteConfigTo option (--write-config-to) was specified
    if len(opts.WriteConfigTo) > 0 {

        // write the configuration to the given file and exit
        if err := options.WriteConfigFile(opts.WriteConfigTo, &cc.ComponentConfig); err != nil {
            return err
        }
        klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
        return nil
    }

    // Actually start the scheduler
    return Run(ctx, cc, sched)
}

The function above first checks whether something like --version was requested; if so, it prints the output and exits. It then builds the CompletedConfig and the Scheduler object from the command-line flags and options via the Setup function.

// cmd/kube-scheduler/app/server.go

// Setup builds the completed config and the scheduler object from the command-line args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {

    // Validate the command-line options
    if errs := opts.Validate(); len(errs) > 0 {
        return nil, nil, utilerrors.NewAggregate(errs)
    }

    // Build the scheduler Config object, which holds all of a scheduler's context
    c, err := opts.Config()
    if err != nil {
        return nil, nil, err
    }

    // Get the completed config
    cc := c.Complete()

    outOfTreeRegistry := make(runtime.Registry)

    // Register the out-of-tree plugins
    for _, option := range outOfTreeRegistryOptions {
        if err := option(outOfTreeRegistry); err != nil {
            return nil, nil, err
        }
    }

    recorderFactory := getRecorderFactory(&cc)

    // Create the scheduler
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory,
        cc.PodInformer,
        recorderFactory,
        ctx.Done(),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
        scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
    )
    if err != nil {
        return nil, nil, err
    }

    return &cc, sched, nil
}

This function first validates all the options via opts.Validate(), then builds a *schedulerappconfig.Config object via opts.Config(); that object holds all of a scheduler's context.

// cmd/kube-scheduler/app/options/options.go

// Config returns a scheduler config object
func (o *Options) Config() (*schedulerappconfig.Config, error) {
    if o.SecureServing != nil {
        if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
            return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
        }
    }

    c := &schedulerappconfig.Config{}
    // Apply the scheduler options to the scheduler app config
    if err := o.ApplyTo(c); err != nil {
        return nil, err
    }

    // Create the kube clients
    client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
    if err != nil {
        return nil, err
    }

    c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)

    // Set up leader election if enabled.
    var leaderElectionConfig *leaderelection.LeaderElectionConfig
    if c.ComponentConfig.LeaderElection.LeaderElect {
        // Use the scheduler name in the first profile to record leader election.
        coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(c.ComponentConfig.Profiles[0].SchedulerName)
        leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, coreRecorder)
        if err != nil {
            return nil, err
        }
    }

    c.Client = client
    c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
    c.PodInformer = scheduler.NewPodInformer(client, 0)
    c.LeaderElection = leaderElectionConfig

    return c, nil
}

The core of the function above is o.ApplyTo(c), which converts the Options into the *schedulerappconfig.Config object:

// cmd/kube-scheduler/app/options/options.go

// ApplyTo converts the scheduler Options into the scheduler app config
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
    if len(o.ConfigFile) == 0 {
        c.ComponentConfig = o.ComponentConfig

        // If no config file was loaded, apply the deprecated flags (the legacy behavior)
        o.Deprecated.ApplyTo(&c.ComponentConfig)
        if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
            return err
        }
    } else {
        cfg, err := loadConfigFromFile(o.ConfigFile)
        if err != nil {
            return err
        }
        if err := validation.ValidateKubeSchedulerConfiguration(cfg).ToAggregate(); err != nil {
            return err
        }
        c.ComponentConfig = *cfg
        ......
    }
    ......
    return nil
}

This conversion first checks whether ConfigFile (the --config flag) is set. If it is, the file is loaded and converted into a KubeSchedulerConfiguration object, which is then validated; if everything checks out, it is assigned to the ComponentConfig field of schedulerappconfig.Config. If ConfigFile is not set, the legacy flags are used instead.
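That precedence rule can be condensed into a small standalone sketch; componentConfig, loadFromFile, and resolve below are hypothetical stand-ins for the real types and helpers:

```go
package main

import (
	"errors"
	"fmt"
)

// componentConfig is a hypothetical, trimmed-down stand-in for
// KubeSchedulerConfiguration.
type componentConfig struct {
	SchedulerName string
}

// loadFromFile stands in for loadConfigFromFile; here it just fakes a parse.
func loadFromFile(path string) (*componentConfig, error) {
	if path == "" {
		return nil, errors.New("empty path")
	}
	return &componentConfig{SchedulerName: "from-file"}, nil
}

// resolve mirrors ApplyTo's precedence: a config file, when given,
// wins over the flag-built defaults.
func resolve(configFile string, flagDefaults componentConfig) (componentConfig, error) {
	if configFile == "" {
		return flagDefaults, nil // legacy path: flags only
	}
	cfg, err := loadFromFile(configFile)
	if err != nil {
		return componentConfig{}, err
	}
	return *cfg, nil
}

func main() {
	cfg, _ := resolve("", componentConfig{SchedulerName: "from-flags"})
	fmt.Println(cfg.SchedulerName) // from-flags
	cfg, _ = resolve("/etc/kubernetes/scheduler-config.yaml", componentConfig{SchedulerName: "from-flags"})
	fmt.Println(cfg.SchedulerName) // from-file
}
```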

Next, scheduler.New() is called to construct the actual scheduler object. Its implementation is shown below:

// pkg/scheduler/scheduler.go

// Option configures the scheduler
type Option func(*schedulerOptions)

var defaultSchedulerOptions = schedulerOptions{
    profiles: []schedulerapi.KubeSchedulerProfile{
        // The default plugins for Profiles are configured from the algorithm provider
        {SchedulerName: v1.DefaultSchedulerName},   // the default scheduler name is default-scheduler
    },
    schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
        Provider: defaultAlgorithmSourceProviderName(),     // the default algorithm source provider is DefaultProvider
    },
    percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
    podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
    podMaxBackoffSeconds:     int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
}

// New returns a Scheduler object
func New(client clientset.Interface,
    informerFactory informers.SharedInformerFactory,
    podInformer coreinformers.PodInformer,
    recorderFactory profile.RecorderFactory,
    stopCh <-chan struct{},
    opts ...Option) (*Scheduler, error) {

    stopEverything := stopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }

    // Start from the default scheduler options
    options := defaultSchedulerOptions
    for _, opt := range opts {
        // Apply each Option over the default configuration
        opt(&options)
    }

    schedulerCache := internalcache.New(30*time.Second, stopEverything)

    // All the default in-tree plugins
    registry := frameworkplugins.NewInTreeRegistry()
    // Merge and register the out-of-tree and in-tree plugins
    if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
        return nil, err
    }

    snapshot := internalcache.NewEmptySnapshot()

    configurator := &Configurator{
        client:                   client,
        recorderFactory:          recorderFactory,
        informerFactory:          informerFactory,
        podInformer:              podInformer,
        schedulerCache:           schedulerCache,
        StopEverything:           stopEverything,
        percentageOfNodesToScore: options.percentageOfNodesToScore,
        podInitialBackoffSeconds: options.podInitialBackoffSeconds,
        podMaxBackoffSeconds:     options.podMaxBackoffSeconds,
        profiles:                 append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
        registry:                 registry,
        nodeInfoSnapshot:         snapshot,
        extenders:                options.extenders,
        frameworkCapturer:        options.frameworkCapturer,
    }

    metrics.Register()

    var sched *Scheduler
    // SchedulerAlgorithmSource is the source of the scheduler algorithm;
    // it has two mutually exclusive fields, Provider and Policy, and exactly one must be set
    source := options.schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // Create the config from an algorithm provider; this is the path we focus on here
        sc, err := configurator.createFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        sched = sc
    case source.Policy != nil:
        // Create the config from a user-specified policy source; this is the legacy extension mechanism
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        ...
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    ...

    // addAllEventHandlers is a helper used in tests and in the Scheduler to register event handlers on the various informers
    addAllEventHandlers(sched, informerFactory, podInformer)
    return sched, nil
}
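The Option type used by scheduler.New above is the classic Go functional-options pattern. A minimal standalone sketch, with a trimmed-down, hypothetical schedulerOptions struct mirroring how New applies opts over defaultSchedulerOptions:

```go
package main

import "fmt"

// schedulerOptions is a hypothetical, trimmed-down version of the
// options struct that scheduler.New configures.
type schedulerOptions struct {
	percentageOfNodesToScore int32
	podMaxBackoffSeconds     int64
}

// Option mutates the options struct, mirroring pkg/scheduler's Option type.
type Option func(*schedulerOptions)

func WithPercentageOfNodesToScore(p int32) Option {
	return func(o *schedulerOptions) { o.percentageOfNodesToScore = p }
}

func WithPodMaxBackoffSeconds(s int64) Option {
	return func(o *schedulerOptions) { o.podMaxBackoffSeconds = s }
}

// New starts from defaults, then lets each Option override a field,
// just as scheduler.New applies opts over defaultSchedulerOptions.
func New(opts ...Option) schedulerOptions {
	options := schedulerOptions{percentageOfNodesToScore: 0, podMaxBackoffSeconds: 10}
	for _, opt := range opts {
		opt(&options)
	}
	return options
}

func main() {
	o := New(WithPercentageOfNodesToScore(50))
	fmt.Println(o.percentageOfNodesToScore, o.podMaxBackoffSeconds) // 50 10
}
```

The pattern keeps the constructor signature stable: adding a new tunable means adding a new With... function, not changing every caller.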

The function first applies each supplied Option over the default scheduler options, and then, based on the resulting configuration, determines whether the algorithm source is a Provider or a Policy. Since our focus here is the scheduling framework, we concentrate on the Provider path; Policy is the older way of extending the scheduler. The core of the scheduler's instantiation is therefore the configurator.createFromProvider(*source.Provider) call.

// pkg/scheduler/factory.go

// createFromProvider creates a scheduler from a registered algorithm provider
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
    klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
    r := algorithmprovider.NewRegistry()

    // Look up the plugin set of the given algorithm provider
    defaultPlugins, exist := r[providerName]
    if !exist {
        return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
    }

    for i := range c.profiles {
        prof := &c.profiles[i]
        plugins := &schedulerapi.Plugins{}
        plugins.Append(defaultPlugins)
        // Apply merges in the plugin configuration from custom plugins
        plugins.Apply(prof.Plugins)
        prof.Plugins = plugins
    }
    return c.create()
}

// create builds a scheduler from the set of registered plugins
func (c *Configurator) create() (*Scheduler, error) {
    var extenders []framework.Extender
    var ignoredExtendedResources []string
    if len(c.extenders) != 0 {

        // Extender-based scheduler extension
        ......
    }

    .....

    // Profiles must provide a valid queue sort plugin.
    // All profiles must use the same plugin (with the same configuration, if applicable) at the
    // QueueSort extension point, because the scheduler has only one queue for pending pods.
    lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()

    // Initialize a priority queue as the scheduling queue
    podQueue := internalqueue.NewSchedulingQueue(
        lessFn,
        internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
        internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
        internalqueue.WithPodNominator(nominator),
    )

    // Setup cache debugger.
    debugger := cachedebugger.New(
        c.informerFactory.Core().V1().Nodes().Lister(),
        c.podInformer.Lister(),
        c.schedulerCache,
        podQueue,
    )
    debugger.ListenForSignal(c.StopEverything)

    // Create a genericScheduler object; it implements the ScheduleAlgorithm interface and carries out the actual scheduling
    algo := core.NewGenericScheduler(
        c.schedulerCache,
        c.nodeInfoSnapshot,
        extenders,
        c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
        c.disablePreemption,
        c.percentageOfNodesToScore,
    )

    return &Scheduler{
        SchedulerCache:  c.schedulerCache,
        Algorithm:       algo,
        Profiles:        profiles,
        NextPod:         internalqueue.MakeNextPodFunc(podQueue),
        Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
        StopEverything:  c.StopEverything,
        SchedulingQueue: podQueue,
    }, nil
}

After the series of operations above, the actual scheduler object has been instantiated. Finally, we need to start event listeners for the various resource objects, such as Pods and Nodes. This is done in the instantiation function via addAllEventHandlers(sched, informerFactory, podInformer); the onAdd, onUpdate, and onDelete handlers for these resources are implemented in pkg/scheduler/eventhandlers.go. For example, when a Pod is created, the scheduler's watch receives it in the onAdd handler, and the Pod can then be added to the pending queue according to the queue sort plugin, ready to be scheduled.
The last step is to call the Run function to actually start the scheduler: it waits for all caches to sync and then begins scheduling.

// cmd/kube-scheduler/app/server.go

// Run runs the scheduler with the given config; it returns only on error or when the context is done
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
     klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

     ......

     // Start the healthz and metrics servers
     ......

     // Start all the informers
     go cc.PodInformer.Informer().Run(ctx.Done())
     cc.InformerFactory.Start(ctx.Done())

     // Wait for all caches to sync before scheduling
     cc.InformerFactory.WaitForCacheSync(ctx.Done())

     // Leader election is enabled
     if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: sched.Run,
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

     // Leader election is disabled, so call the scheduler's Run function directly
     sched.Run(ctx)
 return fmt.Errorf("finished without leader elect")
}

sched.Run() mainly performs the following steps:

  • Wait for the cache to sync
  • Start the goroutines that manage the pod queues
  • Call sched.scheduleOne() in a loop via wait.UntilWithContext(); each call to sched.scheduleOne() completes one round of scheduling

    // pkg/scheduler/scheduler.go:312
    func (sched *Scheduler) Run(ctx context.Context) {
      // 1. Wait for the cache to sync
      if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
          return
      }
      // 2. Run the SchedulingQueue
      sched.SchedulingQueue.Run()
      // 3. Start the goroutine that calls sched.scheduleOne() in a loop
      wait.UntilWithContext(ctx, sched.scheduleOne, 0)
      sched.SchedulingQueue.Close()
    }
    

scheduleOne

scheduleOne selects a suitable node for a single pod; it is the core function of the scheduling logic.

    func (sched *Scheduler) scheduleOne(ctx context.Context) {
      podInfo := sched.NextPod()
    
      if podInfo == nil || podInfo.Pod == nil {
          return
      }
      pod := podInfo.Pod
      prof, err := sched.profileForPod(pod)
      if err != nil {
          klog.Error(err)
          return
      }
      if sched.skipPodSchedule(prof, pod) {
          return
      }
    
      klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
    
      // Synchronously attempt to find a fit for the pod.
      start := time.Now()
      state := framework.NewCycleState()
      state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
      schedulingCycleCtx, cancel := context.WithCancel(ctx)
      defer cancel()
      scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
      if err != nil {
          // Schedule() may have failed because the pod would not fit on any host, so we try to
          // preempt, with the expectation that the next time the pod is tried for scheduling it
          // will fit due to the preemption. It is also possible that a different pod will schedule
          // into the resources that were preempted, but this is harmless.
          nominatedNode := ""
          if fitError, ok := err.(*core.FitError); ok {
              if !prof.HasPostFilterPlugins() {
                  klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
              } else {
                  // Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
                  result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
                  if status.Code() == framework.Error {
                      klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
                  } else {
                      klog.V(5).Infof("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
                  }
                  if status.IsSuccess() && result != nil {
                      nominatedNode = result.NominatedNodeName
                  }
              }
              // Pod did not fit anywhere, so it is counted as a failure. If preemption
              // succeeds, the pod should get counted as a success the next time we try to
              // schedule it. (hopefully)
              metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
          } else if err == core.ErrNoNodesAvailable {
              // No nodes available is counted as unschedulable rather than an error.
              metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
          } else {
              klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
              metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
          }
          sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
          return
      }
      metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
      // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
      // This allows us to keep scheduling without waiting on binding to occur.
      assumedPodInfo := podInfo.DeepCopy()
      assumedPod := assumedPodInfo.Pod
      // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
      err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
      if err != nil {
          metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
          // This is most probably result of a BUG in retrying logic.
          // We report an error here so that pod scheduling can be retried.
          // This relies on the fact that Error will check if the pod has been bound
          // to a node and if so will not add it back to the unscheduled pods queue
          // (otherwise this would cause an infinite loop).
          sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
          return
      }
    
      // Run the Reserve method of reserve plugins.
      if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
          metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
          // trigger un-reserve to clean up state associated with the reserved Pod
          prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
          if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
              klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
          }
          sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
          return
      }
    
      // Run "permit" plugins.
      runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
      if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
          var reason string
          if runPermitStatus.IsUnschedulable() {
              metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
              reason = v1.PodReasonUnschedulable
          } else {
              metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
              reason = SchedulerError
          }
          // One of the plugins returned status different than success or wait.
          prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
          if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
              klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
          }
          sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
          return
      }
    
      // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
      go func() {
          bindingCycleCtx, cancel := context.WithCancel(ctx)
          defer cancel()
          metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
          defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
    
          waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
          if !waitOnPermitStatus.IsSuccess() {
              var reason string
              if waitOnPermitStatus.IsUnschedulable() {
                  metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
                  reason = v1.PodReasonUnschedulable
              } else {
                  metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
                  reason = SchedulerError
              }
              // trigger un-reserve plugins to clean up state associated with the reserved Pod
              prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
              if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
                  klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
              }
              sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
              return
          }
    
          // Run "prebind" plugins.
          preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
          if !preBindStatus.IsSuccess() {
              metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
              // trigger un-reserve plugins to clean up state associated with the reserved Pod
              prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
              if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
                  klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
              }
              sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
              return
          }
    
          err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
          if err != nil {
              metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
              // trigger un-reserve plugins to clean up state associated with the reserved Pod
              prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
              if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
                  klog.Errorf("scheduler cache ForgetPod failed: %v", err)
              }
              sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
          } else {
              // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
              if klog.V(2).Enabled() {
                  klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
              }
              metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start))
              metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
              metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
    
              // Run "postbind" plugins.
              prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
          }
      }()
    }
    

The Leader Election Mechanism

Kubernetes supports three kinds of resource locks:

  • EndpointsResourceLock (backed by an endpoints resource; the default)
  • ConfigMapsResourceLock (backed by a configmap resource)
  • LeasesResourceLock (backed by a lease resource)

plus the combined EndpointsLeases and ConfigMapsLeases variants shown in the code below. The --leader-elect-resource-lock flag selects which resource lock to use.
The key of the distributed lock lives in the etcd cluster at /registry/services/endpoints/kube-system/kube-scheduler; it stores the information of the node elected as leader.

$ etcdctl get /registry/services/endpoints/kube-system/kube-scheduler

/registry/services/endpoints/kube-system/kube-scheduler
k8s

v1    Endpoints�
�
kube-scheduler
              kube-system"*$46a65347-41b8-49b9-90ad-bbc020a4564b2����b�
(control-plane.alpha.kubernetes.io/leader�{"holderIdentity":"vm192-168-0-218_9c5be54b-54f4-4b84-85f4-b88883c05cb5","leaseDurationSeconds":60,"acquireTime":"2021-05-17T03:32:24Z","renewTime":"2021-05-17T03:55:36Z","leaderTransitions":4}z"

The leader's information is described by the LeaderElectionRecord struct:

type LeaderElectionRecord struct {
    HolderIdentity       string      `json:"holderIdentity"`       // the leader's identity, usually HostName_<hash>
    LeaseDurationSeconds int         `json:"leaseDurationSeconds"` // duration of the leader's lease
    AcquireTime          metav1.Time `json:"acquireTime"`          // when the leader acquired the lock
    RenewTime            metav1.Time `json:"renewTime"`            // when the leader last renewed the lease
    LeaderTransitions    int         `json:"leaderTransitions"`    // number of leader transitions
}

Each kind of resource lock implements the following operations on the key (the lock):

type Interface interface {
    Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
    Create(ctx context.Context, ler LeaderElectionRecord) error
    Update(ctx context.Context, ler LeaderElectionRecord) error
    RecordEvent(string)
    Identity() string
    Describe() string
}

Back in Options.Config(), the leader-election pieces are wired up as follows:

func (o *Options) Config() (*schedulerappconfig.Config, error) {
    ...
    // 1. Prepare the kube clients
    client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
    if err != nil {
        return nil, err
    }

    // 2. Create the event recorder that records election events
    c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)

    var leaderElectionConfig *leaderelection.LeaderElectionConfig
    if c.ComponentConfig.LeaderElection.LeaderElect {
        // 3. Use the scheduler name from the first profile as the event recorder's name
        coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(c.ComponentConfig.Profiles[0].SchedulerName)
        // 4. Create a new resource lock tied to this configuration
        leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, coreRecorder)
        if err != nil {
            return nil, err
        }
    }

    ...

    c.LeaderElection = leaderElectionConfig

    return c, nil
}

makeLeaderElectionConfig returns the LeaderElectionConfig instance for this node.

func makeLeaderElectionConfig(config componentbaseconfig.LeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
    hostname, err := os.Hostname()
    if err != nil {
        return nil, fmt.Errorf("unable to get hostname: %v", err)
    }
    // add a uniquifier so that two processes on the same host don't accidentally both become active
    id := hostname + "_" + string(uuid.NewUUID())

    rl, err := resourcelock.New(config.ResourceLock,
        config.ResourceNamespace,
        config.ResourceName,
        client.CoreV1(),
        client.CoordinationV1(),
        resourcelock.ResourceLockConfig{
            Identity:      id,
            EventRecorder: recorder,
        })
    if err != nil {
        return nil, fmt.Errorf("couldn't create resource lock: %v", err)
    }

    return &leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: config.LeaseDuration.Duration,
        RenewDeadline: config.RenewDeadline.Duration,
        RetryPeriod:   config.RetryPeriod.Duration,
        WatchDog:      leaderelection.NewLeaderHealthzAdaptor(time.Second * 20),
        Name:          "kube-scheduler",
    }, nil
}

resourcelock.New returns a different kind of resource lock depending on lockType:

func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
    endpointsLock := &EndpointsLock{
        EndpointsMeta: metav1.ObjectMeta{
            Namespace: ns,
            Name:      name,
        },
        Client:     coreClient,
        LockConfig: rlc,
    }
    configmapLock := &ConfigMapLock{
        ConfigMapMeta: metav1.ObjectMeta{
            Namespace: ns,
            Name:      name,
        },
        Client:     coreClient,
        LockConfig: rlc,
    }
    leaseLock := &LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Namespace: ns,
            Name:      name,
        },
        Client:     coordinationClient,
        LockConfig: rlc,
    }
    switch lockType {
    case EndpointsResourceLock:        // election record is stored in an Endpoints object
        return endpointsLock, nil
    case ConfigMapsResourceLock:        // election record is stored in a ConfigMap
        return configmapLock, nil
    case LeasesResourceLock:               // election record is stored in a Lease
        return leaseLock, nil
    case EndpointsLeasesResourceLock:    // written to Endpoints first; on failure, stored in a Lease
        return &MultiLock{
            Primary:   endpointsLock,
            Secondary: leaseLock,
        }, nil
    case ConfigMapsLeasesResourceLock:    // written to a ConfigMap first; on failure, stored in a Lease
        return &MultiLock{
            Primary:   configmapLock,
            Secondary: leaseLock,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}

The leader election process

func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() {
        le.config.Callbacks.OnStoppedLeading()
    }()

    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    le.renew(ctx)
}

Dissecting the SchedulingQueue

The scheduling queue

Pods in the scheduling queue

// pkg/scheduler/framework/v1alpha1/types.go

// QueuedPodInfo wraps the Pod API object with some scheduling-queue bookkeeping,
// so the objects managed by the scheduling queue are of type QueuedPodInfo.
type QueuedPodInfo struct {
    // The Pod API object
    Pod *v1.Pod
    // The time the Pod was added to the scheduling queue. A Pod may frequently be taken out of
    // the queue (for a scheduling attempt) and put back in (when unschedulable), so the enqueue
    // time is recorded on every entry. This timestamp matters a lot; it comes up again when we
    // analyze the queue implementation below.
    Timestamp time.Time
    // How many times the Pod has been attempted. Normally a Pod schedules successfully on the
    // first try, but in abnormal situations (e.g. insufficient resources) it may take many attempts.
    Attempts int
    // The time the Pod was first added to the scheduling queue. A Pod may re-enter the queue many
    // times before it schedules successfully; this field is used to compute scheduling latency
    // (the time from first enqueue to successful scheduling).
    InitialAttemptTimestamp time.Time
}

Heap

The heap here is the ordering-related data structure (the same heap as in Go's container/heap package; Go's standard sort even falls back to heapsort inside its introsort implementation) — not to be confused with the "heap" of memory management.

In kube-scheduler, Heap combines the fast lookup of a map with the ordering of a slice, which is crucial for the scheduling queue. Scheduled objects may be added, deleted, or updated at any moment, so we need to find them quickly — a map fits well. But Go maps are unordered, and iteration is even somewhat randomized (each range starts at a random element), while scheduling constantly needs objects ordered by priority, time, dependencies, and so on — which a slice handles well. Hence the Heap type.

//pkg/scheduler/internal/heap/heap.go


// The definition of Heap
type Heap struct {
    // data is the core implementation that Heap wraps
    data *data
    // Metrics-related; not relevant to this article
    metricRecorder metrics.MetricRecorder
}
// data is the heart of Heap
type data struct {
    // A map that tracks every object
    items map[string]*heapItem
    // A slice that keeps the objects' keys in order; an object's key is the Pod's
    // namespace+name, which is unique within a Kubernetes cluster
    queue []string

    // Function that extracts an object's key — needed because objects are plain interface{}
    keyFunc KeyFunc
    // Function that decides which of two objects is "smaller". Readers familiar with sort.Sort()
    // will have guessed that it is used for ordering: the keys in queue are kept sorted according
    // to lessFunc, and since lessFunc is passed in, the ordering of a Heap is customizable —
    // which turns out to be extremely useful.
    lessFunc lessFunc
}
// The item type stored in the heap
type heapItem struct {
    obj   interface{} // The object — more precisely a pointer; in the scheduling queue it is a *QueuedPodInfo
    index int         // Index into Heap.queue
}

The unschedulable queue (UnschedulablePodsMap)

The unschedulable queue (UnschedulablePodsMap) holds Pods that temporarily cannot be scheduled (e.g. no Node satisfies their requirements). Despite being called a queue, it is implemented as a map: a queue is, at heart, just a buffer of things waiting their turn, not necessarily the queue data structure.

// pkg/scheduler/internal/queue/scheduling_queue.go

// The queue of unschedulable Pods — essentially a thin wrapper around a map;
// for our purposes you can simply think of it as a map.
type UnschedulablePodsMap struct {
    // The underlying map; keyFunc plays the same role as in Heap, extracting an object's key
    podInfoMap map[string]*framework.QueuedPodInfo
    keyFunc    func(*v1.Pod) string
    // Metrics-related; not relevant to this article
    metricRecorder metrics.MetricRecorder
}

The scheduling queue abstraction

The usual Go convention is to describe a capability with an interface and then implement it with a struct. kube-scheduler abstracts its scheduling queue the same way, as shown below:

// pkg/scheduler/internal/queue/scheduling_queue.go

type SchedulingQueue interface {
    // PodNominator has little to do with the queue itself, but state changes of Pods in the
    // scheduling queue must be synchronized to the PodNominator. It is not covered in detail
    // here; later articles on the scheduler will explain it where it is referenced.
    framework.PodNominator
    // Adds a pending Pod to the queue; e.g. when a Pod is created via kubectl,
    // kube-scheduler enqueues it through this method.
    Add(pod *v1.Pod) error
    // Returns the Pod at the head of the queue, blocking until one is added if the queue is
    // empty. Add() plus Pop() already feels like the queue data structure — though not
    // necessarily FIFO: the ordering depends on lessFunc, which is what makes this the priority
    // queue described later, popping Pods by priority.
    Pop() (*framework.QueuedPodInfo, error)
    // First, what is a scheduling cycle? Every round of scheduling in kube-scheduler counts as
    // one cycle. Adding a Pod to the queue does not count, because no scheduling happens; only
    // popping a Pod triggers a scheduling attempt (which may fail, but still counts as one),
    // so the cycle counter is maintained in Pop(). It is incremented on every pop — think of it
    // as the queue's own kind of tick. This method returns the current scheduling cycle.
    SchedulingCycle() int64
    // Puts an unschedulable Pod back into the queue, provided it is not already there.
    // podSchedulingCycle is the current cycle number as returned by SchedulingCycle().
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    // Updates a Pod
    Update(oldPod, newPod *v1.Pod) error
    // Deletes a Pod
    Delete(pod *v1.Pod) error
    // First, know that the scheduling queue contains at least activeQ and backoffQ: activeQ
    // holds all Pods ready and waiting to be scheduled, backoffQ all Pods in backoff. What is
    // backoff? A Pod in backoff will not be scheduled, no matter how high its priority. For how
    // long? The queue measures it in time, e.g. one second. kube-scheduler's backoff time grows
    // exponentially with the number of attempts, but not without bound — there is a cap, 10
    // seconds by default. A Pod backs off because a scheduling attempt failed; backoff exists
    // to avoid pointless, rapid retries. This method moves all unschedulable Pods into activeQ
    // or backoffQ; which Pods go where is explained in a later section.
    MoveAllToActiveOrBackoffQueue(event string)
    // When the Pod passed as the argument has been scheduled (assigned), move every
    // unschedulable Pod that selects it via labels into activeQ. A bit convoluted? Put plainly:
    // if Pod1 depends on Pod2 (via label selection), Pod1 is unschedulable until Pod2 is
    // scheduled; once Pod2 is scheduled, the scheduler calls this method.
    AssignedPodAdded(pod *v1.Pod)
    // Same as AssignedPodAdded, but triggered on update
    AssignedPodUpdated(pod *v1.Pod)
    // Returns all pending Pods — i.e. every Pod in the queue, since the queue only holds
    // unscheduled (pending) Pods
    PendingPods() []*v1.Pod
    // Closes the queue
    Close()
    // Returns the number of unschedulable Pods in the queue
    NumUnschedulablePods() int
    // Starts the goroutines that manage the queue
    Run()
}

The priority queue (PriorityQueue)

The priority queue (PriorityQueue) implements SchedulingQueue; the head of the priority queue is the pending Pod with the highest priority.
The priority queue has three sub-queues:

  • one sub-queue holds Pods that are ready to be scheduled, called activeQ (a Heap);
  • another holds Pods that have been tried and determined to be unschedulable, called unschedulableQ (an UnschedulablePodsMap);
  • the third is backoffQ, which holds Pods moved out of unschedulableQ; once a Pod's backoff completes, it is moved to activeQ.

```go
// pkg/scheduler/internal/queue/scheduling_queue.go

type PriorityQueue struct {
    // Not relevant to this article
    framework.PodNominator
    // These two need no explanation; they are straightforward
    stop  chan struct{}
    clock util.Clock

    // A Pod's initial backoff duration, 1 second by default and configurable. After a Pod's
    // first failed scheduling attempt its backoff is podInitialBackoffDuration; after the second
    // failure it is podInitialBackoffDuration*2, after the third podInitialBackoffDuration*4,
    // and so on.
    podInitialBackoffDuration time.Duration
    // A Pod's maximum backoff duration, 10 seconds by default and configurable. Since backoff
    // grows exponentially with the number of attempts, this variable caps it.
    podMaxBackoffDuration time.Duration

    lock sync.RWMutex
    cond sync.Cond

    // activeQ, of type Heap, explained earlier
    activeQ *heap.Heap
    // backoffQ, also of type Heap
    podBackoffQ *heap.Heap
    // unschedulableQ; see the earlier section for details
    unschedulableQ *UnschedulablePodsMap
    // The scheduling cycle, incremented in Pop() and read via SchedulingCycle()
    schedulingCycle int64
    // This variable, measured in scheduling cycles, records the cycle of the last call to
    // MoveAllToActiveOrBackoffQueue(). AddUnschedulableIfNotPresent() takes a scheduling cycle
    // as an argument; when that cycle is less than moveRequestCycle, the unschedulable Pod
    // should also have been moved — it simply acquired the lock later than
    // MoveAllToActiveOrBackoffQueue() did. The code comments later make this concrete.
    moveRequestCycle int64

    closed bool
}
```