13. EventBroadcaster 事件管理器

EventBroadcaster 事件管理器

Kubernetes 的事件(Event)是一种资源对象(Resource Object),用于展示集群内发生的情况,Kubernetes 系统中的各个组件会将运行时发生的各种事件上报给 Kubernetes API Server。例如,调度器做了什么决定,某些 Pod 为什么被从节点中驱逐。可以通过 kubectl get eventkubectl describe pod <podname> 命令显示事件,查看 Kubernetes 集群中发生了哪些事件。执行这些命令后,默认情况下只会显示最近(1 小时内)发生的事件。

注意 :此处的 Event 事件是 Kubernetes 所管理的 Event 资源对象,而非 Etcd 集群监控机制产生的回调事件,需要注意区分。

由于 Kubernetes 的事件是一种资源对象,因此它们存储在 Kubernetes API Server 的 Etcd 集群中。为避免磁盘空间被填满,故强制执行保留策略:在最后一次的事件发生后,删除 1 小时之前发生的事件

Kubernetes 系统以 Pod 资源为核心,Deployment、StatefulSet、ReplicaSet、DaemonSet、CronJob 等,最终都会创建出 Pod。因此 Kubernetes 事件也是围绕 Pod 进行的,在 Pod 生命周期内的关键步骤中都会产生事件消息。Event 资源数据结构体定义在 core 资源组下,代码示例:

vendor/k8s.io/api/core/v1/types.go

