nsq 之 nsqd 源码解析

nsq 之 nsqd 源码解析

nsqd 是nsq 最核心的组件,它负责负责接收、排队、投递消息给客户端,一方面它要接收生产者生产的消息,要和producer通信,一方面它要投递消息给消费者,要和consumer通信,一方面它要和nsqlookupd通信来向其报告它的信息,所以nsqd地处核心,乃兵家必争之地,本节就从源码角度来分析nsqd,通过本节后可以理清nsqd重要的数据结构以及一条消息从生产到入队列到被消费到底经历了什么。

nsqd运行流程图

再看源码之前我觉得用图来理清一些流程和脉络是事半功倍的,nsqdtopicchannel等重要概念已经在上一节介绍过了,nsqd 的运行流程也是基于这些重要的概念来的,我从自己的理解角度出发画了下面这张nsqd 运行流程图:

图片说明

对比这张图简单解释一下,在nsqd.Main 函数启动运行之前会去解析配置加载topicchannel 以及存储在磁盘的message,之后调用nsqd.Mainnsqd.Main 函数里面它会开启一个TCPServerclient(对应的消费者或者生产者) 进行TCP通信(PUB 生产、SUB订阅等命令),接收请求->执行请求->返回响应。开启一个HTTP server (其实另外还有一个HTTPS server)来接收clientHTTP 请求(比如创建/删除topic、创建/删除 channel等)。开启一个queueScanLoop(本质上是一组goroutine) 来处理延时和超时的消息。开启一个lookupLoop (goroutine)来和lookupd 组件通信。在退出时也会将元数据(topicchannelmessage) 持久化到文件。这就是nsqd 的运行流程以及工作内容。

message 流转路径图

除了nsqd 运行流程图之外,message 的流转路径图也是很重要的,它能让我们知道一条消息从被生产者生产->nsqd->被消费者消费的流转路径,我也以自己的理解花了下面的message 流转路径图:

图片说明

还是对比这张图解释一下,

  • topic ,一个topic的消息会被存储在它的内存消息队列和磁盘消息队列中(当内存消息数量已经达到配置的最大数量时) ,并且一个topic 在被初始化的时候会有一个对应的messagePump goroutine 专门负责从这个topic 的内存消息队列和磁盘消息队列里面读取数据放到它下面所有的channel 里面
  • channel ,一个channel的消息同样会被存储在它的内存消息队列和磁盘消息队列中(当内存消息数量已经达到配置的最大数量时),在TCPServer Handler 里面,会为每一个client 开启一个messagePump goroutine 来专门负责从它订阅的channel 的消息队列(内存消息队列和磁盘消息队列)里面取消息然后写回响应给对应consumer ,如果有多个consumer 订阅了同一个topic下的同一个channel ,则会形成竞争关系。

源码分析之前

上面两张图描述了nsqd 最核心的也是最主要的流程,但是本文不可能把所有nsqd 的功能实现都讲解,比如nsqdchannel 除了常规的消息队列之外还支持延时消息队列,nsqd除了pub之外还支持mpub 一次性发布多条消息等等。本文受限于篇幅只能分析主要的流程,即一条常规消息的生产到被消费,相信弄清主流程足以能够快速理解其它部分以及体会nsqd 的设计。

重要的数据结构

