Node Exporter源码分析以及二次开发,玩转Prom

Node Exporter源码分析

Node Exporter 是一款使用 Go 语言编写的基于 Prometheus 生态的采集器程序,它可以用来采集目标机器的 CPU、内存、磁盘、网络 I/O 等指标信息,最后等待 Prometheus Server 通过 HTTP 接口拉取指标数据存储到 Prometheus Server 内置的 TSDB 中。

Node Exporter 是基于 Prometheus 的 Client SDK 进行编写采集逻辑的,它本身内部也提供了很多可供的扩展机制。比如说它定义的 Collector 接口可以让用户进行扩展自定义自己的采集器逻辑。那么就让我们来分析一下 Node Exporter 的源码实现。

注:Prometheus Client Golang 包在下文同称为 SDK。

启动入口

命令行参数

在 Node Exporter 根目录下可以看到一个名为 node_exporter.go 的文件,这是 Node Exporter 的程序启动入口,这个文件包含一个 main 函数。找到 main 函数可以看到它使用 kingpin 包定义了很多命令行参数,也有默认值:

func main() {
	var (
		metricsPath = kingpin.Flag(
			"web.telemetry-path",
			"Path under which to expose metrics.",
		).Default("/metrics").String()
		disableExporterMetrics = kingpin.Flag(
			"web.disable-exporter-metrics",
			"Exclude metrics about the exporter itself (promhttp_*, process_*, go_*).",
		).Bool()
		maxRequests = kingpin.Flag(
			"web.max-requests",
			"Maximum number of parallel scrape requests. Use 0 to disable.",
		).Default("40").Int()
		disableDefaultCollectors = kingpin.Flag(
			"collector.disable-defaults",
			"Set all collectors to disabled by default.",
		).Default("false").Bool()
		maxProcs = kingpin.Flag(
			"runtime.gomaxprocs", "The target number of CPUs Go will run on (GOMAXPROCS)",
		).Envar("GOMAXPROCS").Default("1").Int()
		toolkitFlags = kingpinflag.AddFlags(kingpin.CommandLine, ":9100")
	)
    // ...
  • metricsPath:定义了获取指标的 HTTP URL,默认是 /metrics
  • disableExporterMetrics:定义了是否禁用 Exporter 自身的指标,比如说 promhttp_*process_*go_*
  • maxRequests:定义了并行采集请求的最大数量,默认是 40。设置 0 可以禁用。
  • disableDefaultCollectors:定义了是否禁用默认的采集器,默认是 false,表示全部启用。
  • maxProcs:定义了 Go 运行时最大逻辑处理器数量,默认是 1。
  • toolkitFlags:定义了一些工具参数,这里是指启动的端口号为 :9100

日志信息

接着往下面看:

promlogConfig := &promlog.Config{}
flag.AddFlags(kingpin.CommandLine, promlogConfig)
kingpin.Version(version.Print("node_exporter"))
kingpin.CommandLine.UsageWriter(os.Stdout)
kingpin.HelpFlag.Short('h')
kingpin.Parse()
logger := promlog.New(promlogConfig)

if *disableDefaultCollectors {
    collector.DisableDefaultCollectors()
}

level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext())
if user, err := user.Current(); err == nil && user.Uid == "0" {
    level.Warn(logger).Log("msg", "Node Exporter is running as root user. This exporter is designed to run as unprivileged user, root is not required.")
}
runtime.GOMAXPROCS(*maxProcs)
level.Debug(logger).Log("msg", "Go MAXPROCS", "procs", runtime.GOMAXPROCS(0))

打印了一些日志信息。

路由挂载

