kube-scheduler设计
Kube-scheduler 是 Kubernetes 的核心组件之一,它的职责是为每一个 Pod 选择一个合适的 Node。整体流程可以概括为三步:获取未调度的 podList;通过执行一系列调度算法为 Pod 选择一个合适的 Node;将结果提交到 apiserver。其核心则是一系列调度算法的设计与执行。
官方对 kube-scheduler 的调度流程描述 The Kubernetes Scheduler:
For given pod:

    +---------------------------------------------+
    |             Schedulable nodes:              |
    |                                             |
    |  +--------+    +--------+    +--------+     |
    |  | node 1 |    | node 2 |    | node 3 |     |
    |  +--------+    +--------+    +--------+     |
    |                                             |
    +-------------------+-------------------------+
                        |
                        v
    +-------------------+-------------------------+
    |                                             |
    |  Pred. filters: node 3 doesn't have enough  |
    |                 resource                    |
    |                                             |
    +-------------------+-------------------------+
                        |
                        v
    +-------------------+-------------------------+
    |             remaining nodes:                |
    |                                             |
    |  +--------+                  +--------+     |
    |  | node 1 |                  | node 2 |     |
    |  +--------+                  +--------+     |
    |                                             |
    +-------------------+-------------------------+
                        |
                        v
    +-------------------+-------------------------+
    |                                             |
    |  Priority function:   node 1: p=2           |
    |                       node 2: p=5           |
    |                                             |
    +-------------------+-------------------------+
                        |
                        v
            select max{node priority} = node 2
kube-scheduler 目前包含两部分调度算法:predicates 和 priorities。首先执行 predicates 算法过滤掉不满足条件的 node,然后执行 priorities 算法为通过过滤的 node 打分,最后从中选出分数最高的 node 作为最佳节点。
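下面用一段极简的 Go 代码示意这个"先过滤、再打分、取最高分"的流程。注意这只是示意,fits、score 等函数与数据结构都是为了说明而假设的,并非 kube-scheduler 源码:

// 一个极简示意:predicates 过滤 + priorities 打分 + 取最高分
package main

import "fmt"

type node struct {
	name    string
	freeCPU int
}

// predicate:过滤掉资源不足的节点
func fits(n node, requestCPU int) bool { return n.freeCPU >= requestCPU }

// priority:剩余资源越多分数越高(仅示意)
func score(n node) int { return n.freeCPU }

func main() {
	nodes := []node{{"node1", 2}, {"node2", 5}, {"node3", 0}}
	requestCPU := 1

	var best *node
	bestScore := -1
	for i := range nodes {
		if !fits(nodes[i], requestCPU) { // Predicates 阶段
			continue
		}
		if s := score(nodes[i]); s > bestScore { // Priorities 阶段
			bestScore, best = s, &nodes[i]
		}
	}
	if best == nil {
		fmt.Println("no fit node")
		return
	}
	fmt.Println("selected:", best.name) // 输出 node2
}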
参数配置
kube-scheduler 的启动入口位于 cmd/kube-scheduler/scheduler.go 文件,该文件中就包含一个 main 入口函数:
// cmd/kube-scheduler/scheduler.go
func main() {
rand.Seed(time.Now().UnixNano())
// 1、初始化 Cobra.Command 对象
command := app.NewSchedulerCommand()
// 将命令行参数进行标准化(_替换成-)
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// 2、初始化日志
logs.InitLogs()
defer logs.FlushLogs()
// 3、执行命令
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
其中最核心的就是通过 app.NewSchedulerCommand() 获取一个 Cobra 的 Command 对象,然后调用 command.Execute() 执行这个命令,所以核心就是 NewSchedulerCommand 函数的实现:
// cmd/kube-scheduler/app/server.go
// NewSchedulerCommand 使用默认参数和 registryOptions 创建一个 *cobra.Command 对象
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
// 获取默认的default scheduler配置信息
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
......
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
fs := cmd.Flags()
// 调用 Options 的 Flags 方法
namedFlagSets := opts.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
// 将默认的所有参数添加到 cmd.Flags 中去
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
......
return cmd
}
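顺带一提,NewSchedulerCommand 的 registryOptions 可变参数就是为 out-of-tree 插件预留的注册入口(后面 Setup 函数中会把它们合并进插件 registry)。下面是一个假设性的极简示意,展示自定义调度器二进制如何通过 app.WithPlugin 注入自定义插件,其中 MyPlugin、myplugin.New 都是假设的插件名与构造函数:

// 假设性示例:基于 kube-scheduler 构建自定义调度器的 main 函数
package main

import (
	"os"

	"k8s.io/kubernetes/cmd/kube-scheduler/app"
	// myplugin "example.com/my-scheduler/pkg/myplugin" // 假设的 out-of-tree 插件包
)

func main() {
	// app.WithPlugin 会返回一个 Option,最终被写入 Setup 中的 outOfTreeRegistry
	command := app.NewSchedulerCommand(
	// app.WithPlugin("MyPlugin", myplugin.New), // 注册假设的自定义插件
	)
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}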
NewSchedulerCommand 中调用的 opts.Flags() 方法就是将默认的 Options 配置转换成命令行参数的函数:
// cmd/kube-scheduler/app/options/options.go
func (o *Options) Flags() (nfs cliflag.NamedFlagSets) {
fs := nfs.FlagSet("misc")
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, `The path to the configuration file. The following flags can overwrite fields in this file:
--address
--port
--use-legacy-policy-config
--policy-configmap
--policy-config-file
--algorithm-provider`)
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
o.Authentication.AddFlags(nfs.FlagSet("authentication"))
o.Authorization.AddFlags(nfs.FlagSet("authorization"))
o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)
options.BindLeaderElectionFlags(&o.ComponentConfig.LeaderElection, nfs.FlagSet("leader election"))
utilfeature.DefaultMutableFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
o.Metrics.AddFlags(nfs.FlagSet("metrics"))
o.Logs.AddFlags(nfs.FlagSet("logs"))
return nfs
}
其中第一个参数 --config 就可以用来指定配置文件。到这里我们就获取到了调度器所有默认的配置参数了。
启动调度器
接下来分析真正运行调度器的 runCommand 函数的实现。
// cmd/kube-scheduler/app/server.go
// 运行调度器真正的函数
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
verflag.PrintAndExitIfRequested()
cliflag.PrintFlags(cmd.Flags())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 根据命令行 args 和 options 创建完整的配置和调度程序
cc, sched, err := Setup(ctx, opts, registryOptions...)
if err != nil {
return err
}
// 如果指定了 WriteConfigTo 参数
if len(opts.WriteConfigTo) > 0 {
// 将配置写入到指定的文件中
if err := options.WriteConfigFile(opts.WriteConfigTo, &cc.ComponentConfig); err != nil {
return err
}
klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
return nil
}
// 真正去启动调度器
return Run(ctx, cc, sched)
}
上面的函数首先判断是否是执行类似于 --version 这样的操作,如果是则打印后直接退出;然后根据命令行参数和选项,通过 Setup 函数构造 CompletedConfig 配置和 Scheduler 调度器对象。
// cmd/kube-scheduler/app/server.go
// 根据命令行参数和选项构造完整的配置和调度器对象
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
// 校验命令行选项
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
// 获取调度器Config对象,该对象拥有一个调度器所有的上下文信息
c, err := opts.Config()
if err != nil {
return nil, nil, err
}
// 获取 completed 配置
cc := c.Complete()
outOfTreeRegistry := make(runtime.Registry)
// outOfTree plugins插件注册
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}
recorderFactory := getRecorderFactory(&cc)
// 创建调度器
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
if err != nil {
return nil, nil, err
}
return &cc, sched, nil
}
该函数首先调用 opts.Validate() 函数对所有参数进行校验,接着使用 opts.Config() 函数创建 *schedulerappconfig.Config 对象,该对象拥有一个调度器所有的上下文信息。
// cmd/kube-scheduler/app/options/options.go
// Config 返回一个调度器配置对象
func (o *Options) Config() (*schedulerappconfig.Config, error) {
if o.SecureServing != nil {
if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}
}
c := &schedulerappconfig.Config{}
// 将扩展调度器配置应用于调度器应用程序配置
if err := o.ApplyTo(c); err != nil {
return nil, err
}
// 创建 kube 客户端
client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
if err != nil {
return nil, err
}
c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)
// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
if c.ComponentConfig.LeaderElection.LeaderElect {
// Use the scheduler name in the first profile to record leader election.
coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(c.ComponentConfig.Profiles[0].SchedulerName)
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, coreRecorder)
if err != nil {
return nil, err
}
}
c.Client = client
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
c.PodInformer = scheduler.NewPodInformer(client, 0)
c.LeaderElection = leaderElectionConfig
return c, nil
}
上面函数的核心是通过 o.ApplyTo(c) 方法将 Options 转换成 *schedulerappconfig.Config 对象:
// cmd/kube-scheduler/app/options/options.go
// 将调度程序 options 转换成调度程序应用配置
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
if len(o.ConfigFile) == 0 {
c.ComponentConfig = o.ComponentConfig
// 如果未加载任何配置文件,则应用 deprecated flags(这是旧的行为)
o.Deprecated.ApplyTo(&c.ComponentConfig)
if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
return err
}
} else {
cfg, err := loadConfigFromFile(o.ConfigFile)
if err != nil {
return err
}
if err := validation.ValidateKubeSchedulerConfiguration(cfg).ToAggregate(); err != nil {
return err
}
c.ComponentConfig = *cfg
......
}
......
return nil
}
上面的转换函数会首先判断是否配置了 ConfigFile(也就是 --config 参数),如果配置了则会加载对应的配置文件转换成对应的 KubeSchedulerConfiguration 对象,然后校验有效性,如果都正常则将其赋给 schedulerappconfig.Config 的 ComponentConfig 属性;如果没有配置 ConfigFile,则使用旧的参数进行配置。
接着会去调用 scheduler.New() 函数去构造一个真正的调度器对象,该函数的具体实现如下所示:
// pkg/scheduler/scheduler.go
// 配置调度器
type Option func(*schedulerOptions)
var defaultSchedulerOptions = schedulerOptions{
profiles: []schedulerapi.KubeSchedulerProfile{
// Profiles 的默认插件是从算法提供程序配置的
{SchedulerName: v1.DefaultSchedulerName}, // 默认的调度器名称为 default-scheduler
},
schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
Provider: defaultAlgorithmSourceProviderName(), // 默认的算法源提供器名称为 DefaultProvider
},
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
}
// 返回一个Scheduler对象
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
stopEverything := stopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
// 默认的调度器配置
options := defaultSchedulerOptions
for _, opt := range opts {
// 将默认的调度器配置调用 Option 重新配置一次
opt(&options)
}
schedulerCache := internalcache.New(30*time.Second, stopEverything)
// 所有默认inTree插件
registry := frameworkplugins.NewInTreeRegistry()
// 合并outTree和inTree插件并注册
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}
snapshot := internalcache.NewEmptySnapshot()
configurator := &Configurator{
client: client,
recorderFactory: recorderFactory,
informerFactory: informerFactory,
podInformer: podInformer,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
percentageOfNodesToScore: options.percentageOfNodesToScore,
podInitialBackoffSeconds: options.podInitialBackoffSeconds,
podMaxBackoffSeconds: options.podMaxBackoffSeconds,
profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
registry: registry,
nodeInfoSnapshot: snapshot,
extenders: options.extenders,
frameworkCapturer: options.frameworkCapturer,
}
metrics.Register()
var sched *Scheduler
// SchedulerAlgorithmSource 是调度程序算法的源
// 包含Policy与Provider两种方式,必须指定一个源字段,并且源字段是互斥的
source := options.schedulerAlgorithmSource
switch {
case source.Provider != nil:
// 从一个算法 provider 中创建配置,这是我们现在需要重点关注的方式
sc, err := configurator.createFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
sched = sc
case source.Policy != nil:
// 从用户指定的策略源中创建配置,这是以前的扩展方式
policy := &schedulerapi.Policy{}
switch {
case source.Policy.File != nil:
if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
return nil, err
}
case source.Policy.ConfigMap != nil:
if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
return nil, err
}
}
...
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
...
// addAllEventHandlers 是在测试和 Scheduler 中使用的帮助程序函数,用于为各种 informers 添加事件处理程序
addAllEventHandlers(sched, informerFactory, podInformer)
return sched, nil
}
首先将默认的调度器配置依次应用传入的 Option 参数,然后重点就是根据应用后的配置判断调度算法的源是 Provider 还是 Policy 方式。我们现在的重点是调度框架,所以主要关注 Provider 这种配置,Policy 是以前扩展调度器的方式。因此调度器实例化的核心就是 configurator.createFromProvider(*source.Provider) 这个函数。
// pkg/scheduler/factory.go
// createFromProvider 从注册的算法提供器来创建调度器
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
r := algorithmprovider.NewRegistry()
// 获取指定算法提供器的插件集合
defaultPlugins, exist := r[providerName]
if !exist {
return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
}
for i := range c.profiles {
prof := &c.profiles[i]
plugins := &schedulerapi.Plugins{}
plugins.Append(defaultPlugins)
// Apply 合并来自自定义插件的插件配置
plugins.Apply(prof.Plugins)
prof.Plugins = plugins
}
return c.create()
}
// 从一组已注册的插件集合中创建一个调度器
func (c *Configurator) create() (*Scheduler, error) {
var extenders []framework.Extender
var ignoredExtendedResources []string
if len(c.extenders) != 0 {
// Extender 方式扩展调度器
......
}
.....
// Profiles 需要提供有效的 queue sort 插件
// 所有配置文件必须在 QueueSort 扩展点使用相同的插件,并具有相同的配置参数(如果适用)。这是因为调度器只有一个队列保存悬决的 Pod。
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
// 将优先级队列初始化为调度队列
podQueue := internalqueue.NewSchedulingQueue(
lessFn,
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
)
// Setup cache debugger.
debugger := cachedebugger.New(
c.informerFactory.Core().V1().Nodes().Lister(),
c.podInformer.Lister(),
c.schedulerCache,
podQueue,
)
debugger.ListenForSignal(c.StopEverything)
// 创建一个 genericScheduler 对象,该对象实现了 ScheduleAlgorithm 接口,具体的调度实现就是这个对象实现的
algo := core.NewGenericScheduler(
c.schedulerCache,
c.nodeInfoSnapshot,
extenders,
c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
c.disablePreemption,
c.percentageOfNodesToScore,
)
return &Scheduler{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
Profiles: profiles,
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
StopEverything: c.StopEverything,
SchedulingQueue: podQueue,
}, nil
}
通过上面的一系列操作就实例化了真正的调度器对象。最后还需要启动一系列资源对象(比如 Pod、Node 等)的事件监听程序,上面的实例化函数中是通过 addAllEventHandlers(sched, informerFactory, podInformer) 来实现的,这些资源对象对应的 onAdd、onUpdate、onDelete 处理均在 pkg/scheduler/eventhandlers.go 文件中实现。这样当创建一个 Pod 之后,调度器通过 watch 就会在 onAdd 事件中接收到该操作,然后根据 queue sort 插件将其加入到待调度队列中去开始调度。
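下面是一段基于 client-go 的极简示意(非 kube-scheduler 源码),用来说明这类 informer 事件回调是如何把尚未绑定 Node 的 Pod 交给入队逻辑的;其中 watchUnscheduledPods 和 enqueue 都是为演示而假设的名字,真实实现里对应的是 sched.SchedulingQueue.Add():

// 示意:监听未绑定 Node 的 Pod 的 Add 事件并入队
package main

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

func watchUnscheduledPods(client kubernetes.Interface, enqueue func(*v1.Pod), stopCh <-chan struct{}) {
	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.FilteringResourceEventHandler{
		// 只关心还没有绑定 Node 的 Pod(spec.nodeName 为空)
		FilterFunc: func(obj interface{}) bool {
			pod, ok := obj.(*v1.Pod)
			return ok && pod.Spec.NodeName == ""
		},
		Handler: cache.ResourceEventHandlerFuncs{
			// onAdd:新建的未调度 Pod 入队,等待调度
			AddFunc: func(obj interface{}) { enqueue(obj.(*v1.Pod)) },
		},
	})
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
}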
最后就是去调用 Run 函数来真正启动调度器了,首先会等待所有的 cache 同步完成,然后开始进行调度操作。
// cmd/kube-scheduler/app/server.go
// Run 根据指定的配置执行调度程序,仅在出现错误或上下文完成时才返回
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
......
// 启动 healthz 以及 metrics 相关服务
......
// 启动所有 informers
go cc.PodInformer.Informer().Run(ctx.Done())
cc.InformerFactory.Start(ctx.Done())
// 调度之前等待所有 caches 同步完成
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// 开启了 leader election
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: sched.Run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// 如果没有开启 leader election,则直接调用调度器对象的 Run 函数
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
sched.Run() 主要执行以下步骤:
- 等待 Cache 同步完毕
- 启动控制 Pod 队列的 goroutines
- 通过 wait.UntilWithContext() 方法循环调用 sched.scheduleOne(),每次调用完成一轮调度
// pkg/scheduler/scheduler.go:312
func (sched *Scheduler) Run(ctx context.Context) {
	// 1、等待 cache 同步完成
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	// 2、运行 SchedulingQueue
	sched.SchedulingQueue.Run()
	// 3、启动通过 sched.scheduleOne() 方法循环调度的 goroutine
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
scheduleOne
scheduleOne 主要为单个 Pod 选择一个合适的节点,是调度逻辑的核心函数。
// pkg/scheduler/scheduler.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod()
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
	prof, err := sched.profileForPod(pod)
	if err != nil {
		klog.Error(err)
		return
	}
	if sched.skipPodSchedule(prof, pod) {
		return
	}
	klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	state := framework.NewCycleState()
	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
	schedulingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
	if err != nil {
		// Schedule() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.
		nominatedNode := ""
		if fitError, ok := err.(*core.FitError); ok {
			if !prof.HasPostFilterPlugins() {
				klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
			} else {
				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
				result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
				if status.Code() == framework.Error {
					klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
				} else {
					klog.V(5).Infof("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
				}
				if status.IsSuccess() && result != nil {
					nominatedNode = result.NominatedNodeName
				}
			}
			// Pod did not fit anywhere, so it is counted as a failure. If preemption
			// succeeds, the pod should get counted as a success the next time we try to
			// schedule it. (hopefully)
			metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
		} else if err == core.ErrNoNodesAvailable {
			// No nodes available is counted as unschedulable rather than an error.
			metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
		} else {
			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod))
			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
		}
		sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		return
	}
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
		// This is most probably result of a BUG in retrying logic.
		// We report an error here so that pod scheduling can be retried.
		// This relies on the fact that Error will check if the pod has been bound
		// to a node and if so will not add it back to the unscheduled pods queue
		// (otherwise this would cause an infinite loop).
		sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
		return
	}
	// Run the Reserve method of reserve plugins.
	if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
		// trigger un-reserve to clean up state associated with the reserved Pod
		prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
		}
		sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
		return
	}
	// Run "permit" plugins.
	runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
		var reason string
		if runPermitStatus.IsUnschedulable() {
			metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
			reason = v1.PodReasonUnschedulable
		} else {
			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
			reason = SchedulerError
		}
		// One of the plugins returned status different than success or wait.
		prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
		}
		sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
		return
	}
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
		defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
		waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {
			var reason string
			if waitOnPermitStatus.IsUnschedulable() {
				metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start))
				reason = v1.PodReasonUnschedulable
			} else {
				metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
				reason = SchedulerError
			}
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
			}
			sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
			return
		}
		// Run "prebind" plugins.
		preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !preBindStatus.IsSuccess() {
			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
			}
			sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
			return
		}
		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
		if err != nil {
			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start))
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", err)
			}
			sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
		} else {
			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
			if klog.V(2).Enabled() {
				klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
			}
			metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start))
			metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
			metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
			// Run "postbind" plugins.
			prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}
	}()
}
Leaderelection机制
kubernetes 支持 3 种基础资源锁:
- EndpointsResourceLock(依赖于 endpoints 资源,默认资源锁)
- ConfigMapsResourceLock(依赖于 configmap 资源)
- LeasesResourceLock(依赖于 lease 资源)
此外还有 EndpointsLeasesResourceLock 和 ConfigMapsLeasesResourceLock 两种组合锁,可通过 --leader-elect-resource-lock 参数指定使用哪种资源锁。
分布式锁的key存在于etcd集群的/registry/services/endpoints/kube-system/kube-scheduler中,该key存储竞选为leader节点的信息。
$ etcdctl get /registry/services/endpoints/kube-system/kube-scheduler
/registry/services/endpoints/kube-system/kube-scheduler
k8s
v1 Endpoints�
�
kube-scheduler
kube-system"*$46a65347-41b8-49b9-90ad-bbc020a4564b2����b�
(control-plane.alpha.kubernetes.io/leader�{"holderIdentity":"vm192-168-0-218_9c5be54b-54f4-4b84-85f4-b88883c05cb5","leaseDurationSeconds":60,"acquireTime":"2021-05-17T03:32:24Z","renewTime":"2021-05-17T03:55:36Z","leaderTransitions":4}z"
leader节点信息,通过LeaderElectionRecord结构体描述:
type LeaderElectionRecord struct {
HolderIdentity string `json:"holderIdentity"` // 领导者的身份表示,通常为HostName_<hash值>
LeaseDurationSeconds int `json:"leaseDurationSeconds"` // 领导者租约的时长
AcquireTime metav1.Time `json:"acquireTime"` // 领导者获取锁的时间
RenewTime metav1.Time `json:"renewTime"` // 领导者续租的时间
LeaderTransitions int `json:"leaderTransitions"` // 领导者选举切换的次数
}
每种资源锁实现了对key(资源锁)的操作方法:
type Interface interface {
Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
Create(ctx context.Context, ler LeaderElectionRecord) error
Update(ctx context.Context, ler LeaderElectionRecord) error
RecordEvent(string)
Identity() string
Describe() string
}
func (o *Options) Config() (*schedulerappconfig.Config, error) {
...
// 1.准备kube clients.
client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
if err != nil {
return nil, err
}
// 2.创建event recorder,记录选举产生的事件
c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)
var leaderElectionConfig *leaderelection.LeaderElectionConfig
if c.ComponentConfig.LeaderElection.LeaderElect {
// 3.以profile第一个schedulerName为event recorder添加name属性
coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(c.ComponentConfig.Profiles[0].SchedulerName)
// 4. 创建一个与配置相关联的新资源锁
leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, coreRecorder)
if err != nil {
return nil, err
}
}
...
c.LeaderElection = leaderElectionConfig
return c, nil
}
makeLeaderElectionConfig返回该节点的LeaderElectionConfig实例。
func makeLeaderElectionConfig(config componentbaseconfig.LeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("unable to get hostname: %v", err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id := hostname + "_" + string(uuid.NewUUID())
rl, err := resourcelock.New(config.ResourceLock,
config.ResourceNamespace,
config.ResourceName,
client.CoreV1(),
client.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
return nil, fmt.Errorf("couldn't create resource lock: %v", err)
}
return &leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: config.LeaseDuration.Duration,
RenewDeadline: config.RenewDeadline.Duration,
RetryPeriod: config.RetryPeriod.Duration,
WatchDog: leaderelection.NewLeaderHealthzAdaptor(time.Second * 20),
Name: "kube-scheduler",
}, nil
}
resourcelock.New根据lockType返回不同类型资源锁
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
endpointsLock := &EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coreClient,
LockConfig: rlc,
}
configmapLock := &ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coreClient,
LockConfig: rlc,
}
leaseLock := &LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Client: coordinationClient,
LockConfig: rlc,
}
switch lockType {
case EndpointsResourceLock: // 选举信息保存在endpoints中
return endpointsLock, nil
case ConfigMapsResourceLock: // 选举信息保存在configMap中
return configmapLock, nil
case LeasesResourceLock: // 选举信息保存在lease中
return leaseLock, nil
case EndpointsLeasesResourceLock: // 优先保存在endpoints中,失败的话,保存在lease
return &MultiLock{
Primary: endpointsLock,
Secondary: leaseLock,
}, nil
case ConfigMapsLeasesResourceLock: // 优先保存在configMap中,失败的话,保存在lease
return &MultiLock{
Primary: configmapLock,
Secondary: leaseLock,
}, nil
default:
return nil, fmt.Errorf("Invalid lock-type %s", lockType)
}
}
leader选举过程
func (le *LeaderElector) Run(ctx context.Context) {
defer runtime.HandleCrash()
defer func() {
le.config.Callbacks.OnStoppedLeading()
}()
if !le.acquire(ctx) {
return // ctx signalled done
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go le.config.Callbacks.OnStartedLeading(ctx)
le.renew(ctx)
}
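结合上面的代码,下面给出一个基于 client-go leaderelection 包的完整使用示意(非 kube-scheduler 源码),其中使用 Lease 资源锁,my-scheduler 这个锁名称以及各项时间参数均为假设值:

// 示意:使用 client-go 的 leaderelection 包进行选主
package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func runWithLeaderElection(ctx context.Context, client kubernetes.Interface, run func(context.Context)) {
	// 与 makeLeaderElectionConfig 一样,用 hostname+uuid 作为身份标识
	hostname, _ := os.Hostname()
	id := hostname + "_" + string(uuid.NewUUID())

	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Namespace: "kube-system", Name: "my-scheduler"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second, // 租约时长
		RenewDeadline: 10 * time.Second, // 续约超时
		RetryPeriod:   2 * time.Second,  // 重试间隔
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run, // 成为 leader 后开始真正的工作(对应 sched.Run)
			OnStoppedLeading: func() { klog.Fatalf("leaderelection lost") },
		},
	})
}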
SchedulingQueue解析
调度队列
调度队列中的Pod
// pkg/scheduler/framework/v1alpha1/types.go
// QueuedPodInfo是在Pod的API对象基础上增加了一些与调度队列相关的变量,所以在调度队列中管理的Pod对象是QueuedPodInfo类型的
type QueuedPodInfo struct {
// 继承Pod的API类型
Pod *v1.Pod
// Pod添加到调度队列的时间。因为Pod可能会频繁的从调度队列中取出(用于调度)然后再放入调度队列(不可调度)
// 所以每次进入队列时都会记录入队列的时间,这个时间作用很大,后面在分析调度队列的实现的时候会提到。
Timestamp time.Time
// Pod尝试调度的次数。应该说,正常的情况下Pod一次就会调度成功,但是在一些异常情况下(比如资源不足),Pod可能会被尝试调度多次
Attempts int
// Pod第一次添加到调度队列的时间,Pod调度成功前可能会多次加回队列,这个变量可以用来计算Pod的调度延迟(即从Pod入队到最终调度成功所用时间)
InitialAttemptTimestamp time.Time
}
堆(Heap)
这里提到的堆与排序有关(Go 的 sort 包在快排递归过深时会退化为堆排序),不要与内存管理中"堆"的概念混淆。
在 kube-scheduler 中,堆既有 map 的高效检索能力,又具备 slice 的顺序,这对于调度队列来说非常关键。因为调度对象随时可能添加、删除、更新,需要有高效的检索能力快速找到对象,map 非常适合;但是 golang 中的 map 是无序的,访问 map 还有一定的随机性(每次 range 的起始对象是随机的)。而调度经常会因为优先级、时间、依赖等原因需要对对象排序,slice 非常合适,所以就有了堆这个类型。
//pkg/scheduler/internal/heap/heap.go
// Heap定义
type Heap struct {
// Heap继承了data
data *data
// 监控相关,与本文内容无关
metricRecorder metrics.MetricRecorder
}
// data是Heap核心实现
type data struct {
// 用map管理所有对象
items map[string]*heapItem
// 在用slice管理所有对象的key,对象的key就是Pod的namespace+name,这是在kubernetes下唯一键
queue []string
// 获取对象key的函数,毕竟对象是interface{}类型的
keyFunc KeyFunc
// 判断两个对象哪个小的函数,了解sort.Sort()的读者此时应该能够猜到它是用来排序的。
// 所以可以推断出来queue中key是根据lessFunc排过序的,而lessFunc又是传入进来的,
// 这让Heap中对象的序可定制,这个非常有价值,非常有用
lessFunc lessFunc
}
// 堆存储对象的定义
type heapItem struct {
obj interface{} // 对象,更准确的说应该是指针,应用在调度队列中就是*QueuedPodInfo
index int // Heap.queue的索引
}
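为了更直观地理解"map 负责检索、slice 负责顺序"这种组合,下面给出一个基于 container/heap 的极简示意(仅为示意,省略了真实 Heap 中的 index 维护、keyFunc/lessFunc 注入等细节):

// 示意:map + slice 组合成可按优先级出队、又能按 key 检索的结构
package main

import (
	"container/heap"
	"fmt"
)

type item struct {
	key      string
	priority int
}

type data struct {
	items map[string]*item // key -> 对象,O(1) 检索
	queue []string         // 按优先级组织成堆的 key 序列
}

func (d *data) Len() int           { return len(d.queue) }
func (d *data) Less(i, j int) bool { return d.items[d.queue[i]].priority > d.items[d.queue[j]].priority }
func (d *data) Swap(i, j int)      { d.queue[i], d.queue[j] = d.queue[j], d.queue[i] }
func (d *data) Push(x interface{}) {
	it := x.(*item)
	d.items[it.key] = it
	d.queue = append(d.queue, it.key)
}
func (d *data) Pop() interface{} {
	key := d.queue[len(d.queue)-1]
	d.queue = d.queue[:len(d.queue)-1]
	it := d.items[key]
	delete(d.items, key)
	return it
}

func main() {
	d := &data{items: map[string]*item{}}
	heap.Push(d, &item{"default/pod-a", 1})
	heap.Push(d, &item{"default/pod-b", 5})
	fmt.Println(heap.Pop(d).(*item).key) // 优先级高的先出队:default/pod-b
}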
不可调度队列(UnschedulablePodsMap)
不可调度队列(UnschedulablePodsMap)管理暂时无法被调度(比如没有满足要求的Node)的Pod。虽然叫队列,其实是用map实现的,队列本质就是排队的缓冲,它与数据结构中的queue不是一个概念。
// pkg/scheduler/internal/queue/scheduling_queue.go
// 不可调度Pod的队列,就是对map的一种封装,可以简单的把它理解为map就可以了
type UnschedulablePodsMap struct {
// 本质就是一个map,keyFunc与堆一样,用来获取对象的key
podInfoMap map[string]*framework.QueuedPodInfo
keyFunc func(*v1.Pod) string
// 监控相关,与本文内容无关
metricRecorder metrics.MetricRecorder
}
调度队列的抽象
golang 的开发习惯是用 interface 抽象一种接口,然后再用 struct 实现该接口。kube-scheduler 对于调度队列也有自己的抽象,如下代码所示:
// pkg/scheduler/internal/queue/scheduling_queue.go
type SchedulingQueue interface {
// PodNominator其实与调度队列的功能关系不大,但是Pod在调度队列中的状态变化需要同步给PodNominator。
// 本文不对PodNominator做详细说明,在其他后续分析调度器的文章中引用时再做说明。
framework.PodNominator
// 向队列中添加待调度的Pod,比如通过kubectl创建一个Pod时,kube-scheduler会通过该接口放入队列中.
Add(pod *v1.Pod) error
// 返回队列头部的pod,如果队列为空会被阻塞直到新的pod被添加到队列中.Add()和Pop()的组合有点数据结构中queue的感觉了,
// 可能不是先入先出,这要通过lessFunc对Pod进行排序,也就是本文后面提到的优先队列,按照Pod的优先级出队列。
Pop() (*framework.QueuedPodInfo, error)
// 首先需要理解什么是调度周期,kube-scheduler每调度一轮算作一个周期。向调度队列添加Pod不能算作一个调度周期,因为没有执行调度动作。
// 只有从调度队列中弹出(Pop)才会执行调度动作,当然可能因为某些原因调度失败了,但是也算调度了一次,所以调度周期是在Pop()接口中统计的。
// 每次pop一个pod就会加一,可以理解为调度队列的一种特殊的tick。用该接口可以获取当前的调度周期。
SchedulingCycle() int64
// 把无法调度的Pod添加回调度队列,前提条件是Pod不在调度队列中。podSchedulingCycle是通过调用SchedulingCycle()返回的当前调度周期号。
AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
// 更新pod
Update(oldPod, newPod *v1.Pod) error
// 删除pod
Delete(pod *v1.Pod) error
// 首先需要知道调度队列中至少包含activeQ和backoffQ,activeQ是所有ready等待调度的Pod,backoffQ是所有退避Pod。
// 什么是退避?与退避三舍一个意思,退避的Pod不会被调度,即便优先级再高也没用。那退避到什么程度呢?调度队列用时间来衡量,比如1秒钟。
// 对于kube-scheduler也是有退避策略的,退避时间按照尝试次数指数增长,但不是无限增长,有退避上限,默认的退避上限是10秒。
// 在kube-scheduler中Pod退避的原因就是调度失败,退避就是为了减少无意义的频繁重试。
// 把所有不可调度的Pod移到activeQ或者backoffQ中,至于哪些放到activeQ哪些放入backoffQ后面章节会有说明。
MoveAllToActiveOrBackoffQueue(event string)
// 当参数pod指向的Pod被调度后,把通过标签选择该Pod的所有不可调度的Pod移到activeQ,是不是有点绕?
// 说的直白点就是当Pod1依赖(通过标签选择)Pod2时,在Pod2没有被调度前Pod1是不可调度的,当Pod2被调度后调度器就会调用该接口。
AssignedPodAdded(pod *v1.Pod)
// 与AssignedPodAdded一样,只是发生在更新时
AssignedPodUpdated(pod *v1.Pod)
// 获取所有挂起的Pod,其实就是队列中所有的Pod,因为调度队列中都是未调度(pending)的Pod
PendingPods() []*v1.Pod
// 关闭队列
Close()
// 获取队列中不可调度的pod数量
NumUnschedulablePods() int
// 启动协程管理队列
Run()
}
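上面接口注释中提到,Pop() 在队列为空时会阻塞,直到有新的 Pod 被加入;PriorityQueue 内部正是用 sync.Cond(见后面结构体中的 cond 字段)实现这种等待/唤醒的。下面是一段极简的示意代码(非 kube-scheduler 源码),只演示这种阻塞语义:

// 示意:用 sync.Cond 实现"队列为空时 Pop 阻塞"的语义
package main

import (
	"fmt"
	"sync"
	"time"
)

type blockingQueue struct {
	lock  sync.Mutex
	cond  *sync.Cond
	items []string
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.lock)
	return q
}

func (q *blockingQueue) Add(item string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.items = append(q.items, item)
	q.cond.Broadcast() // 唤醒等待中的 Pop
}

func (q *blockingQueue) Pop() string {
	q.lock.Lock()
	defer q.lock.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait() // 队列为空则阻塞等待
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item
}

func main() {
	q := newBlockingQueue()
	go func() {
		time.Sleep(100 * time.Millisecond)
		q.Add("default/nginx")
	}()
	fmt.Println(q.Pop()) // 阻塞直到 Add 被调用
}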
优先队列(PriorityQueue)
优先队列(PriorityQueue)实现了调度队列(SchedulingQueue),优先队列的头部是优先级最高的挂起(Pending)Pod。
优先队列有三个子队列:
- 一个子队列包含准备好调度的Pod,称为activeQ(是堆类型);
- 另一个队列包含已尝试并且确定为不可调度的Pod,称为unschedulableQ(UnschedulablePodsMap);
- 第三个队列是backoffQ,包含从unschedulableQ移出的Pod,退避完成后会将其移到activeQ。
// pkg/scheduler/internal/queue/scheduling_queue.go
type PriorityQueue struct {
// 与本文无关
framework.PodNominator
// 这两个不解释了,比较简单
stop chan struct{}
clock util.Clock
// Pod的初始退避时间,默认值是1秒,可配置.当Pod调度失败后第一次的退避时间就是podInitialBackoffDuration。
// 第二次尝试调度失败后的退避时间就是podInitialBackoffDuration*2,第三次是podInitialBackoffDuration*4以此类推
podInitialBackoffDuration time.Duration
// Pod的最大退避时间,默认值是10秒,可配置。因为退避时间随着尝试次数指数增长,这个变量就是退避时间的上限值
podMaxBackoffDuration time.Duration
lock sync.RWMutex
cond sync.Cond
// activeQ,是Heap类型,前面解释过了
activeQ *heap.Heap
// backoffQ,也是Heap类型
podBackoffQ *heap.Heap
// unschedulableQ,详情见前面章节
unschedulableQ *UnschedulablePodsMap
// 调度周期,在Pop()中自增,SchedulingCycle()获取这个值
schedulingCycle int64
// 这个变量以调度周期为tick,记录上一次调用MoveAllToActiveOrBackoffQueue()的调度周期。
// 因为调用AddUnschedulableIfNotPresent()接口的时候需要提供调度周期,当调度周期小于moveRequestCycle时,
// 说明该不可调度Pod应该也被挪走,只是在调用接口的时候抢锁晚于MoveAllToActiveOrBackoffQueue()。具体后面会有代码注释。
moveRequestCycle int64
closed bool
}
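结构体中的 podInitialBackoffDuration 和 podMaxBackoffDuration 决定了退避时间:按尝试次数指数增长,并以上限封顶。下面用一段极简的 Go 代码示意这种计算方式(仅为示意,并非 kube-scheduler 中真实的退避实现):

// 示意:退避时间按尝试次数指数增长,并以 max 封顶
package main

import (
	"fmt"
	"time"
)

func backoffDuration(attempts int, initial, max time.Duration) time.Duration {
	d := initial
	for i := 1; i < attempts; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	return d
}

func main() {
	// 使用默认值:initial=1s、max=10s,输出依次为 1s, 2s, 4s, 8s, 10s, 10s
	for attempts := 1; attempts <= 6; attempts++ {
		fmt.Println(attempts, backoffDuration(attempts, time.Second, 10*time.Second))
	}
}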
