In early versions there was only one NodeController; by v1.16 it has been split into NodeIpamController and NodeLifecycleController. This article focuses on NodeLifecycleController.

// EnableTaintManager defaults to true (pkg\controller\nodelifecycle\config\v1alpha1\defaults.go; ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager)
// TaintBasedEvictions defaults to true (pkg\features\kube_features.go; utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions))

What NodeLifecycleController does

NodeLifecycleController periodically monitors node status. Based on each node's conditions, it either adds the corresponding taints or evicts the pods on the node directly.

Taint-setting logic: doNodeProcessingPassWorker and doNoExecuteTaintingPass

Two loops set taints from node conditions. Both run continuously once the controller starts:

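  1. The core of doNodeProcessingPassWorker is doNoScheduleTaintingPass. It reads the node's current conditions and sets the NoSchedule taints that mark the node as unschedulable. The scheduler then takes these taints into account and stops placing new pods on the node.
  2. doNoExecuteTaintingPass repeatedly takes elements from the zoneNoExecuteTainter queue and sets the matching NotReady/Unreachable NoExecute taint from the node's conditions. If the NodeReady condition is False, the taint is NotReady; if it is Unknown, the taint is Unreachable. Only one of the two can be present at a time.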
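The condition-to-taint rule in step 2 can be sketched in Go. This is a simplified illustration, not the controller's code. ConditionStatus, noExecuteTaintFor and swapTaints are hypothetical names, and each taint is reduced to its key. The keys node.kubernetes.io/not-ready and node.kubernetes.io/unreachable are the well-known taint keys Kubernetes uses for these two states.

```go
package main

import "fmt"

// ConditionStatus mirrors the three values a NodeReady condition can take.
type ConditionStatus string

const (
	ConditionTrue    ConditionStatus = "True"
	ConditionFalse   ConditionStatus = "False"
	ConditionUnknown ConditionStatus = "Unknown"
)

// Well-known NoExecute taint keys for the two unhealthy states.
const (
	TaintNotReady    = "node.kubernetes.io/not-ready"
	TaintUnreachable = "node.kubernetes.io/unreachable"
)

// noExecuteTaintFor returns the NoExecute taint key a node should carry for
// the given NodeReady status, or "" when the node is Ready.
func noExecuteTaintFor(ready ConditionStatus) string {
	switch ready {
	case ConditionFalse:
		return TaintNotReady
	case ConditionUnknown:
		return TaintUnreachable
	default:
		return ""
	}
}

// swapTaints drops any not-ready/unreachable key from current and adds want
// (if non-empty), so at most one of the two keys is ever present.
func swapTaints(current []string, want string) []string {
	out := []string{}
	for _, k := range current {
		if k != TaintNotReady && k != TaintUnreachable {
			out = append(out, k)
		}
	}
	if want != "" {
		out = append(out, want)
	}
	return out
}

func main() {
	taints := []string{"example.com/dedicated", TaintNotReady}
	// The NodeReady condition moves from False to Unknown.
	taints = swapTaints(taints, noExecuteTaintFor(ConditionUnknown))
	fmt.Println(taints) // [example.com/dedicated node.kubernetes.io/unreachable]
}
```

Swapping rather than appending is what keeps NotReady and Unreachable mutually exclusive.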

Dequeuing from zoneNoExecuteTainter is rate limited so that pods are not evicted too quickly at scale. The queue is implemented by the RateLimitedTimedQueue data structure:

    // RateLimitedTimedQueue is a unique item priority queue ordered by
    // the expected next time of execution. It is also rate limited.
    type RateLimitedTimedQueue struct {
        queue       UniqueQueue
        limiterLock sync.Mutex
        limiter     flowcontrol.RateLimiter
    }

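The definition shows that this is a deduplicating priority queue. Every node added to it is ordered by its execution time, which here is the time it was added. The ordering comes from a heap and the deduplication from a set. When doNoExecuteTaintingPass runs, it first tries to take a token from the TokenBucketRateLimiter before processing the element at the head of the queue. This way the earliest-added nodes are handled one after another at a controlled rate.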

Eviction logic: NoExecuteTaintManager

pkg\controller\nodelifecycle\scheduler\taint_manager.go
Pod eviction is handled by the controller that NewNoExecuteTaintManager creates:

// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
    eventBroadcaster := record.NewBroadcaster()
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
    eventBroadcaster.StartLogging(klog.Infof)
    if c != nil {
        klog.V(0).Infof("Sending events to api server.")
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
    } else {
        klog.Fatalf("kubeClient is nil when starting NodeController")
    }

    tm := &NoExecuteTaintManager{
        client:                c,
        recorder:              recorder,
        getPod:                getPod,
        getNode:               getNode,
        getPodsAssignedToNode: getPodsAssignedToNode,
        taintedNodes:          make(map[string][]v1.Taint),

        nodeUpdateQueue: workqueue.NewNamed("noexec_taint_node"),
        podUpdateQueue:  workqueue.NewNamed("noexec_taint_pod"),
    }
    tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))

    return tm
}

The handleNodeUpdate method of NoExecuteTaintManager first collects the node's taints whose Effect is NoExecute and stores them in the taintedNodes map.

When a pod changes, NoExecuteTaintManager's handlePodUpdate reads the node's taints back from that map and calls processPodOnNode to handle this pod on its node.
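The two handlers can be sketched with a map guarded by a mutex. Taint is a simplified stand-in for v1.Taint. taintTracker, noExecuteOnly and the processPod callback (which stands in for processPodOnNode) are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Taint is a simplified stand-in for v1.Taint (only the fields used here).
type Taint struct {
	Key    string
	Effect string // "NoSchedule", "PreferNoSchedule" or "NoExecute"
}

// taintTracker mimics the taintedNodes bookkeeping: node name -> NoExecute taints.
type taintTracker struct {
	mu           sync.Mutex
	taintedNodes map[string][]Taint
	processPod   func(pod, node string, taints []Taint) // stands in for processPodOnNode
}

// noExecuteOnly keeps only the taints whose effect is NoExecute.
func noExecuteOnly(taints []Taint) []Taint {
	var out []Taint
	for _, t := range taints {
		if t.Effect == "NoExecute" {
			out = append(out, t)
		}
	}
	return out
}

// handleNodeUpdate records the node's NoExecute taints, or forgets the node when it has none.
func (tr *taintTracker) handleNodeUpdate(node string, taints []Taint) {
	noExec := noExecuteOnly(taints)
	tr.mu.Lock()
	defer tr.mu.Unlock()
	if len(noExec) == 0 {
		delete(tr.taintedNodes, node)
		return
	}
	tr.taintedNodes[node] = noExec
}

// handlePodUpdate looks up the node's NoExecute taints and hands the pod on only if there are any.
func (tr *taintTracker) handlePodUpdate(pod, node string) {
	tr.mu.Lock()
	taints, ok := tr.taintedNodes[node]
	tr.mu.Unlock()
	if !ok {
		return
	}
	tr.processPod(pod, node, taints)
}

func main() {
	tr := &taintTracker{
		taintedNodes: map[string][]Taint{},
		processPod: func(pod, node string, taints []Taint) {
			fmt.Printf("process %s on %s with %d NoExecute taint(s)\n", pod, node, len(taints))
		},
	}
	tr.handleNodeUpdate("node-1", []Taint{
		{Key: "node.kubernetes.io/unreachable", Effect: "NoExecute"},
		{Key: "node.kubernetes.io/unreachable", Effect: "NoSchedule"},
	})
	tr.handlePodUpdate("default/web-0", "node-1") // processed with 1 taint
	tr.handlePodUpdate("default/web-1", "node-2") // node-2 is not tainted: skipped
}
```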

// processPodOnNode handles a pod running on a node that carries NoExecute taints
func (tc *NoExecuteTaintManager) processPodOnNode(
    podNamespacedName types.NamespacedName,
    nodeName string,
    tolerations []v1.Toleration,
    taints []v1.Taint,
    now time.Time,
) {
    if len(taints) == 0 {
        tc.cancelWorkWithEvent(podNamespacedName)
    }
    allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
    if !allTolerated {
        // Not every taint is tolerated, so the pod should be deleted immediately
        klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
        // We're canceling scheduled work (if any), as we're going to delete the Pod right away.
        tc.cancelWorkWithEvent(podNamespacedName)
        // Add the pod to the taint eviction queue, firing right away
        tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
        return
    }
    // Get the smallest tolerationSeconds among the pod's matching NoExecute tolerations
    minTolerationTime := getMinTolerationTime(usedTolerations)
    // getMinTolerationTime returns negative value to denote infinite toleration.
    if minTolerationTime < 0 {
        klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
        return
    }
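    // Current time
    startTime := now
    // Time at which the pod should be deleted
    triggerTime := startTime.Add(minTolerationTime)
    // If an eviction is already scheduled, measure from its original creation time:
    // keep the existing work when that deadline is earlier, otherwise cancel and reschedule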
    scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
    if scheduledEviction != nil {
        startTime = scheduledEviction.CreatedAt
        if startTime.Add(minTolerationTime).Before(triggerTime) {
            return
        }
        tc.cancelWorkWithEvent(podNamespacedName)
    }
    // Add the pod to the deletion queue
    tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}

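  1. If not every taint is tolerated, the pod must be deleted immediately, so any previously scheduled work is cancelled first.
  2. Otherwise, get the tolerationSeconds configured in the pod's NoExecute tolerations; the smallest one wins.
  3. If getMinTolerationTime returns a negative value, the pod tolerates the taints forever and nothing is done.
  4. Compute the time at which the pod should be deleted.
  5. Add the pod to the deletion queue.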

AddWork adds work to the WorkerQueue. The work runs the deletion logic in its own goroutine, no earlier than fireAt:

// AddWork adds a work to the WorkerQueue which will be executed not earlier than `fireAt`.
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
    key := args.KeyFromWorkArgs()
    klog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)

    q.Lock()
    defer q.Unlock()
    if _, exists := q.workers[key]; exists {
        klog.Warningf("Trying to add already existing work for %+v. Skipping.", args)
        return
    }
    worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
    q.workers[key] = worker
}

CreateWorker creates a TimedWorker that runs f no earlier than fireAt:

// CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
    delay := fireAt.Sub(createdAt)
    if delay <= 0 {
        go f(args)
        return nil
    }
    timer := time.AfterFunc(delay, func() { f(args) })
    return &TimedWorker{
        WorkItem:  args,
        CreatedAt: createdAt,
        FireAt:    fireAt,
        Timer:     timer,
    }
}

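What is f? It is the wrapper returned by q.getWrappedWorkerFunc(key). That wrapper calls deletePodHandler(c, tm.emitPodDeletionEvent), which was registered through CreateWorkerQueue when NoExecuteTaintManager was created above.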

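deletePodHandler tries the deletion up to 5 times (the retries constant), 10 ms apart. If it still fails, the pod is handled again on its next update.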


// deletePodHandler is the core deletion logic for taint-based eviction
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
    return func(args *WorkArgs) error {
        ns := args.NamespacedName.Namespace
        name := args.NamespacedName.Name
        klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
        if emitEventFunc != nil {
            emitEventFunc(args.NamespacedName)
        }
        var err error
        for i := 0; i < retries; i++ {
            err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
            if err == nil {
                break
            }
            time.Sleep(10 * time.Millisecond)
        }
        return err
    }
}
