Early versions had a single NodeController. As of v1.16 it has been split into NodeIpamController and NodeLifecycleController. This article focuses on NodeLifecycleController.
// EnableTaintManager defaults to true. The default is set in pkg\controller\nodelifecycle\config\v1alpha1\defaults.go and read via ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager.
// TaintBasedEvictions defaults to true. It is defined in pkg\features\kube_features.go and checked via utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions).
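What NodeLifecycleController does
NodeLifecycleController periodically monitors the status of each node and, based on the node's conditions, either adds the corresponding taints or directly evicts the pods running on the node.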
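Taint-setting logic: doNodeProcessingPassWorker and doNoExecuteTaintingPass
Setting taints from node conditions is handled by two loops, both of which run continuously once the controller has started:
doNodeProcessingPassWorker: its main logic is doNoScheduleTaintingPass, which sets scheduling-related taints such as unschedulable according to the node's current conditions, so that the scheduler can take them into account and stop placing new pods on the node.
doNoExecuteTaintingPass: continuously takes elements from the zoneNoExecuteTainter queue and sets the NotReady or Unreachable taint according to the node's condition. If the NodeReady condition is False, the taint is NotReady; if it is Unknown, the taint is Unreachable. Only one of these two taints can be present at a time.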
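The mapping from the NodeReady condition to a NoExecute taint can be sketched as follows. This is a minimal illustration, not the controller's code: the names ConditionStatus, noExecuteTaintFor and applyReadyTaint are hypothetical, and the taint set is modeled as a plain map. The two taint keys are the well-known Kubernetes keys for these states.

```go
package main

import "fmt"

// ConditionStatus mirrors the three values a node condition can take.
type ConditionStatus string

const (
	ConditionTrue    ConditionStatus = "True"
	ConditionFalse   ConditionStatus = "False"
	ConditionUnknown ConditionStatus = "Unknown"
)

// Well-known taint keys for the two NoExecute states.
const (
	TaintNotReady    = "node.kubernetes.io/not-ready"
	TaintUnreachable = "node.kubernetes.io/unreachable"
)

// noExecuteTaintFor (hypothetical helper) returns the single taint key implied
// by the NodeReady condition, or "" when the node is ready.
func noExecuteTaintFor(ready ConditionStatus) string {
	switch ready {
	case ConditionFalse:
		return TaintNotReady
	case ConditionUnknown:
		return TaintUnreachable
	default:
		return ""
	}
}

// applyReadyTaint (hypothetical helper) clears both taints first and then adds
// at most one of them, so the two can never coexist on a node.
func applyReadyTaint(taints map[string]bool, ready ConditionStatus) {
	delete(taints, TaintNotReady)
	delete(taints, TaintUnreachable)
	if key := noExecuteTaintFor(ready); key != "" {
		taints[key] = true
	}
}

func main() {
	taints := map[string]bool{}
	for _, status := range []ConditionStatus{ConditionFalse, ConditionUnknown, ConditionTrue} {
		applyReadyTaint(taints, status)
		fmt.Printf("NodeReady=%s -> taints=%v\n", status, taints)
	}
}
```

Clearing both keys before adding one is what keeps the two taints mutually exclusive in this sketch.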
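Dequeuing from the zoneNoExecuteTainter queue is rate limited, which prevents pods from being evicted rapidly on a large scale. The queue is implemented by the RateLimitedTimedQueue data structure: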
// RateLimitedTimedQueue is a unique item priority queue ordered by
// the expected next time of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
	queue       UniqueQueue
	limiterLock sync.Mutex
	limiter     flowcontrol.RateLimiter
}
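The definition shows that this is a deduplicated priority queue. Each node added to it is ordered by its execution time (here, the time it was added). The priority ordering is implemented with a heap, and deduplication is implemented with a set. On each run, doNoExecuteTaintingPass first tries to obtain a token from the TokenBucketRateLimiter and then takes the element at the head of the queue for processing, so the earliest-added nodes are handled in order at a controlled rate.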
Eviction logic: NoExecuteTaintManager
pkg\controller\nodelifecycle\scheduler\taint_manager.go
Pod eviction is handled by the NoExecuteTaintManager controller, which is created by NewNoExecuteTaintManager:
// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
	eventBroadcaster.StartLogging(klog.Infof)
	if c != nil {
		klog.V(0).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.CoreV1().Events("")})
	} else {
		klog.Fatalf("kubeClient is nil when starting NodeController")
	}
	tm := &NoExecuteTaintManager{
		client:                c,
		recorder:              recorder,
		getPod:                getPod,
		getNode:               getNode,
		getPodsAssignedToNode: getPodsAssignedToNode,
		taintedNodes:          make(map[string][]v1.Taint),
		nodeUpdateQueue:       workqueue.NewNamed("noexec_taint_node"),
		podUpdateQueue:        workqueue.NewNamed("noexec_taint_pod"),
	}
	// Register the pod deletion handler that the timed workers will run.
	tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))
	return tm
}
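The handleNodeUpdate method of NoExecuteTaintManager first collects the taints on the node whose Effect is NoExecute and stores them in the taintedNodes map.
When a pod changes, the handlePodUpdate method of NoExecuteTaintManager reads the taints from that map and calls processPodOnNode to handle the pod on its current node.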
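The bookkeeping done on a node update can be sketched as follows. This is an illustration only: Taint, taintCache and updateNode are hypothetical stand-ins for the real types, and locking is omitted for brevity.

```go
package main

import "fmt"

// Taint is a trimmed-down, hypothetical stand-in for v1.Taint.
type Taint struct {
	Key    string
	Effect string
}

const effectNoExecute = "NoExecute"

// noExecuteTaints keeps only the taints whose effect is NoExecute.
func noExecuteTaints(taints []Taint) []Taint {
	var result []Taint
	for _, t := range taints {
		if t.Effect == effectNoExecute {
			result = append(result, t)
		}
	}
	return result
}

// taintCache plays the role of the taintedNodes map in this sketch.
type taintCache struct {
	taintedNodes map[string][]Taint
}

// updateNode records the node's NoExecute taints, or forgets the node when it
// no longer has any.
func (c *taintCache) updateNode(nodeName string, taints []Taint) {
	filtered := noExecuteTaints(taints)
	if len(filtered) == 0 {
		delete(c.taintedNodes, nodeName)
		return
	}
	c.taintedNodes[nodeName] = filtered
}

func main() {
	cache := &taintCache{taintedNodes: map[string][]Taint{}}
	cache.updateNode("node-1", []Taint{
		{Key: "node.kubernetes.io/unreachable", Effect: "NoExecute"},
		{Key: "node.kubernetes.io/unschedulable", Effect: "NoSchedule"},
	})
	fmt.Println(cache.taintedNodes["node-1"])

	// Once the node recovers and its taints are gone, the entry is dropped.
	cache.updateNode("node-1", nil)
	fmt.Println(len(cache.taintedNodes))
}
```

A pod update can then look up its node in the map and pass the stored taints on for evaluation.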
// processPodOnNode handles a pod running on a node that carries NoExecute taints.
func (tc *NoExecuteTaintManager) processPodOnNode(
	podNamespacedName types.NamespacedName,
	nodeName string,
	tolerations []v1.Toleration,
	taints []v1.Taint,
	now time.Time,
) {
	if len(taints) == 0 {
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
	if !allTolerated {
		// Not every taint is tolerated, so the pod should be deleted immediately.
		klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
		// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
		tc.cancelWorkWithEvent(podNamespacedName)
		// Add the pod to the taint eviction queue, to fire right away.
		tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
		return
	}
	// Get the smallest tolerationSeconds configured in the pod's matching NoExecute tolerations.
	minTolerationTime := getMinTolerationTime(usedTolerations)
	// getMinTolerationTime returns negative value to denote infinite toleration.
	if minTolerationTime < 0 {
		klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())
		return
	}
	// Start counting from the current time.
	startTime := now
	// Compute the time at which the pod should be deleted.
	triggerTime := startTime.Add(minTolerationTime)
	// If an eviction is already scheduled for this pod, count from its original
	// creation time. Keep the existing work if it would fire earlier than the
	// newly computed trigger time; otherwise cancel it and reschedule below.
	scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
	if scheduledEviction != nil {
		startTime = scheduledEviction.CreatedAt
		if startTime.Add(minTolerationTime).Before(triggerTime) {
			return
		}
		tc.cancelWorkWithEvent(podNamespacedName)
	}
	// Add the pod to the deletion queue.
	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
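If the pod does not tolerate every taint, it should be deleted immediately, so any previously scheduled work is cancelled and the pod is queued for deletion right away.
Otherwise, get the tolerationSeconds configured in the pod's NoExecute tolerations;
if getMinTolerationTime returns a negative value, the pod tolerates the taints forever and nothing is done;
compute the time at which the pod should be deleted;
add the pod to the deletion queue.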
AddWork creates a worker and adds it to the WorkerQueue. The deletion logic runs no earlier than fireAt:
// AddWork adds a work to the WorkerQueue which will be executed not earlier than `fireAt`.
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
	key := args.KeyFromWorkArgs()
	klog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)
	q.Lock()
	defer q.Unlock()
	// Work that is already queued under the same key is left untouched.
	if _, exists := q.workers[key]; exists {
		klog.Warningf("Trying to add already existing work for %+v. Skipping.", args)
		return
	}
	worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
	q.workers[key] = worker
}
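Because AddWork skips keys that are already present, a caller that wants to reschedule has to cancel the old entry first, which is what processPodOnNode does. A minimal sketch of that keyed, mutex-guarded pattern follows; workQueue, addWork and cancelWork are hypothetical names, and the stored value is a plain string rather than a timed worker.

```go
package main

import (
	"fmt"
	"sync"
)

// workQueue is a simplified stand-in for a keyed worker queue.
type workQueue struct {
	sync.Mutex
	workers map[string]string // key -> description of the pending work
}

// addWork stores work under key unless the key is already present.
func (q *workQueue) addWork(key, work string) bool {
	q.Lock()
	defer q.Unlock()
	if _, exists := q.workers[key]; exists {
		return false // existing work wins; the new request is skipped
	}
	q.workers[key] = work
	return true
}

// cancelWork removes pending work and reports whether anything was removed.
func (q *workQueue) cancelWork(key string) bool {
	q.Lock()
	defer q.Unlock()
	if _, exists := q.workers[key]; !exists {
		return false
	}
	delete(q.workers, key)
	return true
}

func main() {
	q := &workQueue{workers: map[string]string{}}
	fmt.Println(q.addWork("default/web-0", "evict in 300s")) // added
	fmt.Println(q.addWork("default/web-0", "evict in 60s"))  // skipped
	q.cancelWork("default/web-0")
	fmt.Println(q.addWork("default/web-0", "evict in 60s")) // added after cancel
}
```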
CreateWorker creates a TimedWorker that executes f no earlier than fireAt:
// CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
	delay := fireAt.Sub(createdAt)
	// No delay left: run f right away in its own goroutine.
	if delay <= 0 {
		go f(args)
		return nil
	}
	// Otherwise arm a timer that runs f once the delay has elapsed.
	timer := time.AfterFunc(delay, func() { f(args) })
	return &TimedWorker{
		WorkItem:  args,
		CreatedAt: createdAt,
		FireAt:    fireAt,
		Timer:     timer,
	}
}
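What is f? It is ultimately the deletePodHandler(c, tm.emitPodDeletionEvent) function that was registered when the NoExecuteTaintManager was created above.
It attempts the deletion up to 5 times, 10 ms apart. If deletion still fails, the pod is handled again on its next update.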
// deletePodHandler holds the core deletion logic of taint-based eviction.
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
	return func(args *WorkArgs) error {
		ns := args.NamespacedName.Namespace
		name := args.NamespacedName.Name
		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
		if emitEventFunc != nil {
			emitEventFunc(args.NamespacedName)
		}
		var err error
		// Retry the delete a fixed number of times with a short pause in between.
		for i := 0; i < retries; i++ {
			err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
			if err == nil {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
		return err
	}
}
References
The most detailed source-reading analysis of the Kubernetes 1.12.0 kube-controller-manager node-lifecycle-controller:
https://my.oschina.net/u/3797264/blog/2885807
Tian Feiyu (田飞雨), Kubernetes source code analysis:
https://www.bookstack.cn/read/source-code-reading-notes/kubernetes-garbagecollector_controller.md#gc.resyncMonitors
Analysis of the node heartbeat handling logic in Kubernetes:
https://www.cnblogs.com/gaorong/p/12312590.html