解析从master节点上运行
kubectl apply -f app.yaml
命令后,整个kubelet创建并改变pod状态的过程
关键类
PodConfig
从配置源获取pod变更消息,整理过滤后,存入updates中
- mux:负责创建channel接收从配置源中获取到的pod消息
- pods:负责把mux创建的channel中的消息合并整理到updates中
- updates:存储最终消息的channel,供其他方法消费
- sources:配置源类型
// pkg/kubelet/config/config.go
// PodConfig is a configuration mux that merges many sources of pod configuration into a single
// consistent structure, and then delivers incremental change notifications to listeners in order.
type PodConfig struct {
// pods is the storage that merges incoming changes (NewPodConfig also hands it to the Mux as its merger).
pods *podStorage
// mux creates and tracks one channel per configuration source.
mux *config.Mux
// the channel of denormalized changes passed to listeners
updates chan kubetypes.PodUpdate
// contains the list of all configured sources
sourcesLock sync.Mutex
sources sets.String
checkpointManager checkpointmanager.CheckpointManager
}
Mux
Mux通过调用merger合并来自多个源的配置的类。 pod变更消息 通过channel推送并发送到Merger.Merge()方法。
- merger:即podStorage类,用来合并消息
- sources:存放 不同配置源 和 存放对应配置源消息的channel
// pkg/util/config/config.go
// Mux is a class for merging configuration from multiple sources. Changes are pushed via
// channels and sent to the merger's Merge() method.
type Mux struct {
// Invoked when an update is sent to a source.
merger Merger
// Sources and their lock.
sourceLock sync.RWMutex
// Maps source names to channels
sources map[string]chan interface{}
}
podStorage
整理,过滤,合并,转化当前PodUpdate
- pods:配置源 与 存储对应pod信息map 的map
- updates:最终消息存放channel
// pkg/kubelet/config/config.go
// podStorage manages the current pod state at any point in time and ensures updates to the
// channel are delivered in order.
type podStorage struct {
// podLock guards pods.
podLock sync.RWMutex
// pods maps a source name to that source's pods, keyed by pod UID.
pods map[string]map[types.UID]*v1.Pod
// mode selects how Merge reports changes on the updates channel.
mode PodConfigNotificationMode
// ensures that updates are delivered in strict order on the updates channel
updateLock sync.Mutex
updates chan<- kubetypes.PodUpdate
// contains the set of all sources that have sent at least one SET
sourcesSeenLock sync.RWMutex
sourcesSeen sets.String
// the EventRecorder to use
recorder record.EventRecorder
}
PodUpdate
储存pod消息的最小单位
- Pods:同样变更类型的Pod数组
- Op:pod变更类型(ADD,SET,DELETE等)
- Source:消息来源(file,url,apiserver)
// pkg/kubelet/types/pod_update.go
// PodUpdate is the smallest unit in which pod change messages are passed around.
type PodUpdate struct {
// Pods are the pods that share the same change type.
Pods []*v1.Pod
// Op is the change type (ADD, SET, DELETE, ...).
Op PodOperation
// Source names where the message came from (file, url, apiserver).
Source string
}
向PodConfig.updates中存入pod变更消息
UML
流程(只分析apiserver配置源的流程):
- 在NewMainKubelet()方法中调用makePodSourceConfig()构造PodConfig对象
- 在makePodSourceConfig()方法中
- 先调用config.NewPodConfig()方法初始化一个PodConfig对象
- 为PodConfig对象添加apiserver配置源
- 调用PodConfig.Channel()方法创建用于接收配置源消息的channel,并定时调用merger.Merge()方法把channel中的消息整理合并到PodConfig.updates中
- 调用config.NewSourceApiserver()创建一个从apiserver获取消息的监听器,并以PodConfig.Channel()方法创建的channel为储存消息的数据结构
关键代码分析
创建config.PodConfig用以监听并整合pod配置与变更
NewMainKubelet()
pkg/kubelet/kubelet.go 文件中的NewMainKubelet()方法会实例化一个新的Kubelet对象以及所有必需的内部模块,其中会调用同文件下的makePodSourceConfig()创建一个config.PodConfig并赋值给kubeDeps.PodConfig,config.PodConfig对象的作用是持有多个配置源提供的pod变更消息
// pkg/kubelet/kubelet.go
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// This excerpt only shows the part that builds the pod source config; the rest is elided ("...").
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
...) (*Kubelet, error) {
...
// Only build a PodConfig when the caller did not inject one through kubeDeps.
if kubeDeps.PodConfig == nil {
var err error
// Create a config.PodConfig and store it in kubeDeps.PodConfig.
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil {
return nil, err
}
}
...
return klet, nil
}
makePodSourceConfig()
创建一个config.PodConfig,并配置3种监听方式(file,url,apiserver)
- 调用config.NewPodConfig()方法,初始化一个config.PodConfig
- 为config.PodConfig添加file配置源,从指定file读取pod配置,并监听后续变动
- 为config.PodConfig添加url配置源,从指定URL读取pod配置,并监听后续变动
- 为config.PodConfig添加apiserver配置源,从指定apiserver中读取pod配置,并监听后续变动
- 通过调用PodConfig.Channel()方法创建一个channel用以存放从apiserver中监听到的消息,并定时把channel中消息合并到PodConfig.updates中
- 调用config.NewSourceApiserver()方法创建一个从apiserver配置源获取消息的监听器,并以PodConfig.Channel()方法创建的channel为存储接收到消息的数据结构
// pkg/kubelet/kubelet.go
// makePodSourceConfig creates a config.PodConfig and wires up the pod sources that are
// configured: a static pod file path, a static pod URL, and the apiserver.
// It returns an error only when restoring from bootstrapCheckpointPath fails.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
	// Flatten the configured header values into an http.Header for the URL source.
	manifestURLHeader := make(http.Header)
	for key, values := range kubeCfg.StaticPodURLHeader {
		for _, value := range values {
			manifestURLHeader.Add(key, value)
		}
	}

	// The PodConfig that all sources below feed into.
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// File source: read pod config from the static pod path.
	if staticPath := kubeCfg.StaticPodPath; staticPath != "" {
		klog.Infof("Adding pod path: %v", staticPath)
		config.NewSourceFile(staticPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	// URL source: read pod config from the static pod URL.
	if staticURL := kubeCfg.StaticPodURL; staticURL != "" {
		klog.Infof("Adding pod url %q with HTTP header %v", staticURL, manifestURLHeader)
		config.NewSourceURL(staticURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

	// Channel that carries apiserver-sourced pod messages; it is shared between
	// checkpoint restore and the apiserver watcher.
	var apiserverUpdates chan<- interface{}
	if bootstrapCheckpointPath != "" {
		klog.Infof("Adding checkpoint path: %v", bootstrapCheckpointPath)
		apiserverUpdates = cfg.Channel(kubetypes.ApiserverSource)
		if err := cfg.Restore(bootstrapCheckpointPath, apiserverUpdates); err != nil {
			return nil, err
		}
	}

	// Without a client there is no apiserver source to add.
	if kubeDeps.KubeClient == nil {
		return cfg, nil
	}

	// Apiserver source: watch the apiserver for pods.
	klog.Infof("Watching apiserver")
	if apiserverUpdates == nil {
		apiserverUpdates = cfg.Channel(kubetypes.ApiserverSource)
	}
	config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, apiserverUpdates)
	return cfg, nil
}
config.NewPodConfig()
初始化一个PodConfig
- PodConfig中的2个组件PodStorage,Mux和PodConfig使用的是同一个channel,这个名为updates的channel就是用来存储最终整理过滤出的pod变更消息的
- Mux的作用为创建出一些channel作为接收监听结果的中间消息缓存,把这些channel传递给PodStorage
- PodStorage的作用为整理合并过滤mux传递过来的channel,并把整理后的消息放入PodConfig.updates中
// NewPodConfig creates a PodConfig and returns it.
//
// The podStorage and the PodConfig share one buffered channel (capacity 50); the Mux is
// given the podStorage as its merger, so messages from every source end up on that channel.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
	// The channel that holds the final, merged pod updates.
	updates := make(chan kubetypes.PodUpdate, 50)
	// The storage is used both by the PodConfig itself and as the Mux's merger.
	storage := newPodStorage(updates, mode, recorder)
	podConfig := &PodConfig{
		pods:    storage,
		mux:     config.NewMux(storage),
		updates: updates,
		sources: sets.String{},
	}
	return podConfig
}
// NewMux returns a Mux with no registered sources that hands every update to merger.
func NewMux(merger Merger) *Mux {
	return &Mux{
		merger:  merger,
		sources: map[string]chan interface{}{},
	}
}
// newPodStorage returns a podStorage with empty pod and seen-source sets that reports
// changes on updates according to mode and uses recorder for events.
func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
	storage := &podStorage{
		updates:  updates,
		mode:     mode,
		recorder: recorder,
	}
	storage.pods = map[string]map[types.UID]*v1.Pod{}
	storage.sourcesSeen = sets.String{}
	return storage
}
往updates中存放从各种来源监听到的变化数据
PodConfig.Channel()
调用Mux.Channel()方法创建一个只能接收PodUpdates对象的channel
// pkg/kubelet/config/config.go
// Channel records source as a configured source and returns the send-only channel on
// which that source should push its updates (obtained from the Mux).
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()

	c.sources.Insert(source)
	sourceChannel := c.mux.Channel(source)
	return sourceChannel
}
Mux.Channel()
返回一个可用的channel
- 从Mux.sources中取出source对应的channel返回
- 若Mux.sources不存在source对应channel,则创建channel并定时调用Mux.listen()方法合并channel中的消息到PodConfig.updates中
// pkg/util/config/config.go
// Channel returns the channel registered for source. If none exists yet, it creates an
// unbuffered channel, registers it, and starts a goroutine that runs m.listen on it.
// It panics when source is empty.
func (m *Mux) Channel(source string) chan interface{} {
	if source == "" {
		panic("Channel given an empty name")
	}

	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()

	// Reuse the channel when this source has already been registered.
	if existing, ok := m.sources[source]; ok {
		return existing
	}

	// First request for this source: create and remember its channel.
	created := make(chan interface{})
	m.sources[source] = created
	// Run the listener under wait.Until with a zero period and wait.NeverStop.
	go wait.Until(func() { m.listen(source, created) }, 0, wait.NeverStop)
	return created
}
Mux.listen()
循环listenChannel,即若listenChannel中有消息就取出并调用podStorage.Merge()方法处理
// pkg/util/config/config.go
// listen drains listenChannel, passing each received update together with its source
// to the merger. It returns once the channel is closed.
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for {
		update, open := <-listenChannel
		if !open {
			return
		}
		m.merger.Merge(source, update)
	}
}
podStorage.Merge()
合并将来自不同来源的一组传入更改标准化为所有Pod的映射,并确保过滤掉多余的更改,然后将零个或多个最小更新推送到更新通道podConfig.updates中,确保按顺序交付更新
- 调用podStorage.merge()方法把传入的pod变更消息转化为不同类型的kubetypes.PodUpdate对象
- 把各种类型的kubetypes.PodUpdate对象整理放入podConfig.updates中
// pkg/kubelet/config/config.go
// Merge normalizes a change coming from a config source, and pushes the resulting updates
// onto s.updates. updateLock is held for the whole call so that updates are delivered in order.
// Which updates are sent depends on s.mode; an unsupported mode panics. The returned error
// is always nil in the code shown here.
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source)
// merge sorts the incoming change into one kubetypes.PodUpdate per operation type.
adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
// firstSet is true when the source was not in sourcesSeen before this call but is now
// (merge calls markSourceSet while handling a SET).
firstSet := !seenBefore && s.sourcesSeen.Has(source)
// Push the per-type updates onto s.updates according to the notification mode.
switch s.mode {
case PodConfigNotificationIncremental:
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if len(restores.Pods) > 0 {
s.updates <- *restores
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len(reconciles.Pods) > 0 {
s.updates <- *reconciles
}
case PodConfigNotificationSnapshotAndUpdates:
// Additions and removals are reported as a full SET snapshot; updates and deletes incrementally.
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
case PodConfigNotificationSnapshot:
// Any change at all is reported as a single SET snapshot of the merged state.
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}
case PodConfigNotificationUnknown:
fallthrough
default:
panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
}
return nil
}
podStorage.merge()
把传入的pod变更消息整理成不同类型的kubetypes.PodUpdate对象
- 根据不同的变更类型(ADD,UPDATE等),调用updatePodsFunc方法,整理成不同类型的kubetypes.PodUpdate数组
- 把各个数组放入对应类型的kubetypes.PodUpdate对象中
// pkg/kubelet/config/config.go
// merge applies change (which must be a kubetypes.PodUpdate; the type assertion panics
// otherwise) to the cached pods of source and returns one PodUpdate per operation type
// describing what changed. podLock is held for the whole call.
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles, restores *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()
addPods := []*v1.Pod{}
updatePods := []*v1.Pod{}
deletePods := []*v1.Pod{}
removePods := []*v1.Pod{}
reconcilePods := []*v1.Pod{}
restorePods := []*v1.Pod{}
// Start from the pods already cached for this source, or an empty map on first use.
pods := s.pods[source]
if pods == nil {
pods = make(map[types.UID]*v1.Pod)
}
// updatePodsFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
// After updated, new pod will be stored in the pod cache *pods*.
// Notice that *pods* and *oldPods* could be the same cache.
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
filtered := filterInvalidPods(newPods, source, s.recorder)
for _, ref := range filtered {
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[ref.UID]; found {
// Known pod: keep the cached object and classify the change reported by checkAndUpdatePod.
pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} else if needReconcile {
reconcilePods = append(reconcilePods, existing)
} else if needGracefulDelete {
deletePods = append(deletePods, existing)
}
continue
}
// Unknown pod: cache it and report it as an addition.
recordFirstSeenTime(ref)
pods[ref.UID] = ref
addPods = append(addPods, ref)
}
}
update := change.(kubetypes.PodUpdate)
// Dispatch on the change type.
switch update.Op {
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
if update.Op == kubetypes.ADD {
klog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
} else if update.Op == kubetypes.DELETE {
klog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods)
} else {
klog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
}
updatePodsFunc(update.Pods, pods, pods)
case kubetypes.REMOVE:
klog.V(4).Infof("Removing pods from source %s : %v", source, update.Pods)
for _, value := range update.Pods {
if existing, found := pods[value.UID]; found {
// this is a delete
delete(pods, value.UID)
removePods = append(removePods, existing)
continue
}
// this is a no-op
}
case kubetypes.SET:
klog.V(4).Infof("Setting pods for source %s", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
// Anything in the old cache that the SET did not carry over has been removed.
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
}
case kubetypes.RESTORE:
klog.V(4).Infof("Restoring pods for source %s", source)
restorePods = append(restorePods, update.Pods...)
default:
klog.Warningf("Received invalid update type: %v", update)
}
s.pods[source] = pods
// Wrap each per-type slice (copied via copyPods) in a PodUpdate carrying the matching Op.
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
restores = &kubetypes.PodUpdate{Op: kubetypes.RESTORE, Pods: copyPods(restorePods), Source: source}
return adds, updates, deletes, removes, reconciles, restores
}
创建用于接收apiserver消息的监听器
NewSourceApiserver()
初始化一个ListerWatcher,用于监听所有namespace中PodHostField等于本节点nodeName的pod,然后调用newSourceApiserverFromLW()方法创建并启动apiserver监听器
// pkg/kubelet/config/apiserver.go
// NewSourceApiserver builds a ListerWatcher over "pods" in all namespaces, restricted to
// pods whose host field equals nodeName, and starts an apiserver source that pushes
// what it sees onto updates.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
	restClient := c.CoreV1().RESTClient()
	onThisNode := fields.OneTermEqualSelector(api.PodHostField, string(nodeName))
	podLW := cache.NewListWatchFromClient(restClient, "pods", metav1.NamespaceAll, onThisNode)
	newSourceApiserverFromLW(podLW, updates)
}
newSourceApiserverFromLW()
创建一个Reflector,以send方法作为存储消息的方法,并启动Reflector,send方法即把所有接收到的消息都放入传入的channel中,即PodConfig.Channel()方法创建的channel
// pkg/kubelet/config/apiserver.go
// newSourceApiserverFromLW starts a Reflector on lw whose store forwards its contents to
// updates as a single SET PodUpdate tagged with the apiserver source.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	// pushSnapshot converts the store's objects to pods and sends them as one SET.
	pushSnapshot := func(objs []interface{}) {
		var snapshot []*v1.Pod
		for i := range objs {
			snapshot = append(snapshot, objs[i].(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{
			Pods:   snapshot,
			Op:     kubetypes.SET,
			Source: kubetypes.ApiserverSource,
		}
	}
	// The Reflector uses lw to fetch from the apiserver and hands results to the store,
	// which in turn calls pushSnapshot.
	store := cache.NewUndeltaStore(pushSnapshot, cache.MetaNamespaceKeyFunc)
	reflector := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	go reflector.Run(wait.NeverStop)
}
传递PodConfig.updates到被消费处
在RunKubelet()方法中
- 调用createAndInitKubelet()传入kubeDeps—>kubelet.NewMainKubelet()传入kubeDeps—>makePodSourceConfig(),最终在makePodSourceConfig()方法中创建config.PodConfig并返回赋值给传入的kubeDeps.PodConfig,最终返回到createAndInitKubelet()
- 在RunKubelet()方法的最后,调用startKubelet()方法时传入kubeDeps.PodConfig,在startKubelet()中调用Kubelet.Run()时传入kubeDeps.PodConfig.updates,实现updates通道的传递,供syncLoop消费
消费PodConfig.updates中的消息,并根据消息更新pod
UML
流程:
- 在Kubelet.Run()方法中执行Kubelet.syncLoop()方法,开始循环处理updates中存储的变更,同步pod当前状态与目标状态
- 层层调用后,最终在podWorkers.managePodLoop()方法中回调在初始化podWorkers时传入的Kubelet.syncPod()方法
- Kubelet.syncPod()方法中调用kubeGenericRuntimeManager.SyncPod()方法真正**开始同步pod的当前状态与目标状态**
- 调用kubeGenericRuntimeManager.computePodActions()计算当前状态和目标状态的不同,即sandbox和container的变化
- 如果sandbox发生变化,则调用kubeGenericRuntimeManager.killPodWithSyncResult()结束sandbox
- 若sandbox没发生变化,则处理container的变化,循环调用kubeGenericRuntimeManager.killContainer()结束所有不需要持有的container
- 若sandbox发生变化且已经被结束,则需要调用kubeGenericRuntimeManager.createPodSandbox()创建新的sandbox
- 在新建的sandbox上调用start()方法创建init containers
- 在新建的sandbox上调用start()方法创建containers
- 同步完成后返回
关键代码分析
循环处理updates中的变更消息
Kubelet.syncLoop
Kubelet.syncLoop()方法是处理从updates中获取的变更消息,根据变更同步pod的目标状态和运行状态,永不返回
- 循环调用Kubelet.syncLoopIteration(),处理updates中的消息
// pkg/kubelet/kubelet.go
// syncLoop is the main loop for processing changes read from updates: it repeatedly calls
// syncLoopIteration and only leaves the loop when that call returns false.
// Setup code (tickers, plegCh, ...) is elided in this excerpt ("...").
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
...
for {
...
// Record the time right before and right after each iteration in syncLoopMonitor.
kl.syncLoopMonitor.Store(kl.clock.Now())
// Process one message via syncLoopIteration.
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
Kubelet.syncLoopIteration
从不同的channel中读取消息,并作出对应的操作
- configCh: 存储pod变更消息的channel
- 调用Kubelet.HandlePodAdditions()进行新增操作(这里以新增作为举例,其他操作类似)
- syncCh: 存储同步pod状态消息的channel
- housekeepingCh: 存储housekeeping消息的channel
- plegCh: 存储PLEG消息的channel
// pkg/kubelet/kubelet.go
// syncLoopIteration waits for one message from any of its channels (or from the liveness
// manager's update channel) and dispatches it to the matching handler callback.
// It returns false only when configCh has been closed; otherwise it returns true.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
// Pod change message from a config source.
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case kubetypes.ADD: // handle additions
klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// Hand the new pods to HandlePodAdditions.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:// handle updates
klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:// handle removals
klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:// handle (graceful) deletions
klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
// These are pods restored from the checkpoint. Treat them as new
// pods.
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.Errorf("Kubelet does not support snapshot update")
}
// Every op except RESTORE registers its source with sourcesReady.
if u.Op != kubetypes.RESTORE {
kl.sourcesReady.AddSource(u.Source)
}
// PLEG (pod lifecycle event) message.
case e := <-plegCh:
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
}
}
// For a died container, clean up containers in the pod when the event data is a container ID string.
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
// Sync tick: sync every pod returned by getPodsToSync.
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
handler.HandlePodSyncs(podsToSync)
// Liveness result: sync the pod whose container failed its liveness check.
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.
// We should not use the pod from livenessManager, because it is never updated after
// initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
}
// Housekeeping tick: trigger pod cleanup, but only once all sources are ready.
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
} else {
klog.V(4).Infof("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.Errorf("Failed cleaning pods: %v", err)
}
}
}
return true
}
Kubelet.HandlePodAdditions
对pod数组进行新增操作
- 循环pod数组,调用Kubelet.dispatchWork()对单个pod进行操作
// pkg/kubelet/kubelet.go
// HandlePodAdditions processes newly added pods in creation-time order: each pod is added
// to the pod manager, then either handled as a mirror pod, rejected by admission, or
// dispatched to a pod worker with SyncPodCreate and registered with the probe manager.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))

	for i := range pods {
		pod := pods[i]

		// The list of existing pods is taken before this pod is added to the pod manager.
		existingPods := kl.podManager.GetPods()
		kl.podManager.AddPod(pod)

		// Mirror pods take their own path and are not dispatched below.
		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		// Admission is only checked for pods that are not already terminated.
		if terminated := kl.podIsTerminated(pod); !terminated {
			activePods := kl.filterOutTerminatedPods(existingPods)
			// Check if we can admit the pod; if not, reject it.
			admitted, reason, message := kl.canAdmitPod(activePods, pod)
			if !admitted {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}

		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		// Hand the single pod to dispatchWork for processing.
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}
Kubelet.dispatchWork
调用podWorkers对pod进行变更处理
- 调用podWorkers.UpdatePod()处理pod
// pkg/kubelet/kubelet.go
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
// (The termination check itself is elided in this excerpt.)
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
...
// Hand the pod to the pod workers via UpdatePod.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
// As written, the worker duration metric is observed only when err is non-nil.
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
}
},
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
podWorkers.UpdatePod
将新的配置更新到指定pod中
- 启动一个新的线程,调用podWorkers.managePodLoop()方法操作pod
// pkg/kubelet/pod_workers.go
// UpdatePod applies new options to the pod identified by options.Pod.UID. The first update
// for a UID creates that pod's update channel and starts a goroutine running managePodLoop.
// If the pod's worker is idle the update is sent to it; otherwise it is parked as the last
// undelivered update, unless a pending kill request is already parked there.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	uid := options.Pod.UID

	p.podLock.Lock()
	defer p.podLock.Unlock()

	podUpdates, exists := p.podUpdates[uid]
	if !exists {
		podUpdates = make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates
		// Start a new goroutine that runs managePodLoop for this pod.
		go func() {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}

	if p.isWorking[uid] {
		// if a request to kill a pod is pending, we do not let anything overwrite that request.
		pending, found := p.lastUndeliveredWorkUpdate[uid]
		if !found || pending.UpdateType != kubetypes.SyncPodKill {
			p.lastUndeliveredWorkUpdate[uid] = *options
		}
		return
	}

	p.isWorking[uid] = true
	podUpdates <- *options
}
podWorkers.managePodLoop
调用podWorkers.syncPodFn()方法,即初始化podWorkers时传入的Kubelet.syncPod()方法
// pkg/kubelet/pod_workers.go
// managePodLoop consumes podUpdates until the channel is closed, calling p.syncPodFn for
// each update. Parts of the loop body are elided in this excerpt ("...").
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for update := range podUpdates {
err := func() error {
...
// Call p.syncPodFn, i.e. the Kubelet.syncPod method passed in when podWorkers was initialized.
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
// Remember when this sync finished.
lastSyncTime = time.Now()
return err
}()
...
}
}
Kubelet.syncPod
用于同步单个pod
- 如果正在创建pod,请记录pod worker启动延迟
- 调用generateAPIPodStatus为Pod准备v1.PodStatus
- 如果Pod首次运行,记录Pod的启动延迟
- 在statusManager中更新pod的状态
- 结束不是running状态的pod
- 如果该Pod是静态Pod,并且还没有镜像Pod,则创建一个Mirror Pod
- 如果不存在,则为容器创建数据目录
- 等待卷连接/挂载
- 为pod添加secrets
- 调用容器运行时的SyncPod回调,即调用kubeGenericRuntimeManager.SyncPod()方法操作pod
- 更新pod的入站和出站流量整形(traffic shaping)限制
// presumably pkg/kubelet/kubelet.go, like the other Kubelet methods in these notes - TODO confirm
// syncPod syncs a single pod. Several parts are elided in this excerpt ("..."), including the
// code that unpacks o into the locals used below (pod, mirrorPod, podStatus, updateType,
// firstSeenTime).
func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
// If the pod is being created, record the pod worker start latency.
if updateType == kubetypes.SyncPodCreate {
if !firstSeenTime.IsZero() {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
} else {
klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
}
}
// Build the v1.PodStatus for the pod via generateAPIPodStatus.
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
for _, ipInfo := range apiPodStatus.PodIPs {
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
}
// Fall back to the single PodIP when PodIPs was empty.
if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
podStatus.IPs = []string{apiPodStatus.PodIP}
}
// Record the pod start latency when the pod is seen running for the first time.
// NOTE(review): && binds tighter than ||, so when !ok the metric is observed
// regardless of firstSeenTime - confirm this is intended.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
}
runnable := kl.canRunPod(pod)
if !runnable.Admit {
// Pod is not runnable; update the Pod and Container statuses to why.
apiPodStatus.Reason = runnable.Reason
apiPodStatus.Message = runnable.Message
// Waiting containers are not creating.
const waitingReason = "Blocked"
for _, cs := range apiPodStatus.InitContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
for _, cs := range apiPodStatus.ContainerStatuses {
if cs.State.Waiting != nil {
cs.State.Waiting.Reason = waitingReason
}
}
}
// Update the pod's status in the status manager.
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Kill the pod if it cannot be run, is being deleted, or has failed; then stop syncing.
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
var syncErr error
if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
syncErr = fmt.Errorf("error killing pod: %v", err)
utilruntime.HandleError(syncErr)
} else {
if !runnable.Admit {
// There was no error killing the pod, but the pod cannot be run.
// Return an error to signal that the sync loop should back off.
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
}
}
return syncErr
}
...
// If the pod is a static pod and has no (valid) mirror pod yet, create a mirror pod.
if kubetypes.IsStaticPod(pod) {
podFullName := kubecontainer.GetPodFullName(pod)
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.Infof("Trying to delete pod %s %v", podFullName, mirrorPod.ObjectMeta.UID)
var err error
deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.Warningf("Deleted mirror pod %q because it is outdated", format.Pod(mirrorPod))
} else if err != nil {
klog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil || node.DeletionTimestamp != nil {
klog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
} else {
klog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
// A failure to create the mirror pod is only logged; the sync continues.
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
klog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
}
}
}
}
// Create the pod's data directories if they do not exist.
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
return err
}
if !kl.podIsTerminated(pod) {
// Wait for volumes to attach/mount.
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
return err
}
}
// Collect the pod's pull secrets via getPullSecretsForPod.
pullSecrets := kl.getPullSecretsForPod(pod)
// Call the container runtime's SyncPod callback, i.e. kubeGenericRuntimeManager.SyncPod().
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
...
return nil
}
根据变更消息同步pod状态
kubeGenericRuntimeManager.SyncPod
SyncPod通过执行以下步骤将正在运行的Pod同步至目标状态
- 调用kubeGenericRuntimeManager.computePodActions()计算sandbox和container的变化,返回podContainerChanges
- 如果sandbox发生变化则调用kubeGenericRuntimeManager.killPodWithSyncResult()结束sandbox
- 调用kubeGenericRuntimeManager.killContainer()结束所有pod中不需要持有的container
- 如有必要,调用kubeGenericRuntimeManager.createPodSandbox()创建新的sandbox
- 调用start()依次创建临时容器(ephemeral containers)、init容器和普通容器
// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// SyncPod syncs the running pod into the desired state by executing the steps marked below.
// Results of each action are accumulated in the named return value result. Several parts
// are elided in this excerpt ("..."), including the declarations of podSandboxConfig, podIP,
// podIPs, msg and err.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes via computePodActions.
podContainerChanges := m.computePodActions(pod, podStatus)
...
// Step 2: Kill the pod via killPodWithSyncResult if the sandbox has changed.
if podContainerChanges.KillPod {
...
// Kill the pod sandbox.
killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
result.AddPodSyncResult(killResult)
if killResult.Error() != nil {
klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
return
}
if podContainerChanges.CreateSandbox {
m.purgeInitContainers(pod, podStatus)
}
} else {
// Step 3: Kill any running containers in this pod which are not to keep, via killContainer.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
result.AddSyncResult(killContainerResult)
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
return
}
}
}
...
// Step 4: Create a sandbox for the pod if necessary, via createPodSandbox.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
...
// Create the new sandbox.
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
...
}
...
// start is a helper used to start each kind of container; typeName is only used in
// log and error messages.
start := func(typeName string, spec *startSpec) error {
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
result.AddSyncResult(startContainerResult)
// Skip the start while the container is still in back-off.
isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
return err
}
klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
klog.V(3).Infof("%v start failed: %v: %s", typeName, err, msg)
default:
utilruntime.HandleError(fmt.Errorf("%v start failed: %v: %s", typeName, err, msg))
}
return err
}
return nil
}
// Step 5: Start ephemeral containers (only when the EphemeralContainers feature gate is on).
// The error returned by start is not checked here.
if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
for _, idx := range podContainerChanges.EphemeralContainersToStart {
start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
}
}
// Step 6: Start the next init container; a failure ends this sync.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
if err := start("init container", containerStartSpec(container)); err != nil {
return
}
// Successfully started the container; clear the entry in the failure
klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
}
// Step 7: Start the regular containers. The error returned by start is not checked here.
for _, idx := range podContainerChanges.ContainersToStart {
start("container", containerStartSpec(&pod.Spec.Containers[idx]))
}
return
}
为pod创建sandbox
kubeGenericRuntimeManager.createPodSandbox
调用RemoteRuntimeService.RunPodSandbox()创建sandbox
// pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
// createPodSandbox creates a pod sandbox and returns (podSandBoxID, message, error).
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
...
// Ask the runtime service to create and run the sandbox via RunPodSandbox
// (per the surrounding article this resolves to RemoteRuntimeService.RunPodSandbox).
podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
...
// Success path: the sandbox ID with an empty message and a nil error.
return podSandBoxID, "", nil
}
RemoteRuntimeService.RunPodSandbox
调用dockerService.RunPodSandbox()并传入配置参数
// pkg/kubelet/remote/remote_runtime.go
// RunPodSandbox creates and runs a pod-level sandbox; the runtime is expected
// to leave it in the ready state. It returns the ID of the new sandbox.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
...
// Forward the sandbox config and runtime handler through the runtime client
// (per the surrounding article the request is served by dockerService.RunPodSandbox).
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
Config: config,
RuntimeHandler: runtimeHandler,
})
...
return resp.PodSandboxId, nil
}
dockerService.RunPodSandbox
- 拉取镜像
- 调用kubeDockerClient.CreateContainer()方法创建container,即最终向docker发送一个post请求,创建container
- 创建checkpoint
- 调用kubeDockerClient.StartContainer()方法启动sandbox container
- 为sandbox配置network,所有的Pod网络都是由启动时发现的CNI插件设置的。
// pkg/kubelet/dockershim/docker_sandbox.go
// RunPodSandbox creates and starts the sandbox container for a pod (a
// container that holds the network namespace) and then sets up its network.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
config := r.GetConfig()
...
// Step 1: make sure the sandbox image is available; abort on failure.
if err := ensureSandboxImageExists(ds.client, image); err != nil {
return nil, err
}
...
// Step 2: create the sandbox container through kubeDockerClient.CreateContainer.
createResp, err := ds.client.CreateContainer(*createConfig)
if err != nil {
// Creation failed: hand the error to the conflict-recovery helper, which
// may replace both createResp and err.
createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
}
...
// Step 3: write a checkpoint keyed by the new container ID.
if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
return nil, err
}
// Step 4: start the sandbox container.
err = ds.client.StartContainer(createResp.ID)
if err != nil {
return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
}
...
// Step 5: configure networking for the sandbox. All pod networking is set
// up by the CNI plugin discovered at startup.
cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
...
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
...
return resp, nil
}
为pod创建container
start
start()的主要功能是创建并启动不同类型的container。它是在kubeGenericRuntimeManager.SyncPod()方法中声明的一个闭包(局部函数),只能在kubeGenericRuntimeManager.SyncPod()方法中被调用。
- 调用kubeGenericRuntimeManager.startContainer()方法创建container
// pkg/kubelet/kuberuntime/kuberuntime_manager.go
// start is a closure declared inside kubeGenericRuntimeManager.SyncPod() and
// can only be called from there. It records a StartContainer sync result for
// spec.container, returns early while the container is in back-off, and
// otherwise starts the container through m.startContainer. typeName is used
// only in log and error messages. It returns nil on success.
start := func(typeName string, spec *startSpec) error {
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
result.AddSyncResult(startContainerResult)
// If the container is still in back-off, mark the result failed and do not start it.
isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
return err
}
klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
// Create and start the container; msg and err here shadow the back-off values above.
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
klog.V(3).Infof("%v start failed: %v: %s", typeName, err, msg)
default:
utilruntime.HandleError(fmt.Errorf("%v start failed: %v: %s", typeName, err, msg))
}
return err
}
return nil
}
kubeGenericRuntimeManager.startContainer
启动container并返回容器状态
- 拉取镜像
- 调用RemoteRuntimeService.CreateContainer()创建container
- 调用RemoteRuntimeService.StartContainer()启动刚创建的container
- 执行启动hook,即PostStart生命周期钩子(lifecycle hook,注意它不是探针)
// pkg/kubelet/kuberuntime/kuberuntime_container.go
// startContainer starts a container and returns (message, error); on success
// it returns ("", nil). The visible steps are: ensure the image, create the
// container, start it, then run the PostStart hook when one is configured.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
container := spec.container
// Step 1: pull the image (via the image puller's EnsureImageExists).
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
if err != nil {
// Extract the gRPC status so the recorded event carries only its message.
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return msg, err
}
// Step 2: create the container (not the image) inside the pod sandbox.
...
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
...
// Step 3: start the container that was just created.
err = m.runtimeService.StartContainer(containerID)
...
// Step 4: run the PostStart lifecycle hook, if the container defines one.
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
// The hook failed: record a warning event, kill the container, and
// return the hook's message with an error that names ErrPostStartHook.
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
}
return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
}
}
return "", nil
}
RemoteRuntimeService.CreateContainer
在指定的podSandbox中创建container
- 调用dockerService.CreateContainer()方法创建container
// pkg/kubelet/remote/remote_runtime.go
// CreateContainer creates a container in the given pod sandbox and returns
// the new container's ID. It returns an error if the runtime call fails or
// if the response carries an empty container ID.
func (r *RemoteRuntimeService) CreateContainer(podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
// Build the call context from r.timeout; cancel is deferred to release it.
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
// Send the CreateContainer request through the runtime client (per the
// surrounding article it is served by dockerService.CreateContainer).
resp, err := r.runtimeClient.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
PodSandboxId: podSandBoxID,
Config: config,
SandboxConfig: sandboxConfig,
})
if err != nil {
klog.Errorf("CreateContainer in sandbox %q from runtime service failed: %v", podSandBoxID, err)
return "", err
}
// A response without a container ID is treated as a failure.
if resp.ContainerId == "" {
errorMessage := fmt.Sprintf("ContainerId is not set for container %q", config.GetMetadata())
klog.Errorf("CreateContainer failed: %s", errorMessage)
return "", errors.New(errorMessage)
}
return resp.ContainerId, nil
}
RemoteRuntimeService.StartContainer
启动指定的container
- 调用dockerService.StartContainer()方法启动container
// StartContainer starts the container identified by containerID and returns
// the runtime client's error, if any.
func (r *RemoteRuntimeService) StartContainer(containerID string) error {
// Build the call context from r.timeout; cancel is deferred to release it.
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
// Send the StartContainer request through the runtime client; the response
// value is discarded and only the error is inspected.
_, err := r.runtimeClient.StartContainer(ctx, &runtimeapi.StartContainerRequest{
ContainerId: containerID,
})
if err != nil {
klog.Errorf("StartContainer %q from runtime service failed: %v", containerID, err)
return err
}
return nil
}
dockerService.CreateContainer
在指定podSandbox中创建container
- 调用kubeDockerClient.CreateContainer()创建container
// pkg/kubelet/dockershim/docker_container.go
// CreateContainer creates a container in the given pod sandbox.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (ds *dockerService) CreateContainer(_ context.Context, r *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) {
...
// Create the container through kubeDockerClient.CreateContainer.
createResp, createErr := ds.client.CreateContainer(createConfig)
...
// Final return shown in the excerpt: no response, createErr as the error.
// NOTE(review): the success return is presumably in the elided code above — confirm against upstream.
return nil, createErr
}
dockerService.StartContainer
启动指定container
- 调用kubeDockerClient.StartContainer()方法启动container
- 为所有容器(包括失败的容器)创建容器日志符号链接。
// pkg/kubelet/dockershim/docker_container.go
// StartContainer starts the container named in the request. The start error
// is examined only after the log symlink has been attempted, so a symlink
// failure is returned first.
func (ds *dockerService) StartContainer(_ context.Context, r *runtimeapi.StartContainerRequest) (*runtimeapi.StartContainerResponse, error) {
// Start the container through kubeDockerClient.StartContainer; err is checked further down.
err := ds.client.StartContainer(r.ContainerId)
// Create the container log symlink for every container, including ones that failed to start.
if linkError := ds.createContainerLogSymlink(r.ContainerId); linkError != nil {
// Do not stop the container if we failed to create symlink because:
// 1. This is not a critical failure.
// 2. We don't have enough information to properly stop container here.
// Kubelet will surface this error to user via an event.
return nil, linkError
}
// Now report the start failure, after passing it through transformStartContainerError.
if err != nil {
err = transformStartContainerError(err)
return nil, fmt.Errorf("failed to start container %q: %v", r.ContainerId, err)
}
return &runtimeapi.StartContainerResponse{}, nil
}
底层公用方法
kubeDockerClient.CreateContainer
调用github.com/docker/docker/client包下的Client对象,向docker服务发送post请求创建container
// pkg/kubelet/dockershim/libdocker/kube_docker_client.go
// CreateContainer asks the docker client to create a container from opts and
// returns a pointer to the create response. A context error, when present,
// is returned in preference to the error from the create call.
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
// we provide an explicit default shm size as to not depend on docker daemon.
// TODO: evaluate exposing this as a knob in the API
if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
opts.HostConfig.ShmSize = defaultShmSize
}
// Send the create request to the docker service through the docker client.
createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
// Check the context first so its error takes precedence over err.
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &createResp, nil
}
// Code from the github.com/docker/docker/client package.
// ContainerCreate POSTs to /containers/create and decodes the JSON reply into
// the returned response value.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (cli *Client) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, containerName string) (container.ContainerCreateCreatedBody, error) {
...
// Send the POST request that creates the container.
serverResp, err := cli.post(ctx, "/containers/create", query, body, nil)
// Deferred before the error check so it runs on every return path below.
defer ensureReaderClosed(serverResp)
if err != nil {
return response, err
}
// Decode the response body; a decode error is returned alongside response.
err = json.NewDecoder(serverResp.body).Decode(&response)
return response, err
}
kubeDockerClient.StartContainer
调用github.com/docker/docker/client包下的Client对象,向docker服务发送post请求启动container
// pkg/kubelet/dockershim/libdocker/kube_docker_client.go
// StartContainer asks the docker client to start the container with the given
// id. A context error, when present, is returned in preference to the error
// from the start call.
func (d *kubeDockerClient) StartContainer(id string) error {
ctx, cancel := d.getTimeoutContext()
defer cancel()
// Send the start request to the docker service through the docker client,
// with empty start options.
err := d.client.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{})
// Check the context first so its error takes precedence over err.
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
// Code from the github.com/docker/docker/client package.
// ContainerStart POSTs to /containers/<containerID>/start and returns the
// error from that request.
func (cli *Client) ContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) error {
// Only non-empty checkpoint options are added to the query string.
query := url.Values{}
if len(options.CheckpointID) != 0 {
query.Set("checkpoint", options.CheckpointID)
}
if len(options.CheckpointDir) != 0 {
query.Set("checkpoint-dir", options.CheckpointDir)
}
// Send the POST request that starts the given container (no request body).
resp, err := cli.post(ctx, "/containers/"+containerID+"/start", query, nil, nil)
ensureReaderClosed(resp)
return err
}
PLEG(本地状态监控与回写)
UML
关键代码分析
初始化PLEG
// NewMainKubelet is shown here only for the line that constructs the PLEG.
// This excerpt is abridged: the parameter list and each "..." line stand for
// upstream code that was omitted.
func NewMainKubelet(){
...
// Create the generic PLEG from the container runtime, channel capacity,
// relist period, pod cache and a real clock.
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
...
}
启动PLEG,监控pod状态并把差异写入GenericPLEG.eventChannel中
// Run starts the kubelet reacting to config updates.
// Only the PLEG start is shown in this excerpt; the updates channel is not
// used by the visible lines.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Start the pod lifecycle event generator.
kl.pleg.Start()
}
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
// Run g.relist every g.relistPeriod; wait.NeverStop is passed as the stop
// channel, so no stop signal is wired in here.
go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
// relist compares the previously recorded pods with the pods currently
// reported by the container runtime and sends the resulting lifecycle events
// on g.eventChannel.
func (g *GenericPLEG) relist() {
timestamp := g.clock.Now()
// List pods from the container runtime (g.runtime), not from the apiserver.
// NOTE(review): err is not checked in this excerpt, and the meaning of the
// `true` argument is not visible here — confirm both against upstream.
podList, err := g.runtime.GetPods(true)
g.updateRelistTime(timestamp)
pods := kubecontainer.Pods(podList)
// Store the runtime's pod list in podRecords as the current snapshot.
g.podRecords.setCurrent(pods)
// Compare the old and the current pods, and generate events.
eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
for pid := range g.podRecords {
// Fetch the old and the current record for this pod ID.
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// Get all containers in the old and the new pod.
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
}
}
// Allocated only when the cache is enabled; it is not used further in this excerpt.
var needsReinspection map[types.UID]*kubecontainer.Pod
if g.cacheEnabled() {
needsReinspection = make(map[types.UID]*kubecontainer.Pod)
}
// If there are events associated with a pod, we should update the
// podCache.
for pid, events := range eventsByPodID {
pod := g.podRecords.getCurrent(pid)
if g.cacheEnabled() {
g.updateCache(pod, pid)
}
// Update the internal storage and send out the events.
g.podRecords.update(pid)
for i := range events {
// Filter out events that are not reliable and no other components use yet.
if events[i].Type == ContainerChanged {
continue
}
g.eventChannel <- events[i]
}
}
}
消费GenericPLEG.eventChannel中的消息,转化写入消息到manager.podStatusChannel中
// syncLoop obtains the PLEG event channel and passes it, together with the
// config updates and ticker channels, to syncLoopIteration.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
...
// Channel on which the PLEG delivers pod lifecycle events.
plegCh := kl.pleg.Watch()
kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
...
}
// * plegCh: update the runtime cache; sync pod
// Only the plegCh case is shown. This excerpt is abridged: the parameter list
// and each "..." line stand for upstream code that was omitted.
func (kl *Kubelet) syncLoopIteration(){
...
select {
case e := <-plegCh:
// Sync only when the event qualifies and the pod manager still knows the pod.
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
handler.HandlePodSyncs([]*v1.Pod{pod})
}
}
// For ContainerDied events whose Data is a string container ID, clean up
// containers in that pod.
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
}
...
}
后续最终调用kubelet.syncPod()方法
// syncPod is shown here only for the line that hands the pod status to the
// status manager.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
// Push the pod's status into the status manager.
kl.statusManager.SetPodStatus(pod, apiPodStatus)
...
return nil
}
// SetPodStatus records status for pod by delegating to updateStatusInternal
// while holding podStatusesLock.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
...
// The forceUpdate argument is true when the pod has a deletion timestamp.
m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
}
// updateStatusInternal tries to enqueue a status sync request for pod on
// podStatusChannel without blocking. It returns true when the request was
// enqueued and false when the channel was full.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
...
select {
// Enqueue a sync request on podStatusChannel.
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
pod.UID, newStatus.version, newStatus.status)
return true
default:
// Let the periodic syncBatch handle the update if the channel is full.
// We can't block, since we hold the mutex lock.
klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
format.Pod(pod), status)
return false
}
}
消费manager.podStatusChannel中的消息,调用kubeClient向apiserver中回写pod状态
// Start launches a goroutine that consumes podStatusChannel and, on every
// syncTicker tick, drains the channel and runs a batch sync.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (m *manager) Start() {
...
go wait.Forever(func() {
for {
select {
// Consume a request from podStatusChannel and write that pod's status
// back to the apiserver through m.syncPod.
case syncRequest := <-m.podStatusChannel:
klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
m.syncPod(syncRequest.podUID, syncRequest.status)
case <-syncTicker:
klog.V(5).Infof("Status Manager: syncing batch")
// remove any entries in the status channel since the batch will handle them
for i := len(m.podStatusChannel); i > 0; i-- {
<-m.podStatusChannel
}
m.syncBatch()
}
}
}, 0)
}
// syncPod writes the given pod status back to the apiserver, skipping the
// write when needsUpdate reports that nothing has to change.
// This excerpt is abridged: each "..." line stands for upstream code that was omitted.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if !m.needsUpdate(uid, status) {
klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
return
}
...
// Patch the pod status on the apiserver using m.kubeClient; the patched
// status is the old status merged with the new one by mergePodStatus.
newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
...
}
流程总图
从apiserver中读取pod消息,并存入updates中
- PodConfig利用Mux创建updatechannel
- PodConfig通过Reflector机制,从apiserver中同步pod消息,并使用Mux创建的updatechannel来接收Reflector同步的消息
- 循环从updatechannel中取出PodUpdate,调用podStorage.Merge()方法把消息整理同步到updates中
从updates中消费消息,并根据消息变更pod状态
PLEG过程
参考文章
https://cloud.tencent.com/developer/article/1492108
https://www.jianshu.com/p/eec91fa9d57a