https://blog.csdn.net/screscent/article/details/51150263

https://blog.dreamfever.me/2021/07/03/kubernetes-yuan-ma-pou-xi-kubelet-podworker/

本片文章介绍 PodWorker 如何实现了一个 goroutine 池来管理 Pod

syncLoop

在kubelet中,有一个关键的函数syncLoop,其通过一个不会结束的循环来:

  • 不断更新本地的Pod,使之达到期望状态,
  • 并且将本地Pod(容器)的变化,同步给apiserver。

在这里有几个重要的参数:
plegCh : 一个只出不进的管道,从这里面我们可以取出一系列关于本地容器的事件
updates : 一个只出不进的管道,从这里面我们可以取出一系列要对Pod进行的操作

  1. func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
  2. plegCh := kl.pleg.Watch()
  3. // ...省略
  4. for {
  5. // ... 省略
  6. // 执行主要逻辑,从不同的 channel 中接收消息并分发处理
  7. // syncTicker 是 duration 为 1 秒的 Ticker; housekeepingTicker 则是 2 秒
  8. if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
  9. break
  10. }
  11. }
  12. }

syncLoopIteration

所有的变化对应的任务都会在syncLoopIteration方法中进行分发。

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        switch u.Op {
        case kubetypes.ADD:
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            handler.HandlePodRemoves(u.Pods)
        // ... 省略其余case
        }
    case e := <-plegCh:
        if e.Type == pleg.ContainerStarted {
            // record the most recent time we observed a container start for this pod.
            // this lets us selectively invalidate the runtimeCache when processing a delete for this pod
            // to make sure we don't miss handling graceful termination for containers we reported as having started.
            kl.lastContainerStartedTime.Add(e.ID, time.Now())
        }
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
     // ...其余case省略
    }
    return true
}

在这里,我们主要讨论syncLoopIteration 对来自上文提到的 updates 以及 plegCh 信息的处理:
当从updates 中取出了信息,判断其是什么类型的事件,然后调用对应的方法:
如ADD事件对应的方法是HandlePodAdditions(新增Pod)

在所有对应的HandlePodXXX方法中,都会殊途同归得调用一个dispatchWork方法。

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        StartTime:  start,
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}

到目前为止,我们走完了条重要的路线,即:
获取事件、判断事件、分发事件
image.png
这里最关键的方法就是kl.podWorkers.UpdatePod

PodWorker

kubelet采用了使用协程去管理每个Pod,而管理这些协程的角色,就是PodWorker。
在UpdatePod方法中,会检查一个Pod有没有对应的协程,如果没有的话,就要进行创建,并记录在册。

Pod Goroutine 什么时候创建

func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
    // ...
    if pod没被创建 {
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    // ...
}

对于每一个 Pod,都会有一个 goroutine 负责管理,它所执行的函数是 managePodLoop。

podWorkers 的工作就是管理这些 goroutine 。对于一个新的 Pod,PodWorker 首先会创建一个 channel podUpdates,然后赋值到自己的 map p.podUpdates 中,这个 map 维护了所有 goroutine 的消息输入端, close 这个 channel 便可以使对应的 goroutine 退出。

UpdatePod 函数会被频繁调用,消息只有在 Pod 的 管理 goroutine 的 isWorking 条件为 false(即没在工作,空闲中) 的时候才会被直接传入 channel,否则会放到 lastUndeliveredWorkUpdate 中。这个变量维护了最近一个没有被处理的消息。SyncPodKill 类型的消息具有最高的优先级,如果当 lastUndeliveredWorkUpdate 中的消息为 SyncPodKill 时,之后任何的消息都不能覆盖掉它。

为什么这里需要 lastUndeliveredWorkUpdate 而不是消息都直接放入 channel 呢?因为 Kubernetes 是按照最终状态对齐的,比如我们这里连续的 3 个 SyncPodUpdate 消息过来,managePodLoop 是按照最新的 Spec 去更新的,所以会产生两次的重复更新。lastUndeliveredWorkUpdate 的作用便是缓冲并合并

image.png

Pod Goroutine 的核心工作

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {  
    var lastSyncTime time.Time
    for update := range podUpdates {
        err := func() error {
            podUID := update.Pod.UID
            // This is a blocking call that would return only if the cache
            // has an entry for the pod that is newer than minRuntimeCache
            // Time. This ensures the worker doesn't start syncing until
            // after the cache is at least newer than the finished time of
            // the previous sync.
            status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
            if err != nil {
                // This is the legacy event thrown by manage pod loop
                // all other events are now dispatched from syncPodFn
                p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                return err
            }
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()
        // notify the call-back function if the operation succeeded or not
        if update.OnCompleteFunc != nil {
            update.OnCompleteFunc(err)
        }
        if err != nil {
            // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
            klog.ErrorS(err, "Error syncing pod, skipping", "pod", klog.KObj(update.Pod), "podUID", update.Pod.UID)
        }
        p.wrapUp(update.Pod.UID, err)
    }
}

syncPodFn 是核心处理内容,它的实际实现在 kubelet.go 文件中

其在podworker创建中被赋值,在这里syncPodFn 被指定为syncPod函数

// kubelet 对象构造函数中
// ...
klet.podWorkers = newPodWorkers(
        klet.syncPod,
        // ...
)

在syncPod函数中,完成了对容器运行时接口的调用。

func (kl *Kubelet) syncPod(...) (isTerminal bool, err error) {
    // ...
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)       result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    // ...

}

SyncPod 通过以下步骤将运行中的Pod同步至期望状态:
1. 计算sandbox和容器的变化
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.