弄清楚重要数据结构对源码分析来说是至关重要的,所以直接来看这些重要数据结构的源代码,为了能够更清晰,省略了一些次要的字段:

  • NSQD

    type NSQD struct {
       // 配置项
       opts atomic.Value
    
       // 是否正在初始化
       isLoading int32
    
       // 该NSQD 拥有的Topic
       topicMap map[string]*Topic
    
       // 和该NSQD通信的所有client
       clients    map[int64]Client
    
       // 和该NSQD通信的所有 lookupd
       lookupPeers atomic.Value
    
       // TCP handler
       tcpServer     *tcpServer
       // TCP listener
       tcpListener   net.Listener
       // HTTP listener
       httpListener  net.Listener
       // HTTPS listener
       httpsListener net.Listener
       tlsConfig     *tls.Config
       poolSize int
    
       // topic/channel 增加/删除 变更通知channel
       notifyChan           chan interface{}
       // 配置变更通知
       optsNotificationChan chan struct{}
       // exit channel
       exitChan             chan int
      // nsqd waitGroup 
       waitGroup            util.WaitGroupWrapper
    }

    源代码

  • topic

    type Topic struct {
       // topic name
       name              string
       // 该topic下所有的channel
       channelMap        map[string]*Channel
       // 存储该topic 消息的磁盘队列
       backend           BackendQueue
       // 存储该topic 消息的内存队列
       memoryMsgChan     chan *Message
    
       // topic start 通知 channel
       startChan         chan int
       // topic exit 通知 channel
       exitChan          chan int
       // topic 下channel 变更通知 channel
       channelUpdateChan chan int
       // 是否正在退出
       exitFlag          int32
       // 是否临时topic(不会持久topic和channel信息)
       ephemeral      bool
       // 删除topic时调用的回调函数
       deleteCallback func(*Topic)
    
       // 是否暂停
       paused    int32
       // topic 暂停通知channel
       pauseChan chan int
    }

    源代码

  • channel

    type Channel struct {
       // requeue(重新入队列消费的消息数)
       requeueCount uint64
       // channel的消息数量
       messageCount uint64
       // 超时(未得到消费者FIN)的消息数量
       timeoutCount uint64
    
       // 所属的topic name
       topicName string
       // channel name
       name      string
    
       //存储该channel 消息的磁盘队列
       backend BackendQueue
       // 存储该channel 消息的内存队列
       memoryMsgChan chan *Message
       // 是否正在退出
       exitFlag      int32
    
       // 该channel 的消费者
       clients        map[int64]Consumer
       // 该channel 是否暂停
       paused         int32
       // 该channel 是否临时
       ephemeral      bool
       // 删除该channel 执行的callback
       deleteCallback func(*Channel)
    
       // 延时消息
       deferredMessages map[MessageID]*pqueue.Item
       // 延时消息队列
       deferredPQ       pqueue.PriorityQueue
       // 等待客户端消费完确认的消息
       inFlightMessages map[MessageID]*Message
       // 等待消费确认的消息队列
       inFlightPQ       inFlightPqueue
    }

    源代码

  • client

    type clientV2 struct {
       // 该client 订阅的目前可以被发送的消息数量
       ReadyCount    int64
       // 该client 订阅的目前已经发送但是等待client 确认消费的消息数量
       InFlightCount int64
       // 发送给该client 的消息数量
       MessageCount  uint64
       // 收到该 client 消费确认的消息数量
       FinishCount   uint64
       // 入队列重新供该client 消费的消息数量
       RequeueCount  uint64
       // key topic value published messages count
       pubCounts map[string]uint64
    
       writeLock sync.RWMutex
       metaLock  sync.RWMutex
    
       // TCP connection
       net.Conn
    
       // reading/writing interfaces
       Reader *bufio.Reader
       Writer *bufio.Writer
    
       OutputBufferSize    int
       // 如果距离上一次响应client的时间超过OutputBufferTimeout则强制flush writer buffer来响应client
       OutputBufferTimeout time.Duration
    
      // 心跳定时器
       HeartbeatInterval time.Duration
    
      // message timeout 时长
       MsgTimeout time.Duration
    
       // client 认证channel
       IdentifyEventChan chan identifyEvent
       // 订阅的topic channel
       SubEventChan      chan *Channel
    }

    源代码

