Kubernetes事件是一种资源对象,用于记录和展示集群内发生的情况,kubernetes中的各个组件会将运行时发生的事件上报给kube-apiserver。然后我们可以通过 kubectl get events或者使用kubectl describe
由于Kubernetes是一种资源对象,它的所有信息都会存储在Etcd中,为了减少磁盘的开销,故强制执行保留策略,即在最后一次事件发生后,删除1小时之前发生的事件。
Kubernetes系统以Pod资源为核心,Deployment、StatefulSet、ReplicaSet、DaemonSet、CronJob等,最终都会创建出Pod。因此Kubernetes事件也是围绕Pod进行的,在Pod生命周期内的关键步骤中都会产生事件消息。Event资源数据结构体定义在core资源组下,代码示例如下(源码:staging\src\k8s.io\api\core\v1\types.go):
type Event struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
Event资源数据结构体描述了当前时间段内发生了哪些关键性事件。事件有两种类型,分别为Normal和Warning,前者为正常事件,后者为警告事件。代码示例如下:
const (
EventTypeNormal string = "Normal"
EventTypeWarning string = "Warning"
)
其中:
- Normal是正常事件
- Warning是警告事件
EventBroadcaster
在kubernetes中,事件是通过EventBroadcaster来进行管理的。它主要包含:
- EventRecorder:事件(Event)生产者,也称为事件记录器。Kubernetes系统组件通过EventRecorder记录关键性事件。
- EventBroadcaster:事件(Event)消费者,也称为事件广播器。EventBroadcaster消费EventRecorder记录的事件并将其分发给目前所有已连接的broadcasterWatcher。分发过程有两种机制,分别是非阻塞(Non-Blocking)分发机制和阻塞(Blocking)分发机制。
- broadcasterWatcher:观察者(Watcher)管理,用于定义事件的处理方式,例如上报事件至Kubernetes API Server。
EventRecorder
EventRecorder是事件记录器,其接口代码如下(源码:staging\src\k8s.io\client-go\tools\events\interfaces.go):
type EventRecorder interface {
Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
}
其中Eventf就是使用fmt.Sprintf格式化输出事件的格式。
EventBroadcaster
EventBroadcaster是事件消费者,消费EventRecorder记录的事件并将其分发给目前所有已连接的broadcasterWatcher。EventBroadcaster通过NewBroadcaster函数进行实例化(源码:staging\src\k8s.io\client-go\tools\events\event_broadcaster.go):
func NewBroadcaster(sink EventSink) EventBroadcaster {
return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})
}
在实例化过程中,会通过watch.NewBroadcaster函数在内部启动goroutine(即m.loop函数)来监控m.incoming,并将监控的事件通过m.distribute函数分发给所有已连接的broadcasterWatcher。
过程如下:
(1)调用newBroadcaster,代码如下(源码:staging\src\k8s.io\client-go\tools\events\event_broadcaster.go):
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
eventCache: eventCache,
sleepDuration: sleepDuration,
sink: sink,
}
}
(2)然后watch.NewBroadcaster会启动goroutine,如下(源码:staging\src\k8s.io\apimachinery\pkg\watch\mux.go):
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, incomingQueueLength),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1)
go m.loop()
return m
}
(3)m.loop()会监控m.incoming,获取事件,代码如下(源码:staging\src\k8s.io\apimachinery\pkg\watch\mux.go):
func (m *Broadcaster) loop() {
// Deliberately not catching crashes here. Yes, bring down the process if there's a
// bug in watch.Broadcaster.
for event := range m.incoming {
if event.Type == internalRunFunctionMarker {
event.Object.(functionFakeRuntimeObject)()
continue
}
m.distribute(event)
}
m.closeAll()
m.distributing.Done()
}
(4)获取到事件后就传递给m.distribute(event)
,再传递给broadcasterWatcher,代码如下:
func (m *Broadcaster) distribute(event Event) {
m.lock.Lock()
defer m.lock.Unlock()
if m.fullChannelBehavior == DropIfChannelFull {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
default: // Don't block if the event can't be queued.
}
}
} else {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
}
}
}
}
其中m.watchers
是map[int64]*broadcasterWatcher
类型。
分发过程有两种机制,分别是非阻塞分发机制和阻塞分发机制。在非阻塞分发机制下使用DropIfChannelFull标识,在阻塞分发机制下使用WaitIfChannelFull标识,默认为DropIfChannelFull标识。
broadcasterWatcher
broadcasterWatcher是每个Kubernetes系统组件自定义处理事件的方式。它拥有两种自定义处理事件的函数,分别介绍如下:
- StartLogging:将事件写入日志中。
- StartRecordingToSink:将事件上报至Kubernetes API Server并存储至Etcd集群。
那么整个执行的流程如下: