This section analyzes the whole process by which the kubelet creates a pod and drives its state changes after `kubectl apply -f app.yaml` is run on the master node.

Key types

PodConfig

Receives pod change messages from the configuration sources, normalizes and filters them, and stores the result in updates.

  • mux: creates the channels that receive pod messages from each configuration source
  • pods: merges the messages from the channels created by mux into updates
  • updates: the channel that holds the final messages, consumed elsewhere
  • sources: the set of configured source types
// pkg/kubelet/config/config.go
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners in order.
type PodConfig struct {
    pods *podStorage
    mux  *config.Mux
    // the channel of denormalized changes passed to listeners
    updates chan kubetypes.PodUpdate
    // contains the list of all configured sources
    sourcesLock sync.Mutex
    sources     sets.String
    checkpointManager checkpointmanager.CheckpointManager
}

Mux

Mux is the type that merges configuration from multiple sources by invoking the merger. Pod change messages are pushed through per-source channels and delivered to Merger.Merge().

  • merger: the podStorage instance used to merge the messages
  • sources: maps each configuration source to the channel that carries that source's messages
// pkg/util/config/config.go
// Mux merges configuration from multiple sources by invoking the merger; pod change messages are pushed through the per-source channels and delivered to Merger.Merge().
type Mux struct {
    // Invoked when an update is sent to a source.
    merger Merger

    // Sources and their lock.
    sourceLock sync.RWMutex
    // Maps source names to channels
    sources map[string]chan interface{}
}

podStorage

Normalizes, filters, merges, and transforms the current PodUpdate.

  • pods: a map from configuration source to a map of pod UID to pod
  • updates: the channel that receives the final messages
// pkg/kubelet/config/config.go
// podStorage manages the current pod state at any point in time and ensures updates are delivered to the channel in order
type podStorage struct {
    podLock sync.RWMutex

    pods map[string]map[types.UID]*v1.Pod
    mode PodConfigNotificationMode

    // ensures that updates are delivered in strict order on the updates channel
    updateLock sync.Mutex
    updates    chan<- kubetypes.PodUpdate

    // contains the set of all sources that have sent at least one SET
    sourcesSeenLock sync.RWMutex
    sourcesSeen     sets.String

    // the EventRecorder to use
    recorder record.EventRecorder
}

PodUpdate

The smallest unit in which pod messages are stored.

  • Pods: the pods that share the same change type
  • Op: the type of pod change (ADD, SET, DELETE, and so on)
  • Source: where the message came from (file, url, apiserver)
// pkg/kubelet/types/pod_update.go
type PodUpdate struct {
    Pods   []*v1.Pod
    Op     PodOperation
    Source string
}

Storing pod change messages into PodConfig.updates

UML

Flow (only the apiserver configuration source is analyzed here):

  1. NewMainKubelet() calls makePodSourceConfig() to build the PodConfig object
  2. Inside makePodSourceConfig()
    1. config.NewPodConfig() is called first to initialize a PodConfig object
    2. The apiserver configuration source is added to the PodConfig object
      1. PodConfig.Channel() is called to create the channel that receives messages from the source, and a background goroutine keeps calling merger.Merge() to consolidate the messages from that channel into PodConfig.updates
      2. config.NewSourceApiserver() is called to create a listener that fetches messages from the apiserver, using the channel created by PodConfig.Channel() as the place the received messages are stored (a minimal sketch of this fan-in pattern follows the figure below)

(Pod state change process - Figure 2)
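Before walking through the real code, the fan-in pattern described above can be illustrated with a minimal, self-contained sketch. The types and names below (PodUpdate, merger, mux) are simplified stand-ins for the real kubelet structures, not the actual implementation: each source gets its own channel, a per-source goroutine hands whatever arrives to the merger, and the merged result is pushed onto a single updates channel.

package main

import "fmt"

type PodUpdate struct {
    Op     string
    Pods   []string
    Source string
}

// merger stands in for podStorage: it normalizes per-source messages onto the updates channel.
type merger struct {
    updates chan PodUpdate
}

func (m *merger) Merge(source string, u PodUpdate) {
    u.Source = source
    m.updates <- u
}

// mux stands in for config.Mux: one channel per source, each drained by its own goroutine.
type mux struct {
    merger  *merger
    sources map[string]chan PodUpdate
}

func (m *mux) Channel(source string) chan<- PodUpdate {
    if ch, ok := m.sources[source]; ok {
        return ch
    }
    ch := make(chan PodUpdate)
    m.sources[source] = ch
    go func() {
        for u := range ch {
            m.merger.Merge(source, u)
        }
    }()
    return ch
}

func main() {
    updates := make(chan PodUpdate, 50)
    m := &mux{merger: &merger{updates: updates}, sources: map[string]chan PodUpdate{}}

    // a source (e.g. the apiserver listener) pushes into the channel handed out by Channel()
    apiCh := m.Channel("api")
    go func() { apiCh <- PodUpdate{Op: "SET", Pods: []string{"nginx"}} }()

    // syncLoop consumes the single merged updates channel
    fmt.Printf("%+v\n", <-updates)
}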

Key code analysis

Creating config.PodConfig to watch and consolidate pod configuration and changes

NewMainKubelet()

NewMainKubelet() in pkg/kubelet/kubelet.go instantiates a new Kubelet object together with all the required internal modules. Along the way it calls makePodSourceConfig() (in the same file) to create a config.PodConfig and assign it to kubeDeps.PodConfig; the config.PodConfig object holds the pod change messages supplied by the various configuration sources.

// pkg/kubelet/kubelet.go
// instantiates a new Kubelet object along with all the required internal modules
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    ...) (*Kubelet, error) {

    ...

    if kubeDeps.PodConfig == nil {
        var err error
        // create a config.PodConfig and assign it to kubeDeps.PodConfig
        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
        if err != nil {
            return nil, err
        }
    }

  ...
    return klet, nil
}

makePodSourceConfig()

Creates a config.PodConfig and configures the three kinds of sources it listens to (file, url, apiserver).

  • config.NewPodConfig() is called to initialize a config.PodConfig
  • A file source is added to the config.PodConfig: pod configuration is read from the given file, which is then watched for subsequent changes
  • A url source is added to the config.PodConfig: pod configuration is read from the given URL, which is then watched for subsequent changes
  • An apiserver source is added to the config.PodConfig: pod configuration is read from the apiserver, which is then watched for subsequent changes
    • PodConfig.Channel() is called to create a channel that holds the messages received from the apiserver, and the messages in that channel are merged into PodConfig.updates
    • config.NewSourceApiserver() is called to create a listener that fetches messages from the apiserver, using the channel created by PodConfig.Channel() as the place the received messages are stored
// pkg/kubelet/kubelet.go
// creates a config.PodConfig and returns it
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
    manifestURLHeader := make(http.Header)
    if len(kubeCfg.StaticPodURLHeader) > 0 {
        for k, v := range kubeCfg.StaticPodURLHeader {
            for i := range v {
                manifestURLHeader.Add(k, v[i])
            }
        }
    }

    // create the config.PodConfig
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // add the file source to the config.PodConfig: read pod configuration from the given file and watch for subsequent changes
    if kubeCfg.StaticPodPath != "" {
        klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
        config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
    }

    // add the url source to the config.PodConfig: read pod configuration from the given URL and watch for subsequent changes
    if kubeCfg.StaticPodURL != "" {
        klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
        config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    }

    // declare the channel that NewSourceApiserver will use to store the pod messages it receives
    var updatechannel chan<- interface{}
    if bootstrapCheckpointPath != "" {
        klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
        updatechannel = cfg.Channel(kubetypes.ApiserverSource)
        err := cfg.Restore(bootstrapCheckpointPath, updatechannel)
        if err != nil {
            return nil, err
        }
    }

    // add the apiserver source to the config.PodConfig: read pod configuration from the apiserver and watch for subsequent changes
    if kubeDeps.KubeClient != nil {
        klog.Infof("Watching apiserver")
        if updatechannel == nil {
            // create a channel
            updatechannel = cfg.Channel(kubetypes.ApiserverSource)
        }
        // wire the apiserver source into the config
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
    }
    return cfg, nil
}

config.NewPodConfig()

Initializes a PodConfig.

  • The two components of PodConfig, podStorage and Mux, share one channel with PodConfig itself; this channel, named updates, stores the final, filtered pod change messages
  • Mux creates channels that act as intermediate buffers for the messages received from the sources and hands them over to podStorage
  • podStorage consolidates, merges and filters the messages from the channels created by Mux and puts the result into PodConfig.updates
// creates a PodConfig and returns it
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
    // initialize the channel shared by podStorage, Mux and PodConfig
    updates := make(chan kubetypes.PodUpdate, 50)
    // initialize a podStorage used by both PodConfig and Mux
    storage := newPodStorage(updates, mode, recorder)
    podConfig := &PodConfig{
        pods:    storage,
        // initialize a Mux
        mux:     config.NewMux(storage),
        updates: updates,
        sources: sets.String{},
    }
    return podConfig
}

func NewMux(merger Merger) *Mux {
    mux := &Mux{
        sources: make(map[string]chan interface{}),
        merger:  merger,
    }
    return mux
}

func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
    return &podStorage{
        pods:        make(map[string]map[types.UID]*v1.Pod),
        mode:        mode,
        updates:     updates,
        sourcesSeen: sets.String{},
        recorder:    recorder,
    }
}

Putting the change data received from the various sources into updates

PodConfig.Channel()

Calls Mux.Channel() to create a channel used to receive PodUpdate objects.

// pkg/kubelet/config/config.go
// creates a channel used to receive PodUpdate objects
func (c *PodConfig) Channel(source string) chan<- interface{} {
    c.sourcesLock.Lock()
    defer c.sourcesLock.Unlock()
    c.sources.Insert(source)
    return c.mux.Channel(source)
}

Mux.Channel()

Returns a usable channel.

  • If Mux.sources already holds a channel for the source, that channel is returned
  • If no channel exists for the source, a new one is created and Mux.listen() is run in a background goroutine to merge the messages from that channel into PodConfig.updates
// pkg/util/config/config.go
// returns the channel for the source from Mux.sources; if none exists, creates one and starts Mux.listen() on it
func (m *Mux) Channel(source string) chan interface{} {
    if len(source) == 0 {
        panic("Channel given an empty name")
    }
    m.sourceLock.Lock()
    defer m.sourceLock.Unlock()

    channel, exists := m.sources[source]
    // if the source is already known, return its existing channel
    if exists {
        return channel
    }
    // if there is no channel for the source yet, create a new one
    newChannel := make(chan interface{})
    // store the new channel in sources so it can be reused later
    m.sources[source] = newChannel
    // run m.listen() in a background goroutine to merge the messages from this channel into PodConfig.updates
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel
}

Mux.listen()

Loops over listenChannel: whenever a message arrives it is taken out and handed to podStorage.Merge() for processing.

// pkg/util/config/config.go
// loops over listenChannel: whenever a message arrives it is taken out and handed to podStorage.Merge()
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}

podStorage.Merge()

Merge normalizes a set of incoming changes from different sources into a map of all pods, filters out redundant changes, and then pushes zero or more minimal updates onto the podConfig.updates channel, ensuring the updates are delivered in order.

  • podStorage.merge() is called to turn the incoming pod change message into kubetypes.PodUpdate objects of the different change types
  • The kubetypes.PodUpdate objects of each type are then pushed into podConfig.updates
// pkg/kubelet/config/config.go
// merge and normalize the pod change messages from the different sources, filter out redundant changes, and push the result onto podConfig.updates so that changes are delivered in order
func (s *podStorage) Merge(source string, change interface{}) error {
    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    seenBefore := s.sourcesSeen.Has(source)
    // call podStorage.merge() to turn the incoming pod change message into kubetypes.PodUpdate objects of the different change types
    adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
    firstSet := !seenBefore && s.sourcesSeen.Has(source)

    // push the kubetypes.PodUpdate objects of each type into podConfig.updates
    switch s.mode {
    case PodConfigNotificationIncremental:
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        }
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
        if len(restores.Pods) > 0 {
            s.updates <- *restores
        }
        if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
            // Send an empty update when first seeing the source and there are
            // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
            // the source is ready.
            s.updates <- *adds
        }
        // Only add reconcile support here, because kubelet doesn't support Snapshot update now.
        if len(reconciles.Pods) > 0 {
            s.updates <- *reconciles
        }

    case PodConfigNotificationSnapshotAndUpdates:
        if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
            s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }

    case PodConfigNotificationSnapshot:
        if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
            s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
        }

    case PodConfigNotificationUnknown:
        fallthrough
    default:
        panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
    }

    return nil
}

podStorage.merge()

Organizes the incoming pod change message into kubetypes.PodUpdate objects of the different change types.

  • Based on the change type (ADD, UPDATE, and so on), updatePodsFunc is invoked to sort the pods into arrays of the corresponding type
  • Each array is then wrapped in a kubetypes.PodUpdate object of the matching type
// pkg/kubelet/config/config.go
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles, restores *kubetypes.PodUpdate) {
    s.podLock.Lock()
    defer s.podLock.Unlock()

    addPods := []*v1.Pod{}
    updatePods := []*v1.Pod{}
    deletePods := []*v1.Pod{}
    removePods := []*v1.Pod{}
    reconcilePods := []*v1.Pod{}
    restorePods := []*v1.Pod{}

    pods := s.pods[source]
    if pods == nil {
        pods = make(map[types.UID]*v1.Pod)
    }

    // updatePodsFunc is the local function which updates the pod cache *oldPods* with the new pods *newPods*. After the update, the new pods are stored in the pod cache *pods*. Note that *pods* and *oldPods* may be the same cache.
    updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
        filtered := filterInvalidPods(newPods, source, s.recorder)
        for _, ref := range filtered {
            // Annotate the pod with the source before any comparison.
            if ref.Annotations == nil {
                ref.Annotations = make(map[string]string)
            }
            ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
            if existing, found := oldPods[ref.UID]; found {
                pods[ref.UID] = existing
                needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
                if needUpdate {
                    updatePods = append(updatePods, existing)
                } else if needReconcile {
                    reconcilePods = append(reconcilePods, existing)
                } else if needGracefulDelete {
                    deletePods = append(deletePods, existing)
                }
                continue
            }
            recordFirstSeenTime(ref)
            pods[ref.UID] = ref
            addPods = append(addPods, ref)
        }
    }

    update := change.(kubetypes.PodUpdate)
    // invoke updatePodsFunc based on the change type
    switch update.Op {
    case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
        if update.Op == kubetypes.ADD {
            klog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
        } else if update.Op == kubetypes.DELETE {
            klog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods)
        } else {
            klog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
        }
        updatePodsFunc(update.Pods, pods, pods)

    case kubetypes.REMOVE:
        klog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods)
        for _, value := range update.Pods {
            if existing, found := pods[value.UID]; found {
                // this is a delete
                delete(pods, value.UID)
                removePods = append(removePods, existing)
                continue
            }
            // this is a no-op
        }

    case kubetypes.SET:
        klog.V(4).Infof("Setting pods for source %s", source)
        s.markSourceSet(source)
        // Clear the old map entries by just creating a new map
        oldPods := pods
        pods = make(map[types.UID]*v1.Pod)
        updatePodsFunc(update.Pods, oldPods, pods)
        for uid, existing := range oldPods {
            if _, found := pods[uid]; !found {
                // this is a delete
                removePods = append(removePods, existing)
            }
        }
    case kubetypes.RESTORE:
        klog.V(4).Infof("Restoring pods for source %s", source)
        restorePods = append(restorePods, update.Pods...)

    default:
        klog.Warningf("Received invalid update type: %v", update)

    }

    s.pods[source] = pods
    // wrap each array in a kubetypes.PodUpdate object of the matching type
    adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
    updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
    deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
    removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
    reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
    restores = &kubetypes.PodUpdate{Op: kubetypes.RESTORE, Pods: copyPods(restorePods), Source: source}

    return adds, updates, deletes, removes, reconciles, restores
}

Creating the listener that receives messages from the apiserver

NewSourceApiserver()

Initializes a ListerWatcher that watches the pods bound to this node and calls newSourceApiserverFromLW() to create and start the apiserver listener.

// pkg/kubelet/config/apiserver.go
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

newSourceApiserverFromLW()

Creates a Reflector, using the send function as the way received objects are stored, and starts it; send simply puts everything it receives into the channel that was passed in, i.e. the channel created by PodConfig.Channel().

// pkg/kubelet/config/apiserver.go
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    // the Reflector uses lw to watch the apiserver and processes what it receives by calling send
    // here send simply stores the messages into updates
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)
}
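The store handed to the Reflector is an UndeltaStore, which is why the apiserver source always emits a PodUpdate with Op: SET carrying the full pod list: every change pushes the complete current state rather than a delta. The following is a minimal sketch of that idea only (a toy store holding strings), not the real client-go implementation:

package main

import "fmt"

// undeltaStore mimics the behaviour relied on above: every mutation pushes the *complete*
// current state to the push function.
type undeltaStore struct {
    items map[string]string       // key -> object (strings stand in for *v1.Pod)
    push  func(snapshot []string) // called with the full state after every change
}

func (u *undeltaStore) add(key, obj string) {
    u.items[key] = obj
    u.pushAll()
}

func (u *undeltaStore) delete(key string) {
    delete(u.items, key)
    u.pushAll()
}

func (u *undeltaStore) pushAll() {
    snapshot := make([]string, 0, len(u.items))
    for _, o := range u.items {
        snapshot = append(snapshot, o)
    }
    u.push(snapshot)
}

func main() {
    updates := make(chan []string, 10)
    store := &undeltaStore{items: map[string]string{}, push: func(s []string) { updates <- s }}

    store.add("default/nginx", "nginx")
    store.add("default/redis", "redis")
    store.delete("default/nginx")

    close(updates)
    for snapshot := range updates {
        // every received value is the full state, mirroring the Op: SET semantics
        fmt.Println(snapshot)
    }
}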

Passing PodConfig.updates to where it is consumed

Inside RunKubelet():

  • createAndInitKubelet() is called with kubeDeps -> kubelet.NewMainKubelet() with kubeDeps -> makePodSourceConfig(); makePodSourceConfig() creates the config.PodConfig, assigns it to the kubeDeps.PodConfig that was passed in, and the result is returned back up to createAndInitKubelet()
  • At the end of RunKubelet(), startKubelet() is called with kubeDeps.PodConfig; inside startKubelet(), Kubelet.Run() is called with kubeDeps.PodConfig.updates, handing the updates channel over to be consumed by syncLoop (a minimal sketch of this hand-off follows the figure below)

(Pod state change process - Figure 3)
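A condensed sketch of that hand-off is shown below. The types are simplified stand-ins (the Updates() accessor mirrors the one on the real PodConfig), so this is not the actual kubelet code; the point is only that the channel is created once inside PodConfig and only its receive side is handed to Kubelet.Run() for syncLoop to consume.

package main

import "fmt"

type PodUpdate struct {
    Op     string
    Source string
}

type PodConfig struct {
    updates chan PodUpdate
}

// Updates exposes the receive-only side of the channel to the consumer.
func (c *PodConfig) Updates() <-chan PodUpdate { return c.updates }

type Kubelet struct{}

// Run stands in for Kubelet.Run()/syncLoop consuming the channel until it is closed.
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
    for u := range updates {
        fmt.Printf("SyncLoop (%s, %q)\n", u.Op, u.Source)
    }
}

func main() {
    cfg := &PodConfig{updates: make(chan PodUpdate, 50)} // created in NewPodConfig
    kl := &Kubelet{}

    go func() {
        cfg.updates <- PodUpdate{Op: "ADD", Source: "api"} // produced by podStorage.Merge
        close(cfg.updates)
    }()

    // startKubelet passes the channel into Kubelet.Run, which hands it to syncLoop
    kl.Run(cfg.Updates())
}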

Consuming the messages in PodConfig.updates and updating pods accordingly

UML

Flow:

  1. Kubelet.Run() executes Kubelet.syncLoop(), which starts looping over the changes stored in updates and reconciling the current pod state with the desired state
  2. After several layers of calls, podWorkers.managePodLoop() eventually calls back the Kubelet.syncPod() method that was passed in when podWorkers was initialized
  3. Kubelet.syncPod() calls kubeGenericRuntimeManager.SyncPod(), which is where the running pod **actually gets reconciled with the desired state** (a simplified sketch of the computed actions follows the figure below)
    1. kubeGenericRuntimeManager.computePodActions() computes the difference between the current and desired state, i.e. the sandbox and container changes
    2. If the sandbox has changed, kubeGenericRuntimeManager.killPodWithSyncResult() is called to kill the sandbox
    3. If the sandbox has not changed, the container changes are handled: kubeGenericRuntimeManager.killContainer() is called in a loop to kill every container that should no longer be kept
    4. If the sandbox has changed and has already been killed, kubeGenericRuntimeManager.createPodSandbox() is called to create a new sandbox
    5. start() is called on the new sandbox to create the init containers
    6. start() is called on the new sandbox to create the containers
  4. Return once the sync is complete

(Pod state change process - Figure 4)
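The decision structure computed in step 3.1 can be pictured with the simplified sketch below. The field names are stand-ins loosely modelled on the kuberuntime podActions type, not the real definition; it only shows how SyncPod branches on the computed diff in the steps listed above.

package main

import "fmt"

// podActions captures the diff between the running pod and the desired pod spec.
type podActions struct {
    KillPod           bool     // the whole sandbox must be killed (and usually recreated)
    CreateSandbox     bool     // a new sandbox is needed
    ContainersToKill  []string // containers that should no longer be kept
    ContainersToStart []string // containers that still need to be started
}

func syncPod(changes podActions) {
    if changes.KillPod {
        fmt.Println("kill pod sandbox")
    } else {
        for _, c := range changes.ContainersToKill {
            fmt.Println("kill unwanted container:", c)
        }
    }
    if changes.CreateSandbox {
        fmt.Println("create new pod sandbox")
    }
    for _, c := range changes.ContainersToStart {
        fmt.Println("start container:", c)
    }
}

func main() {
    // first sync of a brand-new pod: nothing to kill, everything to create
    syncPod(podActions{CreateSandbox: true, ContainersToStart: []string{"init", "app"}})
}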

Key code analysis

Looping over the change messages in updates

Kubelet.syncLoop

Kubelet.syncLoop() processes the change messages pulled from updates and reconciles the desired and running state of pods accordingly; it never returns.

// pkg/kubelet/kubelet.go
// Kubelet.syncLoop() processes the change messages pulled from updates and reconciles the desired and running state of pods accordingly; it never returns
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    ...
    for {
        ...

        kl.syncLoopMonitor.Store(kl.clock.Now())

        // call Kubelet.syncLoopIteration() to process the messages in updates
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

Kubelet.syncLoopIteration

Reads messages from the different channels and performs the corresponding action.

  • configCh: the channel that carries pod change messages
  • syncCh: the channel that delivers the periodic pod-sync ticks
  • housekeepingCh: the channel that delivers the housekeeping ticks
  • plegCh: the channel that carries PLEG events
// pkg/kubelet/kubelet.go
// take messages off the channels and dispatch them to the corresponding handler
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {

    // handle pod change messages
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }

        switch u.Op {
        case kubetypes.ADD: // handle additions
            klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
            // call Kubelet.HandlePodAdditions() to handle the additions
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE: // handle updates
            klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE: // handle removals
            klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE: // handle (graceful) deletions
            klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.RESTORE:
            klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
            // These are pods restored from the checkpoint. Treat them as new
            // pods.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            klog.Errorf("Kubelet does not support snapshot update")
        }

        if u.Op != kubetypes.RESTORE {
            kl.sourcesReady.AddSource(u.Source)
        }

    // handle PLEG events
    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }

    // handle the sync tick: sync every pod that is waiting to be synced
    case <-syncCh:
        // Sync pods waiting for sync
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            break
        }
        klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
        handler.HandlePodSyncs(podsToSync)

    // sync pods whose liveness probe has failed
    case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            // The liveness manager detected a failure; sync the pod.

            // We should not use the pod from livenessManager, because it is never updated after
            // initialization.
            pod, ok := kl.podManager.GetPodByUID(update.PodUID)
            if !ok {
                // If the pod no longer exists, ignore the update.
                klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
                break
            }
            klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
            handler.HandlePodSyncs([]*v1.Pod{pod})
        }

    // housekeeping: trigger the cleanup of pods
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
            // If the sources aren't ready or volume manager has not yet synced the states,
            // skip housekeeping, as we may accidentally delete pods from unready sources.
            klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
        } else {
            klog.V(4).Infof("SyncLoop (housekeeping)")
            if err := handler.HandlePodCleanups(); err != nil {
                klog.Errorf("Failed cleaning pods: %v", err)
            }
        }
    }
    return true
}

Kubelet.HandlePodAdditions

Handles the addition of an array of pods.

// pkg/kubelet/kubelet.go
// handles the addition of an array of pods
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()

        kl.podManager.AddPod(pod)

        if kubetypes.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }

        if !kl.podIsTerminated(pod) {

            activePods := kl.filterOutTerminatedPods(existingPods)

            // Check if we can admit the pod; if not, reject it.
            if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                kl.rejectPod(pod, reason, message)
                continue
            }
        }
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        // call Kubelet.dispatchWork() to act on the single pod
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        kl.probeManager.AddPod(pod)
    }
}

Kubelet.dispatchWork

Hands the pod change over to podWorkers for processing.

// pkg/kubelet/kubelet.go
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    ...
    // call podWorkers.UpdatePod() to process the pod
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
            if err != nil {
                metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
            }
        },
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}

podWorkers.UpdatePod

Applies the new configuration to the given pod.

// pkg/kubelet/pod_workers.go
// applies the new configuration to the given pod
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
    pod := options.Pod
    uid := pod.UID
    var podUpdates chan UpdatePodOptions
    var exists bool

    p.podLock.Lock()
    defer p.podLock.Unlock()
    if podUpdates, exists = p.podUpdates[uid]; !exists {
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

        // start a new goroutine that runs podWorkers.managePodLoop() to operate on the pod
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    if !p.isWorking[pod.UID] {
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } else {
        // if a request to kill a pod is pending, we do not let anything overwrite that request.
        update, found := p.lastUndeliveredWorkUpdate[pod.UID]
        if !found || update.UpdateType != kubetypes.SyncPodKill {
            p.lastUndeliveredWorkUpdate[pod.UID] = *options
        }
    }
}
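The pattern above, one worker goroutine per pod UID draining a buffered channel of size 1, with the most recent undelivered update parked in lastUndeliveredWorkUpdate, can be reduced to the self-contained sketch below. The types and the re-delivery logic are simplified stand-ins (the real podWorkers re-delivers via a completion callback), so treat it as an illustration of the idea rather than the actual implementation.

package main

import (
    "fmt"
    "sync"
    "time"
)

type UpdatePodOptions struct {
    UID        string
    UpdateType string
}

type podWorkers struct {
    mu                        sync.Mutex
    podUpdates                map[string]chan UpdatePodOptions // one channel per pod UID
    isWorking                 map[string]bool
    lastUndeliveredWorkUpdate map[string]UpdatePodOptions
}

func (p *podWorkers) UpdatePod(opt UpdatePodOptions) {
    p.mu.Lock()
    defer p.mu.Unlock()
    ch, ok := p.podUpdates[opt.UID]
    if !ok {
        ch = make(chan UpdatePodOptions, 1)
        p.podUpdates[opt.UID] = ch
        go p.managePodLoop(ch) // one worker goroutine per pod
    }
    if !p.isWorking[opt.UID] {
        p.isWorking[opt.UID] = true
        ch <- opt
    } else {
        // the worker is busy: keep only the most recent pending update
        p.lastUndeliveredWorkUpdate[opt.UID] = opt
    }
}

func (p *podWorkers) managePodLoop(ch chan UpdatePodOptions) {
    for u := range ch {
        fmt.Println("sync pod", u.UID, "type", u.UpdateType) // stands in for syncPodFn
        p.mu.Lock()
        if next, ok := p.lastUndeliveredWorkUpdate[u.UID]; ok {
            delete(p.lastUndeliveredWorkUpdate, u.UID)
            p.mu.Unlock()
            ch <- next // hand the parked update back to ourselves
            continue
        }
        p.isWorking[u.UID] = false
        p.mu.Unlock()
    }
}

func main() {
    p := &podWorkers{
        podUpdates:                map[string]chan UpdatePodOptions{},
        isWorking:                 map[string]bool{},
        lastUndeliveredWorkUpdate: map[string]UpdatePodOptions{},
    }
    p.UpdatePod(UpdatePodOptions{UID: "uid-1", UpdateType: "create"})
    p.UpdatePod(UpdatePodOptions{UID: "uid-1", UpdateType: "update"})
    time.Sleep(100 * time.Millisecond) // give the worker time to drain (illustration only)
}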

podWorkers.managePodLoop

Calls podWorkers.syncPodFn(), i.e. the Kubelet.syncPod() method that was passed in when podWorkers was initialized.

// pkg/kubelet/pod_workers.go
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    var lastSyncTime time.Time
    for update := range podUpdates {
        err := func() error {
            ...
            // call podWorkers.syncPodFn(), i.e. the Kubelet.syncPod() method passed in when podWorkers was initialized
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()

        ...
    }
}

Kubelet.syncPod

Syncs a single pod.

  • If the pod is being created, record the pod worker start latency
  • Call generateAPIPodStatus to prepare a v1.PodStatus for the pod
  • If the pod is running for the first time, record the pod start latency
  • Update the pod's status in the statusManager
  • Kill the pod if it should not be running
  • If the pod is a static pod and does not yet have a mirror pod, create the mirror pod
  • Create the data directories for the pod if they do not exist
  • Wait for volumes to attach/mount
  • Fetch the pull secrets for the pod
  • Call the container runtime's SyncPod callback, i.e. kubeGenericRuntimeManager.SyncPod(), to act on the pod
  • Update the traffic shaping for the pod's ingress and egress limits
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncPod(o syncPodOptions) error {
    ...

    // if the pod is being created, record the pod worker start latency
    if updateType == kubetypes.SyncPodCreate {
        if !firstSeenTime.IsZero() {
            // This is the first time we are syncing the pod. Record the latency
            // since kubelet first saw the pod if firstSeenTime is set.
            metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
        } else {
            klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
        }
    }

    // call generateAPIPodStatus to prepare a v1.PodStatus for the pod
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
    // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
    // set pod IP to hostIP directly in runtime.GetPodStatus
    podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
    for _, ipInfo := range apiPodStatus.PodIPs {
        podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
    }

    if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
        podStatus.IPs = []string{apiPodStatus.PodIP}
    }

    // if the pod is running for the first time, record the pod start latency
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
        !firstSeenTime.IsZero() {
        metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
    }

    runnable := kl.canRunPod(pod)
    if !runnable.Admit {
        // Pod is not runnable; update the Pod and Container statuses to why.
        apiPodStatus.Reason = runnable.Reason
        apiPodStatus.Message = runnable.Message
        // Waiting containers are not creating.
        const waitingReason = "Blocked"
        for _, cs := range apiPodStatus.InitContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
        for _, cs := range apiPodStatus.ContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
    }

    // update the pod's status in the statusManager
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    // kill the pod if it should not be running
    if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
        var syncErr error
        if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
            syncErr = fmt.Errorf("error killing pod: %v", err)
            utilruntime.HandleError(syncErr)
        } else {
            if !runnable.Admit {
                // There was no error killing the pod, but the pod cannot be run.
                // Return an error to signal that the sync loop should back off.
                syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
            }
        }
        return syncErr
    }
    ...

    // if the pod is a static pod that does not yet have a mirror pod, create the mirror pod
    if kubetypes.IsStaticPod(pod) {
        podFullName := kubecontainer.GetPodFullName(pod)
        deleted := false
        if mirrorPod != nil {
            if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
                // The mirror pod is semantically different from the static pod. Remove
                // it. The mirror pod will get recreated later.
                klog.Infof("Trying to delete pod %s %v", podFullName, mirrorPod.ObjectMeta.UID)
                var err error
                deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
                if deleted {
                    klog.Warningf("Deleted mirror pod %q because it is outdated", format.Pod(mirrorPod))
                } else if err != nil {
                    klog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
                }
            }
        }
        if mirrorPod == nil || deleted {
            node, err := kl.GetNode()
            if err != nil || node.DeletionTimestamp != nil {
                klog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
            } else {
                klog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
                if err := kl.podManager.CreateMirrorPod(pod); err != nil {
                    klog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
                }
            }
        }
    }

    // create the data directories for the pod if they do not exist
    if err := kl.makePodDataDirs(pod); err != nil {
        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
        klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
        return err
    }


    if !kl.podIsTerminated(pod) {
        // wait for volumes to attach/mount
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
            klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
            return err
        }
    }

    // fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)

    // call the container runtime's SyncPod callback, i.e. kubeGenericRuntimeManager.SyncPod(), to act on the pod
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)

    ...

    return nil
}

Syncing pod state based on the change messages

kubeGenericRuntimeManager.SyncPod

SyncPod brings the running pod to the desired state by performing the following steps:

  1. kubeGenericRuntimeManager.computePodActions() computes the sandbox and container changes and returns podContainerChanges
  2. If the sandbox has changed, kubeGenericRuntimeManager.killPodWithSyncResult() is called to kill the sandbox
  3. kubeGenericRuntimeManager.killContainer() is called to kill every container in the pod that should no longer be kept
  4. If necessary, kubeGenericRuntimeManager.createPodSandbox() is called to create a new sandbox
  5. start() is called to create the ephemeral containers
  6. start() is called to create the next init container
  7. start() is called to create the regular containers
// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// SyncPod brings the running pod to the desired state by performing the following steps
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {

    // Step 1: call kubeGenericRuntimeManager.computePodActions() to compute the sandbox and container changes; returns podContainerChanges
    podContainerChanges := m.computePodActions(pod, podStatus)
    ...

    // Step 2: if the sandbox has changed, call kubeGenericRuntimeManager.killPodWithSyncResult() to kill the sandbox
    if podContainerChanges.KillPod {
        ...
        // kill the pod sandbox
        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
            return
        }

        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(pod, podStatus)
        }
    } else {
        // Step 3: call kubeGenericRuntimeManager.killContainer() to kill every container in the pod that should no longer be kept
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
                return
            }
        }
    }

    ...

    // Step 4: if necessary, call kubeGenericRuntimeManager.createPodSandbox() to create a new sandbox
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        ...
        // call kubeGenericRuntimeManager.createPodSandbox() to create the new sandbox
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        ...
    }

    ...

    // the start closure creates the different kinds of containers
    start := func(typeName string, spec *startSpec) error {
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
            return err
        }

        klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).Infof("%v start failed: %v: %s", typeName, err, msg)
            default:
                utilruntime.HandleError(fmt.Errorf("%v start failed: %v: %s", typeName, err, msg))
            }
            return err
        }

        return nil
    }

    // Step 5: call start() to create the ephemeral containers
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
            start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: call start() to create the next init container
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }

    // Step 7: call start() to create the regular containers
    for _, idx := range podContainerChanges.ContainersToStart {
        start("container", containerStartSpec(&pod.Spec.Containers[idx]))
    }

    return
}

Creating the sandbox for the pod

kubeGenericRuntimeManager.createPodSandbox

Calls RemoteRuntimeService.RunPodSandbox() to create the sandbox.

// pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
// creates a pod sandbox and returns podSandBoxID, message, error
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {

    ...

    // call runtimeService.RunPodSandbox() (which reaches dockerService.RunPodSandbox over CRI) to create the sandbox
    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)    

    ...

    return podSandBoxID, "", nil
}

RemoteRuntimeService.RunPodSandbox

Calls dockerService.RunPodSandbox() through the CRI client, passing in the configuration.

// pkg/kubelet/remote/remote_runtime.go
// creates and runs a pod-level sandbox and makes sure it is in the ready state
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {

    ...

    // call dockerService.RunPodSandbox through the CRI client, passing in the configuration
    resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
        Config:         config,
        RuntimeHandler: runtimeHandler,
    })

    ...

    return resp.PodSandboxId, nil
}

dockerService.RunPodSandbox

// pkg/kubelet/dockershim/docker_sandbox.go
// creates the container that holds the network namespace
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
    config := r.GetConfig()


    ...
    // 1. pull the sandbox image
    if err := ensureSandboxImageExists(ds.client, image); err != nil {
        return nil, err
    }

    ...

    // 2. call kubeDockerClient.CreateContainer to create the sandbox container
    createResp, err := ds.client.CreateContainer(*createConfig)
    if err != nil {
        createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
    }

    ...

    // 3. create the checkpoint
    if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
        return nil, err
    }

    // 4. start the sandbox
    err = ds.client.StartContainer(createResp.ID)
    if err != nil {
        return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
    }

    ...

    // 5. set up networking for the sandbox; all pod networking is set up by the CNI plugin discovered at startup
    cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
    ...
    err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)

    ...

    return resp, nil
}

Creating the containers for the pod

start

start() creates and starts the different kinds of containers. It is a closure declared inside kubeGenericRuntimeManager.SyncPod() and can only be invoked from within that method.

// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// start is a closure declared inside kubeGenericRuntimeManager.SyncPod() and can only be invoked from within that method
start := func(typeName string, spec *startSpec) error {
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
        result.AddSyncResult(startContainerResult)

        // if the container is in back-off, fail the creation
        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
            return err
        }

        klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
        // start creating the container
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).Infof("%v start failed: %v: %s", typeName, err, msg)
            default:
                utilruntime.HandleError(fmt.Errorf("%v start failed: %v: %s", typeName, err, msg))
            }
            return err
        }

        return nil
    }

kubeGenericRuntimeManager.startContainer

Starts the container and returns the result.

// pkg/kubelet/kuberuntime/kuberuntime_container.go
// starts the container and returns the result
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    container := spec.container

    // Step 1: pull the image
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
    if err != nil {
        s, _ := grpcstatus.FromError(err)
        m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
        return msg, err
    }

    // Step 2: create the container
    ...
    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    ...

    // Step 3: start the container
    err = m.runtimeService.StartContainer(containerID)
    ...

    // Step 4: run the post-start lifecycle hook
    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
        kubeContainerID := kubecontainer.ContainerID{
            Type: m.runtimeName,
            ID:   containerID,
        }
        msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
        if handlerErr != nil {
            m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
            if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
                klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
                    container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
            }
            return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
        }
    }

    return "", nil
}

RemoteRuntimeService.CreateContainer

Creates a container in the given pod sandbox.

// pkg/kubelet/remote/remote_runtime.go
// creates a container in the given pod sandbox
func (r *RemoteRuntimeService) CreateContainer(podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
    ctx, cancel := getContextWithTimeout(r.timeout)
    defer cancel()

    // call dockerService.CreateContainer() (over CRI) to create the container
    resp, err := r.runtimeClient.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
        PodSandboxId:  podSandBoxID,
        Config:        config,
        SandboxConfig: sandboxConfig,
    })
    if err != nil {
        klog.Errorf("CreateContainer in sandbox %q from runtime service failed: %v", podSandBoxID, err)
        return "", err
    }

    if resp.ContainerId == "" {
        errorMessage := fmt.Sprintf("ContainerId is not set for container %q", config.GetMetadata())
        klog.Errorf("CreateContainer failed: %s", errorMessage)
        return "", errors.New(errorMessage)
    }

    return resp.ContainerId, nil
}

RemoteRuntimeService.StartContainer

Starts the given container.

// starts the given container
func (r *RemoteRuntimeService) StartContainer(containerID string) error {
    ctx, cancel := getContextWithTimeout(r.timeout)
    defer cancel()

    // call dockerService.StartContainer() (over CRI) to start the container
    _, err := r.runtimeClient.StartContainer(ctx, &runtimeapi.StartContainerRequest{
        ContainerId: containerID,
    })
    if err != nil {
        klog.Errorf("StartContainer %q from runtime service failed: %v", containerID, err)
        return err
    }

    return nil
}

dockerService.CreateContainer

Creates a container in the given pod sandbox.

// pkg/kubelet/dockershim/docker_container.go
// creates a container in the given pod sandbox
func (ds *dockerService) CreateContainer(_ context.Context, r *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) {
    ...

    // call kubeDockerClient.CreateContainer to create the container
    createResp, createErr := ds.client.CreateContainer(createConfig)

    ...
    return nil, createErr
}

dockerService.StartContainer

Starts the given container.

// pkg/kubelet/dockershim/docker_container.go
// starts the given container
func (ds *dockerService) StartContainer(_ context.Context, r *runtimeapi.StartContainerRequest) (*runtimeapi.StartContainerResponse, error) {
    // call kubeDockerClient.StartContainer() to start the container
    err := ds.client.StartContainer(r.ContainerId)

    // create the container log symlink for all containers (including failed ones)
    if linkError := ds.createContainerLogSymlink(r.ContainerId); linkError != nil {
        // Do not stop the container if we failed to create symlink because:
        //   1. This is not a critical failure.
        //   2. We don't have enough information to properly stop container here.
        // Kubelet will surface this error to user via an event.
        return nil, linkError
    }

    if err != nil {
        err = transformStartContainerError(err)
        return nil, fmt.Errorf("failed to start container %q: %v", r.ContainerId, err)
    }

    return &runtimeapi.StartContainerResponse{}, nil
}

Low-level shared methods

kubeDockerClient.CreateContainer

Uses the Client from the github.com/docker/docker/client package to send a POST request to the docker daemon to create the container.

// pkg/kubelet/dockershim/libdocker/kube_docker_client.go
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
    ctx, cancel := d.getTimeoutContext()
    defer cancel()
    // we provide an explicit default shm size as to not depend on docker daemon.
    // TODO: evaluate exposing this as a knob in the API
    if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
        opts.HostConfig.ShmSize = defaultShmSize
    }
    // use the docker client to send the create request to the docker daemon
    createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
    if ctxErr := contextError(ctx); ctxErr != nil {
        return nil, ctxErr
    }
    if err != nil {
        return nil, err
    }
    return &createResp, nil
}

// code from the github.com/docker/docker/client package
func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error) {
    ...

    // send the POST request that creates the container
    serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
    defer ensureReaderClosed(serverResp)
    if err != nil {
        return response, err
    }

    err = json.NewDecoder(serverResp.body).Decode(&response)
    return response, err
}

kubeDockerClient.StartContainer

Uses the Client from the github.com/docker/docker/client package to send a POST request to the docker daemon to start the container.

// pkg/kubelet/dockershim/libdocker/kube_docker_client.go
func (d *kubeDockerClient) StartContainer(id string) error {
    ctx, cancel := d.getTimeoutContext()
    defer cancel()

    // use the docker client to send the start request to the docker daemon
    err := d.client.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{})
    if ctxErr := contextError(ctx); ctxErr != nil {
        return ctxErr
    }
    return err
}

// code from the github.com/docker/docker/client package
func (cli *Client) ContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) error {
    query := url.Values{}
    if len(options.CheckpointID) != 0 {
        query.Set("checkpoint", options.CheckpointID)
    }
    if len(options.CheckpointDir) != 0 {
        query.Set("checkpoint-dir", options.CheckpointDir)
    }

    // send the POST request that starts the given container
    resp, err := cli.post(ctx, "/containers/"+containerID+"/start", query, nil, nil)
    ensureReaderClosed(resp)
    return err
}

PLEG (local state monitoring and write-back)

UML

(Pod state change process - Figure 5)

Key code analysis

Initializing the PLEG

func NewMainKubelet(){
    ...
    // initialize a PLEG
    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})     
    ...
}

Starting the PLEG: monitor pod state and write the differences into GenericPLEG.eventChannel

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    // Start the pod lifecycle event generator.
    kl.pleg.Start()        
}
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
    go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
// compare the old and new pod state and generate events
func (g *GenericPLEG) relist() {
    timestamp := g.clock.Now()

    // get the pod list from the container runtime
    podList, err := g.runtime.GetPods(true)

    g.updateRelistTime(timestamp)

    pods := kubecontainer.Pods(podList)
    // store the pods fetched from the runtime into podRecords as the current pods
    g.podRecords.setCurrent(pods)

    // Compare the old and the current pods, and generate events.
    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
    for pid := range g.podRecords {
        // take the old pod out of podRecords
        oldPod := g.podRecords.getOld(pid)
        pod := g.podRecords.getCurrent(pid)
        // Get all containers in the old and the new pod.
        allContainers := getContainersFromPods(oldPod, pod)
        for _, container := range allContainers {
            events := computeEvents(oldPod, pod, &container.ID)
            for _, e := range events {
                updateEvents(eventsByPodID, e)
            }
        }
    }

    var needsReinspection map[types.UID]*kubecontainer.Pod
    if g.cacheEnabled() {
        needsReinspection = make(map[types.UID]*kubecontainer.Pod)
    }

    // If there are events associated with a pod, we should update the
    // podCache.
    for pid, events := range eventsByPodID {
        pod := g.podRecords.getCurrent(pid)
        if g.cacheEnabled() {
            g.updateCache(pod, pid)
        }

        // Update the internal storage and send out the events.
        g.podRecords.update(pid)
        for i := range events {
            // Filter out events that are not reliable and no other components use yet.
            if events[i].Type == ContainerChanged {
                continue
            }
            g.eventChannel <- events[i]
        }
    }
}
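computeEvents() itself is not shown above; conceptually it compares the state of each container between the old and the new pod record and emits a lifecycle event for every transition. The sketch below illustrates only that diffing idea with simplified types, it is not the real pleg package code:

package main

import "fmt"

type ContainerState string

const (
    StateRunning ContainerState = "running"
    StateExited  ContainerState = "exited"
)

type PodLifecycleEvent struct {
    PodID string
    Type  string // e.g. ContainerStarted, ContainerDied
}

// computeEvent turns a container state transition into a lifecycle event (nil if nothing changed).
func computeEvent(podID string, oldState, newState ContainerState) *PodLifecycleEvent {
    if oldState == newState {
        return nil
    }
    switch newState {
    case StateRunning:
        return &PodLifecycleEvent{PodID: podID, Type: "ContainerStarted"}
    case StateExited:
        return &PodLifecycleEvent{PodID: podID, Type: "ContainerDied"}
    default:
        return &PodLifecycleEvent{PodID: podID, Type: "ContainerChanged"}
    }
}

func main() {
    eventChannel := make(chan *PodLifecycleEvent, 10)

    // old record: the container was running; new record: it has exited
    if e := computeEvent("pod-uid-1", StateRunning, StateExited); e != nil {
        eventChannel <- e // relist() pushes the event; syncLoopIteration reads it from plegCh
    }
    fmt.Println((<-eventChannel).Type) // ContainerDied
}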

Consuming the messages in GenericPLEG.eventChannel and turning them into messages written to manager.podStatusChannel

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    ...
    plegCh := kl.pleg.Watch()
    kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
    ...
}

// * plegCh: update the runtime cache; sync pod
func (kl *Kubelet) syncLoopIteration(){
    ...
    select {
        case e := <-plegCh:
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } 
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    }
    ...
}

Eventually kubelet.syncPod() is called

func (kl *Kubelet) syncPod(o syncPodOptions) error {
    ...

    // update the pod status in the statusManager
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    ...

    return nil
}
func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
    m.podStatusesLock.Lock()
    defer m.podStatusesLock.Unlock()
    ...
    m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
}

func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
    ...

    select {
    // build the message and put it into podStatusChannel
    case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
        klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
            pod.UID, newStatus.version, newStatus.status)
        return true
    default:
        // Let the periodic syncBatch handle the update if the channel is full.
        // We can't block, since we hold the mutex lock.
        klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
            format.Pod(pod), status)
        return false
    }
}

Consuming the messages in manager.podStatusChannel and calling kubeClient to write the pod status back to the apiserver

func (m *manager) Start() {
    ...
    go wait.Forever(func() {
        for {
            select {
            // consume messages from podStatusChannel and write the pod status back to the apiserver
            case syncRequest := <-m.podStatusChannel:
                klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                    syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
                m.syncPod(syncRequest.podUID, syncRequest.status)
            case <-syncTicker:
                klog.V(5).Infof("Status Manager: syncing batch")
                // remove any entries in the status channel since the batch will handle them
                for i := len(m.podStatusChannel); i > 0; i-- {
                    <-m.podStatusChannel
                }
                m.syncBatch()
            }
        }
    }, 0)
}

// write the pod status back to the apiserver
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
    if !m.needsUpdate(uid, status) {
        klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
        return
    }

    ...
    // write the status back to the apiserver
    newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
    ...
}



Overall flow

Reading pod messages from the apiserver and storing them into updates

  1. PodConfig uses Mux to create updatechannel
  2. PodConfig synchronizes pod messages from the apiserver through the Reflector mechanism, using the updatechannel created by Mux to receive what the Reflector delivers
  3. PodUpdate objects are taken off updatechannel in a loop and podStorage.Merge() consolidates them into updates

(Figure: feeding pod messages into updates)

Consuming messages from updates and changing pod state based on them

The PLEG process

References

https://cloud.tencent.com/developer/article/1492108
https://www.jianshu.com/p/eec91fa9d57a