func main() {
    // ...
    
    // 设置默认的路由对应的处理器,这边用的处理器是 Node Exporter 自定义的 Handler
    http.Handle(*metricsPath, newHandler(!*disableExporterMetrics, *maxRequests, logger))
	if *metricsPath != "/" {
		landingConfig := web.LandingConfig{
			Name:        "Node Exporter",
			Description: "Prometheus Node Exporter",
			Version:     version.Info(),
			Links: []web.LandingLinks{
				{
					Address: *metricsPath,
					Text:    "Metrics",
				},
			},
		}
        // 如果访问根路径,那么会显示一个内置的引导页面
		landingPage, err := web.NewLandingPage(landingConfig)
		if err != nil {
			level.Error(logger).Log("err", err)
			os.Exit(1)
		}
		http.Handle("/", landingPage)
	}

    // 启动服务器
	if err := web.ListenAndServe(httpServer.Server, toolkitFlags, logger); err != nil {
		level.Error(logger).Log("err", err)
		os.Exit(1)
	}
}

在上面的代码中,最核心的部分就是 newHandler 函数,这个函数返回了 Node Exporter 自定义的 HTTP Handler,这个 Handler 借助了 Prometheus Glient SDK 而实现的。

自定义路由处理器

handler结构体

上文说到 Node Exporter 自定义了一个 HTTP Handler 来处理请求,在 main 函数上面可以找到对应的实现:

package main

import (
	"fmt"
	stdlog "log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/user"
	"runtime"
	"sort"

	"github.com/prometheus/common/promlog"
	"github.com/prometheus/common/promlog/flag"
	"github.com/alecthomas/kingpin/v2"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	promcollectors "github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/prometheus/common/version"
	"github.com/prometheus/exporter-toolkit/web"
	"github.com/prometheus/exporter-toolkit/web/kingpinflag"
    
	"github.com/prometheus/node_exporter/collector"
)

// handler wraps an unfiltered http.Handler but uses a filtered handler,
// created on the fly, if filtering is requested. Create instances with
// newHandler.
type handler struct {
	unfilteredHandler http.Handler
	// exporterMetricsRegistry is a separate registry for the metrics about
	// the exporter itself.
	exporterMetricsRegistry *prometheus.Registry
	includeExporterMetrics  bool
	maxRequests             int
	logger                  log.Logger
}

func newHandler(includeExporterMetrics bool, maxRequests int, logger log.Logger) *handler {
	h := &handler{
		exporterMetricsRegistry: prometheus.NewRegistry(),
		includeExporterMetrics:  includeExporterMetrics,
		maxRequests:             maxRequests,
		logger:                  logger,
	}
	if h.includeExporterMetrics {
		h.exporterMetricsRegistry.MustRegister(
			promcollectors.NewProcessCollector(promcollectors.ProcessCollectorOpts{}),
			promcollectors.NewGoCollector(),
		)
	}
    
    // 这里就表示不传入过滤条件,把unfilteredHandler赋值为默认创建的innerHandler
	if innerHandler, err := h.innerHandler(); err != nil {
		panic(fmt.Sprintf("Couldn't create metrics handler: %s", err))
	} else {
		h.unfilteredHandler = innerHandler
	}
	return h
}

// ServeHTTP implements http.Handler.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // 这里可以指定查询参数collect[]字段可以获取指定的采集器的指标
	filters := r.URL.Query()["collect[]"]
	level.Debug(h.logger).Log("msg", "collect query:", "filters", filters)

    // 没有过滤条件,那么就使用unfilteredHandler字段
	if len(filters) == 0 {
		// No filters, use the prepared unfiltered handler.
		h.unfilteredHandler.ServeHTTP(w, r)
		return
	}
	// To serve filtered metrics, we create a filtering handler on the fly.
    // 有过滤条件,那就调用innerHandler方法传入过滤条件
	filteredHandler, err := h.innerHandler(filters...)
	if err != nil {
		level.Warn(h.logger).Log("msg", "Couldn't create filtered metrics handler:", "err", err)
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte(fmt.Sprintf("Couldn't create filtered metrics handler: %s", err)))
		return
	}
	filteredHandler.ServeHTTP(w, r)
}

从上面的 handler 结构体中可以看到,这个结构体内部组合了一个 http.Hanlder,还有 Client SDK 的 *prometheus.Registry 以及其他几个不是太重要的字段。

http 包有一个 Handler 接口,定义了 ServeHTTP 方法,这里这个 handler 实现了 ServeHTTP 方法,也就是说它是 Handler 接口的子类型。