源码分析流程

  • NSQD.Main

    func (n *NSQD) Main() error {
        ctx := &context{n}
    
        exitCh := make(chan error)
        var once sync.Once
        exitFunc := func(err error) {
            once.Do(func() {
                if err != nil {
                    n.logf(LOG_FATAL, "%s", err)
                }
                exitCh <- err
            })
        }
    
        n.tcpServer.ctx = ctx
        // 启动TCPServer 监听响应TCP请求
        n.waitGroup.Wrap(func() {
            exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
        })
        // 启动HTTPServer 监听响应HTTP请求
        httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
        })
    
        // 启动HTTPSServer 监听响应HTTPS请求
        if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
            httpsServer := newHTTPServer(ctx, true, true)
            n.waitGroup.Wrap(func() {
                exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
            })
        }
    
        // 开启一组工作goroutine 来处理延时和超时消息
        n.waitGroup.Wrap(n.queueScanLoop)
        // lookupLoop来和lookupd组件通信同步该nsqd的相关消息(端口地址,topic/channel 信息)
        n.waitGroup.Wrap(n.lookupLoop)
        if n.getOpts().StatsdAddress != "" {
            n.waitGroup.Wrap(n.statsdLoop)
        }
    
        err := <-exitCh
        return err
    }

    源代码

    可以看到NSQD.Main 开启了TCP serverHTTP serverHTTPS server、一组专门用来处理延时和超时消息的goroutine以及和lookupd 通信的goroutine,来看看TCP server 干了什么事。

  • TCP Handler

    func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
       logf(lg.INFO, "TCP: listening on %s", listener.Addr())
    
       var wg sync.WaitGroup
    
       for {
         // accept TCP 连接
          clientConn, err := listener.Accept()
          if err != nil {
             if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                logf(lg.WARN, "temporary Accept() failure - %s", err)
                runtime.Gosched()
                continue
             }
             // theres no direct way to detect this error because it is not exposed
             if !strings.Contains(err.Error(), "use of closed network connection") {
                return fmt.Errorf("listener.Accept() error - %s", err)
             }
             break
          }
    
          wg.Add(1)
          go func() {
            // 处理连接
             handler.Handle(clientConn)
             wg.Done()
          }()
       }
    
       // 等待所有的gorotine退出
       wg.Wait()
    
       logf(lg.INFO, "TCP: closing %s", listener.Addr())
    
       return nil
    }

    源代码

    针对每一个连接直接开启go hander.Handle(clientConn) 去处理,而Handle 方法里面调用了TCPServer.IoLoop 方法,来看看TCPServer.IoLoop

    func (p *protocolV2) IOLoop(conn net.Conn) error {
        var err error
        var line []byte
        var zeroTime time.Time
    
        clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
        // new client by conn
        client := newClientV2(clientID,

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

go高薪必备:面试框架17讲 文章被收录于专栏

<p> <span style="font-size:14px;">本专刊是Go开源项目源码分析专栏,共 17 篇文章,挑选了Go 开源界知名的 4 个开源项目gnet(高效的网络库)、gin(知名的Go微型web框架)、fasthttp(高性能web框架)、nsq(Go消息队列)来对它们进行源码分析,分析它们的设计思想和代码实现。每个项目的讲解都是由浅入深,由设计思想的剖析到源码实现的分析,更易于读者理解。</span> </p> <p> <br /> </p> <h2> <b><span style="font-size:16px;line-height:1;">购买须知:</span></b> </h2> <span style="font-size:14px;">订阅成功后,用户即可通过牛客网 PC 端、App 端享有永久阅读的权限;</span><br /> <span style="font-size:14px;">牛客专刊为虚拟内容服务,订阅成功后概不退款;</span><br /> <span style="font-size:14px;line-height:1;">在专刊阅</span><span style="font-size:14px;line-height:1;">读过程中,如有任何问题,可在文章评论区底部留言,或添加牛客导师,加入读者交流群;</span><br /> <span style="font-size:14px;">想成为牛客作者,请邮件联系yinxiaoxiao@nowcoder.com,邮件主题【牛客作者+写作方向】,并附上个人简历一份及近期作品一份;</span><br /> <p> <span style="font-size:14px;">牛客专刊版权归本平台所有,任何机构、媒体、网站或个人未经本网协议授权不得转载、链接、转贴或以其他方式复制发布 / 发表,违者将依法追究责任</span><span style="font-size:14px;">。</span> </p> <p> <br /> </p>

全部评论

相关推荐

不愿透露姓名的神秘牛友
11-26 15:46
已编辑
字节国际 电商后端 24k-35k
点赞 评论 收藏
分享
10-10 17:54
点赞 评论 收藏
分享
评论
点赞
1
分享
牛客网
牛客企业服务