Kubernetes事件是一种资源对象,用于记录和展示集群内发生的情况,kubernetes中的各个组件会将运行时发生的事件上报给kube-apiserver。然后我们可以通过 kubectl get events或者使用kubectl describe 来查看事件信息,默认只会保留最近1小时的事件信息。

由于Kubernetes是一种资源对象,它的所有信息都会存储在Etcd中,为了减少磁盘的开销,故强制执行保留策略,即在最后一次事件发生后,删除1小时之前发生的事件。

Kubernetes系统以Pod资源为核心,Deployment、StatefulSet、ReplicaSet、DaemonSet、CronJob等,最终都会创建出Pod。因此Kubernetes事件也是围绕Pod进行的,在Pod生命周期内的关键步骤中都会产生事件消息。Event资源数据结构体定义在core资源组下,代码示例如下(源码:staging\src\k8s.io\api\core\v1\types.go):

  1. type Event struct {
  2. metav1.TypeMeta `json:",inline"`
  3. metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
  4. InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
  5. Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
  6. Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
  7. Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
  8. FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
  9. LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
  10. Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
  11. Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
  12. EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
  13. Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
  14. Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
  15. Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
  16. ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
  17. ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
  18. }

Event资源数据结构体描述了当前时间段内发生了哪些关键性事件。事件有两种类型,分别为Normal和Warning,前者为正常事件,后者为警告事件。代码示例如下:

  1. const (
  2. EventTypeNormal string = "Normal"
  3. EventTypeWarning string = "Warning"
  4. )

其中:

  • Normal是正常事件
  • Warning是警告事件

EventBroadcaster

在kubernetes中,事件是通过EventBroadcaster来进行管理的。它主要包含:

  • EventRecorder:事件(Event)生产者,也称为事件记录器。Kubernetes系统组件通过EventRecorder记录关键性事件。
  • EventBroadcaster:事件(Event)消费者,也称为事件广播器。EventBroadcaster消费EventRecorder记录的事件并将其分发给目前所有已连接的broadcasterWatcher。分发过程有两种机制,分别是非阻塞(Non-Blocking)分发机制和阻塞(Blocking)分发机制。
  • broadcasterWatcher:观察者(Watcher)管理,用于定义事件的处理方式,例如上报事件至Kubernetes API Server。

EventRecorder

EventRecorder是事件记录器,其接口代码如下(源码:staging\src\k8s.io\client-go\tools\events\interfaces.go):

  1. type EventRecorder interface {
  2. Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
  3. }

其中Eventf就是使用fmt.Sprintf格式化输出事件的格式。

EventBroadcaster

EventBroadcaster是事件消费者,消费EventRecorder记录的事件并将其分发给目前所有已连接的broadcasterWatcher。EventBroadcaster通过NewBroadcaster函数进行实例化(源码:staging\src\k8s.io\client-go\tools\events\event_broadcaster.go):

  1. func NewBroadcaster(sink EventSink) EventBroadcaster {
  2. return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})
  3. }

在实例化过程中,会通过watch.NewBroadcaster函数在内部启动goroutine(即m.loop函数)来监控m.incoming,并将监控的事件通过m.distribute函数分发给所有已连接的broadcasterWatcher。
过程如下:
(1)调用newBroadcaster,代码如下(源码:staging\src\k8s.io\client-go\tools\events\event_broadcaster.go):

  1. func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {
  2. return &eventBroadcasterImpl{
  3. Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
  4. eventCache: eventCache,
  5. sleepDuration: sleepDuration,
  6. sink: sink,
  7. }
  8. }

(2)然后watch.NewBroadcaster会启动goroutine,如下(源码:staging\src\k8s.io\apimachinery\pkg\watch\mux.go):

  1. func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
  2. m := &Broadcaster{
  3. watchers: map[int64]*broadcasterWatcher{},
  4. incoming: make(chan Event, incomingQueueLength),
  5. watchQueueLength: queueLength,
  6. fullChannelBehavior: fullChannelBehavior,
  7. }
  8. m.distributing.Add(1)
  9. go m.loop()
  10. return m
  11. }

(3)m.loop()会监控m.incoming,获取事件,代码如下(源码:staging\src\k8s.io\apimachinery\pkg\watch\mux.go):

  1. func (m *Broadcaster) loop() {
  2. // Deliberately not catching crashes here. Yes, bring down the process if there's a
  3. // bug in watch.Broadcaster.
  4. for event := range m.incoming {
  5. if event.Type == internalRunFunctionMarker {
  6. event.Object.(functionFakeRuntimeObject)()
  7. continue
  8. }
  9. m.distribute(event)
  10. }
  11. m.closeAll()
  12. m.distributing.Done()
  13. }

(4)获取到事件后就传递给m.distribute(event),再传递给broadcasterWatcher,代码如下:

  1. func (m *Broadcaster) distribute(event Event) {
  2. m.lock.Lock()
  3. defer m.lock.Unlock()
  4. if m.fullChannelBehavior == DropIfChannelFull {
  5. for _, w := range m.watchers {
  6. select {
  7. case w.result <- event:
  8. case <-w.stopped:
  9. default: // Don't block if the event can't be queued.
  10. }
  11. }
  12. } else {
  13. for _, w := range m.watchers {
  14. select {
  15. case w.result <- event:
  16. case <-w.stopped:
  17. }
  18. }
  19. }
  20. }

其中m.watchersmap[int64]*broadcasterWatcher类型。

分发过程有两种机制,分别是非阻塞分发机制和阻塞分发机制。在非阻塞分发机制下使用DropIfChannelFull标识,在阻塞分发机制下使用WaitIfChannelFull标识,默认为DropIfChannelFull标识。

broadcasterWatcher

broadcasterWatcher是每个Kubernetes系统组件自定义处理事件的方式。它拥有两种自定义处理事件的函数,分别介绍如下:

  • StartLogging:将事件写入日志中。
  • StartRecordingToSink:将事件上报至Kubernetes API Server并存储至Etcd集群。

那么整个执行的流程如下:
image.png