调用 ServerHTTP 方法之前,那么必然先调用了构造函数创建了 handler 的实例。在 newHandler 构造函数中,exporterMetricsRegistry 字段赋值使用了 Client SDK 提供的 prometheus.NewRegistry() 构造函数,这个构造函数可以自定义采集器注册器而不用 SDK 默认提供的 DefaultRegisterer 对象。

在 **********/prometheus/registry.go 文件中可以看到如下定义:

image-20230817150658039.png

newHandler 构造函数中,定义了 handler 结构体之后紧接着判断是否包含 Exporter 本身的指标,如果包含,那么就调用 NewProcessCollectorNewGoCollector 构造函数分别创建进程采集器和 Go 运行时采集器然后注册到 handler 内部的 exporterMetricsRegistry 注册器中。这样 SDK 的注册器中就知道了有采集器注册了。

到最后调用了 innerHandler 函数,创建一个内部 Handler 对象用于做一些过滤操作。代码如下:

innerHandler函数

// innerHandler is used to create both the one unfiltered http.Handler to be
// wrapped by the outer handler and also the filtered handlers created on the
// fly. The former is accomplished by calling innerHandler without any arguments
// (in which case it will log all the collectors enabled via command-line
// flags).
func (h *handler) innerHandler(filters ...string) (http.Handler, error) {
    // 这里调用了NewNodeCollector构造函数创建节点采集器对象
    // 注入logger对象和过滤器字符串可变参数
	nc, err := collector.NewNodeCollector(h.logger, filters...)
	if err != nil {
		return nil, fmt.Errorf("couldn't create collector: %s", err)
	}

	// Only log the creation of an unfiltered handler, which should happen
	// only once upon startup.
    // 没有过滤条件那么则启用所有的采集器,这里的采集器说的是子采集器
    // 到后面会详细说明子采集器,只要记住是在Node Collector内部创建出来的
	if len(filters) == 0 {
		level.Info(h.logger).Log("msg", "Enabled collectors")
		collectors := []string{}
		for n := range nc.Collectors {
			collectors = append(collectors, n)
		}
		sort.Strings(collectors)
		for _, c := range collectors {
			level.Info(h.logger).Log("collector", c)
		}
	}

    // 创建了一个新的注册器来注册采集器,这个采集器用于采集当前的版本信息
	r := prometheus.NewRegistry()
	r.MustRegister(version.NewCollector("node_exporter"))
	if err := r.Register(nc); err != nil {
		return nil, fmt.Errorf("couldn't register node collector: %s", err)
	}
    
    // 核心代码!!
    // 之前的一切工作都是为了这一步做准备,这里使用了 SDK 的 promhttp 包创建了一个新的http.Handler
    // 第一个参数是 prometheus.Gatherers,它是 []Gatherer 的别名,意思是传递一个注册器切片
    // 因为 Registry 实现了 Gatherer 接口,重写了 Gather 方法所以可以传入进去
    // 而 Gatherers 又实现了 Gatherer 接口,所以可以传入 HandlerFor 函数
    // 第二个参数表示创建 Handler 的选项,传入了 h.exporterMetricsRegistry 的字段
    handler := promhttp.HandlerFor(
        prometheus.Gatherers{h.exporterMetricsRegistry, r},
        promhttp.HandlerOpts{
            ErrorLog:            stdlog.New(log.NewStdlibAdapter(level.Error(h.logger)), "", 0),
            ErrorHandling:       promhttp.ContinueOnError,
            MaxRequestsInFlight: h.maxRequests,
            Registry:            h.exporterMetricsRegistry,
        },
    )
    if h.includeExporterMetrics {
        // Note that we have to use h.exporterMetricsRegistry here to
        // use the same promhttp metrics for all expositions.
        // 为了让所有的注册器的采集器都共用一个Handler
        handler = promhttp.InstrumentMetricHandler(
                h.exporterMetricsRegistry, handler,
        )
    }
    return handler, nil
}

Node Collector

全部评论

相关推荐

牛客162194370号:
点赞 评论 收藏
分享
牛客868257804号:九个中铁八个中建
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务