// Event is a report of an event somewhere in the cluster.  Events
// have a limited retention time and triggers and messages may evolve
// with time.  Event consumers should not rely on the timing of an event
// with a given Reason reflecting a consistent underlying trigger, or the
// continued existence of events with that Reason.  Events should be
// treated as informative, best-effort, supplemental data.
type Event struct {
  metav1.TypeMeta `json:",inline"`
  // Standard object's metadata.
  // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
  metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

  // The object that this event is about.
  InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`

  // This should be a short, machine understandable string that gives the reason
  // for the transition into the object's current status.
  // TODO: provide exact specification for format.
  // +optional
  Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`

  // A human-readable description of the status of this operation.
  // TODO: decide on maximum length.
  // +optional
  Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`

  // The component reporting this event. Should be a short machine understandable string.
  // +optional
  Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

  // The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
  // +optional
  FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`

  // The time at which the most recent occurrence of this event was recorded.
  // +optional
  LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`

  // The number of times this event has occurred.
  // +optional
  Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

  // Type of this event (Normal, Warning), new types could be added in the future
  // +optional
  Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

  // Time when this Event was first observed.
  // +optional
  EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`

  // Data about the Event series this event represents or nil if it's a singleton Event.
  // +optional
  Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`

  // What action was taken/failed regarding to the Regarding object.
  // +optional
  Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`

  // Optional secondary object for more complex actions.
  // +optional
  Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

  // Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
  // +optional
  ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`

  // ID of the controller instance, e.g. `kubelet-xyzf`.
  // +optional
  ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}

Event 资源数据结构体描述了当前时间段内发生了哪些关键性事件。事件有两种类型,分别为 Normal 和 Warning,前者为正常事件,后者为警告事件。代码示例如下:

// Valid values for event types (new types could be added in future)
const (
  // Information only and will not cause any problems
  EventTypeNormal string = "Normal"
  // These events are to warn that something might go wrong
  EventTypeWarning string = "Warning"
)

<br>

EventBroadcaster事件管理机制运行原理

Actor 可以是 Kubernetes 系统中的任意组件,当组件中发生了一些关键性事件时,可通过 EventRecorder 记录该事件。EventBroadcaster 事件管理机制可分为如下部分。

● EventRecorder :事件(Event)生产者,也称为事件记录器。Kubernetes 系统组件通过 EventRecorder 记录关键性事件。

● EventBroadcaster :事件(Event)消费者,也称为事件广播器。EventBroadcaster 消费 EventRecorder 记录的事件并将其分发给目前所有已连接的 broadcasterWatcher。分发过程有两种机制,分别是非阻塞(Non-Blocking)分发机制和阻塞(Blocking)分发机制。

● broadcasterWatcher :观察者(Watcher)管理,用于定义事件的处理方式,例如上报事件至 Kubernetes API Server。

1.EventRecorder

EventRecorder 拥有如下 4 种记录方法,EventRecorder 事件记录器接口代码示例代码:

vendor/k8s.io/client-go/tools/record/event.go

// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
   // Event constructs an event from the given information and puts it in the queue for sending.
   // 'object' is the object this event is about. Event will make a reference-- or you may also
   // pass a reference to the object directly.
   // 'type' of this event, and can be one of Normal, Warning. New types could be added in future
   // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
   // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
   // to automate handling of events, so imagine people writing switch statements to handle them.
   // You want to make that easy.
   // 'message' is intended to be human readable.
   //
   // The resulting event will be created in the same namespace as the reference object.
   Event(object runtime.Object, eventtype, reason, message string)

   // Eventf is just like Event, but with Sprintf for the message field.
   Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

   // AnnotatedEventf is just like eventf, but with annotations attached
   AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

● Event :对刚发生的事件进行记录。

● Eventf :通过使用 fmt.Sprintf 格式化输出事件的格式。

● PastEventf :允许自定义事件发生的时间,以记录已经发生过的消息。

● AnnotatedEventf :功能与 Eventf 一样,但附加了注释(Annotations)字段。

以 Event 方法为例,记录当前发生的事件,Event→recorder.generateEvent→recorder.Action

示例代码 :vendor/k8s.io/apimachinery/pkg/watch/mux.go

// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
   m.incoming <- Event{action, obj}
}

Action 函数通过 goroutine 实现异步操作,该函数将事件写入 m.incommit Chan 中,完成事件生产过程。

2.EventBroadcaster

EventBroadcaster 消费 EventRecorder 记录的事件并将其分发给目前所有已连接的 broadcasterWatcher。EventBroadcaster 通过 NewBroadcaster 函数进行实例化:

代码路径:vendor/k8s.io/client-go/tools/record/event.go

// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
   return &eventBroadcasterImpl{
      Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
      sleepDuration: defaultSleepDuration,
   }
}

在实例化过程中,会通过 watch.NewBroadcaster 函数在内部启动 goroutine(即 m.loop 函数)来监控 m.incoming,并将监控的事件通过 m.distribute 函数分发给所有已连接的 broadcasterWatcher。分发过程有两种机制,分别是非阻塞分发机制和阻塞分发机制。在非阻塞分发机制下使用 DropIfChannelFull 标识,在阻塞分发机制下使用 WaitIfChannelFull 标识,默认为 DropIfChannelFull 标识。代码路径:

vendor/k8s.io/apimachinery/pkg/watch/mux.go

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
  if m.fullChannelBehavior == DropIfChannelFull {
    for _, w := range m.watchers {
      select {
      case w.result <- event:
      case <-w.stopped:
      default: // Don't block if the event can't be queued.
      }
    }
  } else {
    for _, w := range m.watchers {
      select {
      case w.result <- event:
      case <-w.stopped:
      }
    }
  }
}

在分发过程中,DropIfChannelFull 标识位于 select 多路复用中,使用 default 关键字做非阻塞分发,当 w.result 缓冲区满的时候,事件会丢失。WaitIfChannelFull 标识也位于 select 多路复用中,没有 default 关键字,当 w.result 缓冲区满的时候,分发过程会阻塞并等待。

注意 :Kubernetes 中的事件与其他的资源不同,它有一个很重要的特性,那就是它可以丢失。因为随着 Kubernetes 系统集群规模越来越大,上报的事件越来越多,每次上报事件都要对 Etcd 集群进行读/写,这样会给 Etcd 集群带来很大的压力。如果某个事件丢失了,并不会影响集群的正常工作,事件的重要性远低于集群的稳定性,所以可以看到源码中当 w.result 缓冲区满的时候,在非阻塞分发机制下事件会丢失。

3.broadcasterWatcher

broadcasterWatcher 是每个 Kubernetes 系统组件自定义处理事件的方式。例如,上报事件至 Kubernetes API Server。每个 broadcasterWatcher 拥有两种自定义处理事件的函数,分别介绍如下。

● StartLogging :将事件写入日志中。

● StartRecordingToSink :将事件上报至 Kubernetes API Server 并存储至 Etcd 集群。

以 kube-scheduler 组件为例,该组件作为一个 broadcasterWatcher,通过 StartLogging 函数将事件输出至 klog stdout 标准输出,通过 StartRecordingToSink 函数将关键性事件上报至 Kubernetes API Server。代码:

cmd/kube-scheduler/app/server.go

// Prepare the event broadcaster.
  cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

StartLogging 和 StartRecordingToSink 函数依赖于 StartEventWatcher 函数,该函数内部运行了一个 goroutine,用于不断监控 EventBroadcaster 来发现事件并调用相关函数对事件进行处理。

下面重点介绍一下 StartRecordingToSink 函数,kube-scheduler 组件将 v1core.EventSinkImpl 作为上报事件的自定义函数。上报事件有 3 种方法,分别是 Create(即 Post 方法)、Update(即 Put 方法)、Patch(Patch 方法)。以 Create 方法为例,Create→e.Interface.CreateWithEventNamespace 代码示例如下:

代码路径:vendor/k8s.io/client-go/kubernetes/typed/core/v1/event_expansion.go

// CreateWithEventNamespace makes a new event. Returns the copy of the event the server returns,
// or an error. The namespace to create the event within is deduced from the
// event; it must either match this event client's namespace, or this event
// client must have been created with the "" namespace.
func (e *events) CreateWithEventNamespace(event *v1.Event) (*v1.Event, error) {
  if e.ns != "" && event.Namespace != e.ns {
    return nil, fmt.Errorf("can't create an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
  }
  result := &v1.Event{}
  err := e.client.Post().
    NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
    Resource("events").
    Body(event).
    Do(context.TODO()).
    Into(result)
  return result, err
}

上报过程通过 RESTClient 发送 Post 请求,将事件发送至 Kubernetes API Server,最终存储在 Etcd 集群中

Kubernetes源码阅读 文章被收录于专栏

Kubernetes源码阅读

全部评论

相关推荐

评论
点赞
收藏
分享
牛客网
牛客企业服务