nsq 之 nsqd 源码解析
nsq 之 nsqd 源码解析
nsqd 是nsq 最核心的组件,它负责负责接收、排队、投递消息给客户端,一方面它要接收生产者生产的消息,要和producer通信,一方面它要投递消息给消费者,要和consumer通信,一方面它要和nsqlookupd通信来向其报告它的信息,所以nsqd地处核心,乃兵家必争之地,本节就从源码角度来分析nsqd,通过本节后可以理清nsqd重要的数据结构以及一条消息从生产到入队列到被消费到底经历了什么。
nsqd运行流程图
再看源码之前我觉得用图来理清一些流程和脉络是事半功倍的,nsqd
的topic
、channel
等重要概念已经在上一节介绍过了,nsqd
的运行流程也是基于这些重要的概念来的,我从自己的理解角度出发画了下面这张nsqd
运行流程图:
对比这张图简单解释一下,在nsqd.Main
函数启动运行之前会去解析配置加载topic
、channel
以及存储在磁盘的message
,之后调用nsqd.Main
,nsqd.Main
函数里面它会开启一个TCPServer
和client
(对应的消费者或者生产者) 进行TCP
通信(PUB
生产、SUB
订阅等命令),接收请求->执行请求->返回响应。开启一个HTTP server
(其实另外还有一个HTTPS server
)来接收client
的HTTP
请求(比如创建/删除topic、创建/删除 channel等)。开启一个queueScanLoop
(本质上是一组goroutine) 来处理延时和超时的消息。开启一个lookupLoop
(goroutine)来和lookupd
组件通信。在退出时也会将元数据(topic
、channel
、message
) 持久化到文件。这就是nsqd
的运行流程以及工作内容。
message 流转路径图
除了nsqd
运行流程图之外,message
的流转路径图也是很重要的,它能让我们知道一条消息从被生产者生产->nsqd->被消费者消费的流转路径,我也以自己的理解花了下面的message
流转路径图:
还是对比这张图解释一下,
topic
,一个topic
的消息会被存储在它的内存消息队列和磁盘消息队列中(当内存消息数量已经达到配置的最大数量时) ,并且一个topic
在被初始化的时候会有一个对应的messagePump
goroutine 专门负责从这个topic
的内存消息队列和磁盘消息队列里面读取数据放到它下面所有的channel
里面channel
,一个channel
的消息同样会被存储在它的内存消息队列和磁盘消息队列中(当内存消息数量已经达到配置的最大数量时),在TCPServer Handler
里面,会为每一个client
开启一个messagePump
goroutine 来专门负责从它订阅的channel
的消息队列(内存消息队列和磁盘消息队列)里面取消息然后写回响应给对应consumer
,如果有多个consumer
订阅了同一个topic
下的同一个channel
,则会形成竞争关系。
源码分析之前
上面两张图描述了nsqd
最核心的也是最主要的流程,但是本文不可能把所有nsqd
的功能实现都讲解,比如nsqd
的channel
除了常规的消息队列之外还支持延时消息队列,nsqd
除了pub
之外还支持mpub
一次性发布多条消息等等。本文受限于篇幅只能分析主要的流程,即一条常规消息的生产到被消费,相信弄清主流程足以能够快速理解其它部分以及体会nsqd
的设计。
重要的数据结构
弄清楚重要数据结构对源码分析来说是至关重要的,所以直接来看这些重要数据结构的源代码,为了能够更清晰,省略了一些次要的字段:
NSQD
type NSQD struct { // 配置项 opts atomic.Value // 是否正在初始化 isLoading int32 // 该NSQD 拥有的Topic topicMap map[string]*Topic // 和该NSQD通信的所有client clients map[int64]Client // 和该NSQD通信的所有 lookupd lookupPeers atomic.Value // TCP handler tcpServer *tcpServer // TCP listener tcpListener net.Listener // HTTP listener httpListener net.Listener // HTTPS listener httpsListener net.Listener tlsConfig *tls.Config poolSize int // topic/channel 增加/删除 变更通知channel notifyChan chan interface{} // 配置变更通知 optsNotificationChan chan struct{} // exit channel exitChan chan int // nsqd waitGroup waitGroup util.WaitGroupWrapper }
topic
type Topic struct { // topic name name string // 该topic下所有的channel channelMap map[string]*Channel // 存储该topic 消息的磁盘队列 backend BackendQueue // 存储该topic 消息的内存队列 memoryMsgChan chan *Message // topic start 通知 channel startChan chan int // topic exit 通知 channel exitChan chan int // topic 下channel 变更通知 channel channelUpdateChan chan int // 是否正在退出 exitFlag int32 // 是否临时topic(不会持久topic和channel信息) ephemeral bool // 删除topic时调用的回调函数 deleteCallback func(*Topic) // 是否暂停 paused int32 // topic 暂停通知channel pauseChan chan int }
channel
type Channel struct { // requeue(重新入队列消费的消息数) requeueCount uint64 // channel的消息数量 messageCount uint64 // 超时(未得到消费者FIN)的消息数量 timeoutCount uint64 // 所属的topic name topicName string // channel name name string //存储该channel 消息的磁盘队列 backend BackendQueue // 存储该channel 消息的内存队列 memoryMsgChan chan *Message // 是否正在退出 exitFlag int32 // 该channel 的消费者 clients map[int64]Consumer // 该channel 是否暂停 paused int32 // 该channel 是否临时 ephemeral bool // 删除该channel 执行的callback deleteCallback func(*Channel) // 延时消息 deferredMessages map[MessageID]*pqueue.Item // 延时消息队列 deferredPQ pqueue.PriorityQueue // 等待客户端消费完确认的消息 inFlightMessages map[MessageID]*Message // 等待消费确认的消息队列 inFlightPQ inFlightPqueue }
client
type clientV2 struct { // 该client 订阅的目前可以被发送的消息数量 ReadyCount int64 // 该client 订阅的目前已经发送但是等待client 确认消费的消息数量 InFlightCount int64 // 发送给该client 的消息数量 MessageCount uint64 // 收到该 client 消费确认的消息数量 FinishCount uint64 // 入队列重新供该client 消费的消息数量 RequeueCount uint64 // key topic value published messages count pubCounts map[string]uint64 writeLock sync.RWMutex metaLock sync.RWMutex // TCP connection net.Conn // reading/writing interfaces Reader *bufio.Reader Writer *bufio.Writer OutputBufferSize int // 如果距离上一次响应client的时间超过OutputBufferTimeout则强制flush writer buffer来响应client OutputBufferTimeout time.Duration // 心跳定时器 HeartbeatInterval time.Duration // message timeout 时长 MsgTimeout time.Duration // client 认证channel IdentifyEventChan chan identifyEvent // 订阅的topic channel SubEventChan chan *Channel }
源码分析流程
NSQD.Main
func (n *NSQD) Main() error { ctx := &context{n} exitCh := make(chan error) var once sync.Once exitFunc := func(err error) { once.Do(func() { if err != nil { n.logf(LOG_FATAL, "%s", err) } exitCh <- err }) } n.tcpServer.ctx = ctx // 启动TCPServer 监听响应TCP请求 n.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) }) // 启动HTTPServer 监听响应HTTP请求 httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) // 启动HTTPSServer 监听响应HTTPS请求 if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)) }) } // 开启一组工作goroutine 来处理延时和超时消息 n.waitGroup.Wrap(n.queueScanLoop) // lookupLoop来和lookupd组件通信同步该nsqd的相关消息(端口地址,topic/channel 信息) n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) } err := <-exitCh return err }
可以看到
NSQD.Main
开启了TCP server
、HTTP server
、HTTPS server
、一组专门用来处理延时和超时消息的goroutine
以及和lookupd
通信的goroutine,来看看TCP server
干了什么事。TCP Handler
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error { logf(lg.INFO, "TCP: listening on %s", listener.Addr()) var wg sync.WaitGroup for { // accept TCP 连接 clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { logf(lg.WARN, "temporary Accept() failure - %s", err) runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { return fmt.Errorf("listener.Accept() error - %s", err) } break } wg.Add(1) go func() { // 处理连接 handler.Handle(clientConn) wg.Done() }() } // 等待所有的gorotine退出 wg.Wait() logf(lg.INFO, "TCP: closing %s", listener.Addr()) return nil }
针对每一个连接直接开启
go hander.Handle(clientConn)
去处理,而Handle
方法里面调用了TCPServer.IoLoop
方法,来看看TCPServer.IoLoop
:func (p *protocolV2) IOLoop(conn net.Conn) error { var err error var line []byte var zeroTime time.Time clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) // new client by conn client := newClientV2(clientID,
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> <span style="font-size:14px;">本专刊是Go开源项目源码分析专栏,共 17 篇文章,挑选了Go 开源界知名的 4 个开源项目gnet(高效的网络库)、gin(知名的Go微型web框架)、fasthttp(高性能web框架)、nsq(Go消息队列)来对它们进行源码分析,分析它们的设计思想和代码实现。每个项目的讲解都是由浅入深,由设计思想的剖析到源码实现的分析,更易于读者理解。</span> </p> <p> <br /> </p> <h2> <b><span style="font-size:16px;line-height:1;">购买须知:</span></b> </h2> <span style="font-size:14px;">订阅成功后,用户即可通过牛客网 PC 端、App 端享有永久阅读的权限;</span><br /> <span style="font-size:14px;">牛客专刊为虚拟内容服务,订阅成功后概不退款;</span><br /> <span style="font-size:14px;line-height:1;">在专刊阅</span><span style="font-size:14px;line-height:1;">读过程中,如有任何问题,可在文章评论区底部留言,或添加牛客导师,加入读者交流群;</span><br /> <span style="font-size:14px;">想成为牛客作者,请邮件联系yinxiaoxiao@nowcoder.com,邮件主题【牛客作者+写作方向】,并附上个人简历一份及近期作品一份;</span><br /> <p> <span style="font-size:14px;">牛客专刊版权归本平台所有,任何机构、媒体、网站或个人未经本网协议授权不得转载、链接、转贴或以其他方式复制发布 / 发表,违者将依法追究责任</span><span style="font-size:14px;">。</span> </p> <p> <br /> </p>