https://blog.csdn.net/screscent/article/details/51150263
https://blog.dreamfever.me/2021/07/03/kubernetes-yuan-ma-pou-xi-kubelet-podworker/
本片文章介绍 PodWorker 如何实现了一个 goroutine 池来管理 Pod
syncLoop
在kubelet中,有一个关键的函数syncLoop,其通过一个不会结束的循环来:
- 不断更新本地的Pod,使之达到期望状态,
- 并且将本地Pod(容器)的变化,同步给apiserver。
在这里有几个重要的参数:
plegCh : 一个只出不进的管道,从这里面我们可以取出一系列关于本地容器的事件
updates : 一个只出不进的管道,从这里面我们可以取出一系列要对Pod进行的操作
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {plegCh := kl.pleg.Watch()// ...省略for {// ... 省略// 执行主要逻辑,从不同的 channel 中接收消息并分发处理// syncTicker 是 duration 为 1 秒的 Ticker; housekeepingTicker 则是 2 秒if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {break}}}
syncLoopIteration
所有的变化对应的任务都会在syncLoopIteration方法中进行分发。
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
switch u.Op {
case kubetypes.ADD:
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
handler.HandlePodRemoves(u.Pods)
// ... 省略其余case
}
case e := <-plegCh:
if e.Type == pleg.ContainerStarted {
// record the most recent time we observed a container start for this pod.
// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
// to make sure we don't miss handling graceful termination for containers we reported as having started.
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
// ...其余case省略
}
return true
}
在这里,我们主要讨论syncLoopIteration 对来自上文提到的 updates 以及 plegCh 信息的处理:
当从updates 中取出了信息,判断其是什么类型的事件,然后调用对应的方法:
如ADD事件对应的方法是HandlePodAdditions(新增Pod)
在所有对应的HandlePodXXX方法中,都会殊途同归得调用一个dispatchWork方法。
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
StartTime: start,
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
到目前为止,我们走完了条重要的路线,即:
获取事件、判断事件、分发事件
这里最关键的方法就是kl.podWorkers.UpdatePod
PodWorker
kubelet采用了使用协程去管理每个Pod,而管理这些协程的角色,就是PodWorker。
在UpdatePod方法中,会检查一个Pod有没有对应的协程,如果没有的话,就要进行创建,并记录在册。
Pod Goroutine 什么时候创建
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
// ...
if pod没被创建 {
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
// ...
}
对于每一个 Pod,都会有一个 goroutine 负责管理,它所执行的函数是 managePodLoop。
podWorkers 的工作就是管理这些 goroutine 。对于一个新的 Pod,PodWorker 首先会创建一个 channel podUpdates,然后赋值到自己的 map p.podUpdates 中,这个 map 维护了所有 goroutine 的消息输入端, close 这个 channel 便可以使对应的 goroutine 退出。
UpdatePod 函数会被频繁调用,消息只有在 Pod 的 管理 goroutine 的 isWorking 条件为 false(即没在工作,空闲中) 的时候才会被直接传入 channel,否则会放到 lastUndeliveredWorkUpdate 中。这个变量维护了最近一个没有被处理的消息。SyncPodKill 类型的消息具有最高的优先级,如果当 lastUndeliveredWorkUpdate 中的消息为 SyncPodKill 时,之后任何的消息都不能覆盖掉它。
为什么这里需要 lastUndeliveredWorkUpdate 而不是消息都直接放入 channel 呢?因为 Kubernetes 是按照最终状态对齐的,比如我们这里连续的 3 个 SyncPodUpdate 消息过来,managePodLoop 是按照最新的 Spec 去更新的,所以会产生两次的重复更新。lastUndeliveredWorkUpdate 的作用便是缓冲并合并

Pod Goroutine 的核心工作
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for update := range podUpdates {
err := func() error {
podUID := update.Pod.UID
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
if err != nil {
// This is the legacy event thrown by manage pod loop
// all other events are now dispatched from syncPodFn
p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
return err
}
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
return err
}()
// notify the call-back function if the operation succeeded or not
if update.OnCompleteFunc != nil {
update.OnCompleteFunc(err)
}
if err != nil {
// IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
klog.ErrorS(err, "Error syncing pod, skipping", "pod", klog.KObj(update.Pod), "podUID", update.Pod.UID)
}
p.wrapUp(update.Pod.UID, err)
}
}
syncPodFn 是核心处理内容,它的实际实现在 kubelet.go 文件中
其在podworker创建中被赋值,在这里syncPodFn 被指定为syncPod函数
// kubelet 对象构造函数中
// ...
klet.podWorkers = newPodWorkers(
klet.syncPod,
// ...
)
在syncPod函数中,完成了对容器运行时接口的调用。
func (kl *Kubelet) syncPod(...) (isTerminal bool, err error) {
// ...
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff) result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
// ...
}
SyncPod 通过以下步骤将运行中的Pod同步至期望状态:
1. 计算sandbox和容器的变化
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
