13. EventBroadcaster: The Event Manager
A Kubernetes Event is a resource object used to show what is happening inside the cluster; the various components of a Kubernetes system report events that occur at runtime to the Kubernetes API Server. For example: what decision the scheduler made, or why a Pod was evicted from a node. Events can be displayed with the kubectl get event or kubectl describe pod <podname> commands to see what has happened in the Kubernetes cluster. By default, these commands only show recent events (those from the last hour).
Note: the Event here is the Event resource object managed by Kubernetes, not the callback event produced by the Etcd watch mechanism; take care to distinguish the two.
Because Kubernetes events are resource objects, they are stored in the Etcd cluster behind the Kubernetes API Server. To avoid filling up disk space, a retention policy is enforced: events are deleted one hour after their last occurrence.
The Kubernetes system is centered on the Pod resource: Deployment, StatefulSet, ReplicaSet, DaemonSet, CronJob, and the like all ultimately create Pods. Kubernetes events therefore also revolve around Pods, and event messages are produced at the key steps of a Pod's lifecycle. The Event resource structure is defined in the core resource group. Code path:
vendor/k8s.io/api/core/v1/types.go

```go
// Event is a report of an event somewhere in the cluster. Events
// have a limited retention time and triggers and messages may evolve
// with time. Event consumers should not rely on the timing of an event
// with a given Reason reflecting a consistent underlying trigger, or the
// continued existence of events with that Reason. Events should be
// treated as informative, best-effort, supplemental data.
type Event struct {
	metav1.TypeMeta `json:",inline"`

	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

	// The object that this event is about.
	InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`

	// This should be a short, machine understandable string that gives the reason
	// for the transition into the object's current status.
	// TODO: provide exact specification for format.
	// +optional
	Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`

	// A human-readable description of the status of this operation.
	// TODO: decide on maximum length.
	// +optional
	Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`

	// The component reporting this event. Should be a short machine understandable string.
	// +optional
	Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

	// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
	// +optional
	FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`

	// The time at which the most recent occurrence of this event was recorded.
	// +optional
	LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`

	// The number of times this event has occurred.
	// +optional
	Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

	// Type of this event (Normal, Warning), new types could be added in the future
	// +optional
	Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

	// Time when this Event was first observed.
	// +optional
	EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`

	// Data about the Event series this event represents or nil if it's a singleton Event.
	// +optional
	Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`

	// What action was taken/failed regarding to the Regarding object.
	// +optional
	Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`

	// Optional secondary object for more complex actions.
	// +optional
	Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

	// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
	// +optional
	ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`

	// ID of the controller instance, e.g. `kubelet-xyzf`.
	// +optional
	ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
```
The Event structure describes which key events occurred during the current period. Events come in two types, Normal and Warning: the former are normal events, the latter warning events. Code example:
```go
// Valid values for event types (new types could be added in future)
const (
	// Information only and will not cause any problems
	EventTypeNormal string = "Normal"

	// These events are to warn that something might go wrong
	EventTypeWarning string = "Warning"
)
```
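To make the Type and Count fields concrete, here is a minimal, self-contained sketch (not the real client-go/apiserver logic; the type and function names are illustrative) of how repeated occurrences of the same event can be folded into a single entry by bumping Count. The real deduplication key includes the involved object and more; Reason alone is used here for brevity.

```go
package main

import "fmt"

// Simplified mirrors of the two valid event types from core/v1.
const (
	EventTypeNormal  = "Normal"
	EventTypeWarning = "Warning"
)

// MiniEvent is a drastically simplified stand-in for v1.Event,
// keeping only the fields discussed above.
type MiniEvent struct {
	Reason, Message, Type string
	Count                 int32
}

// aggregate folds a repeated occurrence into an existing entry by
// incrementing Count, instead of storing a duplicate event.
func aggregate(events map[string]*MiniEvent, e MiniEvent) {
	if prev, ok := events[e.Reason]; ok {
		prev.Count++
		return
	}
	e.Count = 1
	events[e.Reason] = &e
}

func main() {
	events := map[string]*MiniEvent{}
	aggregate(events, MiniEvent{Reason: "BackOff", Type: EventTypeWarning, Message: "restarting failed container"})
	aggregate(events, MiniEvent{Reason: "BackOff", Type: EventTypeWarning, Message: "restarting failed container"})
	fmt.Println(events["BackOff"].Count) // 2
}
```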
How the EventBroadcaster Event Management Mechanism Works
An Actor can be any component of the Kubernetes system; when a key event happens inside a component, the component records it through an EventRecorder. The EventBroadcaster event management mechanism consists of the following parts.
● EventRecorder: the event producer, also known as the event recorder. Kubernetes components record key events through the EventRecorder.
● EventBroadcaster: the event consumer, also known as the event broadcaster. The EventBroadcaster consumes the events recorded by the EventRecorder and distributes them to all currently connected broadcasterWatchers. Distribution uses one of two mechanisms: non-blocking distribution and blocking distribution.
● broadcasterWatcher: watcher management, which defines how events are handled, e.g. reporting events to the Kubernetes API Server.
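The three parts above can be wired together in a minimal, self-contained sketch (the names mimic, but are not, the real client-go/apimachinery types): a producer pushes events into an incoming channel, an internal goroutine distributes them, and a watcher channel receives them.

```go
package main

import (
	"fmt"
	"sync"
)

type event struct{ reason, message string }

// broadcaster is a toy stand-in for watch.Broadcaster.
type broadcaster struct {
	incoming chan event
	mu       sync.Mutex
	watchers []chan event
}

func newBroadcaster(queueLen int) *broadcaster {
	b := &broadcaster{incoming: make(chan event, queueLen)}
	go b.loop() // mirrors the internal loop goroutine
	return b
}

// Action is the producer side: recorders push events into incoming.
func (b *broadcaster) Action(e event) { b.incoming <- e }

// Watch registers a new watcher, playing the role of broadcasterWatcher.
func (b *broadcaster) Watch() chan event {
	b.mu.Lock()
	defer b.mu.Unlock()
	w := make(chan event, 8)
	b.watchers = append(b.watchers, w)
	return w
}

// loop consumes incoming and distributes each event to every watcher
// (blocking sends here, for simplicity).
func (b *broadcaster) loop() {
	for e := range b.incoming {
		b.mu.Lock()
		for _, w := range b.watchers {
			w <- e
		}
		b.mu.Unlock()
	}
}

func main() {
	b := newBroadcaster(16)
	w := b.Watch()
	b.Action(event{"Scheduled", "pod assigned to node-1"})
	e := <-w
	fmt.Println(e.reason, e.message) // Scheduled pod assigned to node-1
}
```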
1. EventRecorder
The EventRecorder has the following four recording methods. The EventRecorder interface is defined as follows:
vendor/k8s.io/client-go/tools/record/event.go

```go
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
	// Event constructs an event from the given information and puts it in the queue for sending.
	// 'object' is the object this event is about. Event will make a reference-- or you may also
	// pass a reference to the object directly.
	// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
	// to automate handling of events, so imagine people writing switch statements to handle them.
	// You want to make that easy.
	// 'message' is intended to be human readable.
	//
	// The resulting event will be created in the same namespace as the reference object.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is just like Event, but with Sprintf for the message field.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is just like eventf, but with annotations attached
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
```
● Event: records an event that has just occurred.
● Eventf: like Event, but formats the message with fmt.Sprintf.
● PastEventf: allows a custom event timestamp, for recording events that have already happened.
● AnnotatedEventf: like Eventf, but with an attached annotations field.
Taking the Event method as an example, recording a newly occurred event follows the call chain Event → recorder.generateEvent → recorder.Action.
Code path: vendor/k8s.io/apimachinery/pkg/watch/mux.go

```go
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}
```
The Action function writes the event into the m.incoming channel, completing the production side of the event; the channel is then drained asynchronously by a goroutine inside the Broadcaster.
2. EventBroadcaster
The EventBroadcaster consumes the events recorded by the EventRecorder and distributes them to all currently connected broadcasterWatchers. The EventBroadcaster is instantiated via the NewBroadcaster function:
Code path: vendor/k8s.io/client-go/tools/record/event.go

```go
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: defaultSleepDuration,
	}
}
```
During instantiation, the watch.NewLongQueueBroadcaster function starts an internal goroutine (running the m.loop function) that watches m.incoming and hands each event to the m.distribute function, which forwards it to every connected broadcasterWatcher. Distribution has two modes, non-blocking and blocking: non-blocking distribution is selected with the DropIfChannelFull flag, blocking distribution with the WaitIfChannelFull flag, and DropIfChannelFull is the default. Code path:
vendor/k8s.io/apimachinery/pkg/watch/mux.go

```go
// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}
```
During distribution, DropIfChannelFull uses a select with a default case, making the send non-blocking: when a watcher's w.result buffer is full, the event is dropped. WaitIfChannelFull uses the same select without a default case, so when the w.result buffer is full the send blocks and waits.
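The drop-versus-wait behavior above can be demonstrated with a small, self-contained sketch (the function is illustrative, not the real apimachinery code): with DropIfChannelFull, a send to a full buffered channel falls through to the default case and the event is lost.

```go
package main

import "fmt"

type fullChannelBehavior int

const (
	WaitIfChannelFull fullChannelBehavior = iota
	DropIfChannelFull
)

// distribute mirrors the select logic from watch/mux.go: with
// DropIfChannelFull, the default case drops the event when the
// watcher's buffer is full; without it, the send blocks until there
// is room. It reports whether the event was delivered.
func distribute(w chan string, stopped chan struct{}, behavior fullChannelBehavior, event string) (delivered bool) {
	if behavior == DropIfChannelFull {
		select {
		case w <- event:
			return true
		case <-stopped:
			return false
		default: // buffer full: drop rather than block
			return false
		}
	}
	select {
	case w <- event:
		return true
	case <-stopped:
		return false
	}
}

func main() {
	w := make(chan string, 1) // buffer of one event
	stopped := make(chan struct{})
	fmt.Println(distribute(w, stopped, DropIfChannelFull, "e1")) // true: buffer had room
	fmt.Println(distribute(w, stopped, DropIfChannelFull, "e2")) // false: buffer full, dropped
}
```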
Note: unlike other Kubernetes resources, events have an important property: they may be lost. As the cluster grows, more and more events are reported, and each report means a read/write against the Etcd cluster, which puts considerable pressure on it. Losing an individual event does not affect the cluster's normal operation, and events matter far less than cluster stability; this is why, in the source code, events are dropped under the non-blocking distribution mechanism when the w.result buffer is full.
3. broadcasterWatcher
A broadcasterWatcher is how each Kubernetes component customizes its event handling, e.g. reporting events to the Kubernetes API Server. Each broadcasterWatcher has two functions for registering custom event handling, described below.
● StartLogging: writes events to a log.
● StartRecordingToSink: reports events to the Kubernetes API Server, where they are stored in the Etcd cluster.
Take the kube-scheduler component as an example. Acting as a broadcasterWatcher, it uses the StartLogging function to write events to klog's stdout and the StartRecordingToSink function to report key events to the Kubernetes API Server. Code:

cmd/kube-scheduler/app/server.go

```go
// Prepare the event broadcaster.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
```
Both StartLogging and StartRecordingToSink are built on the StartEventWatcher function, which runs a goroutine that continuously watches the EventBroadcaster for new events and invokes the registered handler function on each one.
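The watcher-goroutine pattern can be sketched in a few lines of self-contained Go (the function name mimics StartEventWatcher but this is not the real client-go code): a goroutine drains a channel and hands every event to a caller-supplied handler, which is exactly how both logging and sink-reporting are plugged in.

```go
package main

import (
	"fmt"
	"sync"
)

// startEventWatcher runs a goroutine that drains the event channel
// and passes each event to the caller-supplied handler; a logging
// handler and an API-reporting handler would both be registered this
// way. The returned WaitGroup lets callers wait for the drain to end.
func startEventWatcher(ch <-chan string, handler func(string)) *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for e := range ch { // loop until the channel is closed
			handler(e)
		}
	}()
	return &wg
}

func main() {
	ch := make(chan string, 4)
	var logged []string
	wg := startEventWatcher(ch, func(e string) { logged = append(logged, e) })
	ch <- "Scheduled"
	ch <- "Pulled"
	close(ch)
	wg.Wait()
	fmt.Println(logged) // [Scheduled Pulled]
}
```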
Let's look at the StartRecordingToSink function more closely. The kube-scheduler component uses v1core.EventSinkImpl as its custom event-reporting sink. Events can be reported in three ways: Create (an HTTP POST), Update (PUT), and Patch (PATCH). Taking Create as an example, the call chain is Create → e.Interface.CreateWithEventNamespace. The code is as follows:
Code path: vendor/k8s.io/client-go/kubernetes/typed/core/v1/event_expansion.go

```go
// CreateWithEventNamespace makes a new event. Returns the copy of the event the server returns,
// or an error. The namespace to create the event within is deduced from the
// event; it must either match this event client's namespace, or this event
// client must have been created with the "" namespace.
func (e *events) CreateWithEventNamespace(event *v1.Event) (*v1.Event, error) {
	if e.ns != "" && event.Namespace != e.ns {
		return nil, fmt.Errorf("can't create an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
	}
	result := &v1.Event{}
	err := e.client.Post().
		NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
		Resource("events").
		Body(event).
		Do(context.TODO()).
		Into(result)
	return result, err
}
```
The report is sent as a POST request through the RESTClient, delivering the event to the Kubernetes API Server, which ultimately stores it in the Etcd cluster.