nsq 之 nsqlookupd 源码解析
nsq 之 nsqlookupd 源码解析
nsqlookupd是nsq管理集群拓扑信息以及用于注册和发现nsqd服务;所以,也可以把nsqlookupd理解为注册发现服务;当nsq集群中有多个nsqlookupd服务时,因为每个nsqd都会向所有的nsqlookupd上报本地信息,因此nsqlookupd具有最终一致性。
重点数据结构
NSQLookupd
NSQLookupd
是nsqlookupd
的核心数据结构,它包含了nsqlookupd
启动和服务的核心配置和组件。type NSQLookupd struct { sync.RWMutex // 配置项 opts *Options // TCP listener tcpListener net.Listener // HTTP listener httpListener net.Listener // TCP handler tcpServer *tcpServer waitGroup util.WaitGroupWrapper // 存储topic、channel对应的生产者信息,可以看作是一个注册中心 DB *RegistrationDB }
Registration
Registration
相关可以看作类似注册中心,负责注册记录各个nsqd
实例上的topic
、channel
信息。// RegistrationDB type RegistrationDB struct { sync.RWMutex // registrationMap 记录了每个topic/channel 对应的一组生产者 registrationMap map[Registration]ProducerMap } // RegistrationDB.AddRegistration 添加一个Registration // RegistrationDB.AddProducer 为一个Registration 添加一个producer // RegistrationDB.RemoveProducer 为一个Registration 移除一个producer // RegistrationDB.RemoveRegistration 移除一个Registration以及它的所有producer // RegistrationDB.FindRegistrations 根据 Category、key、subKey找到一组对应的Registration // RegistrationDB.FindProducers 根据 Category、key、subKey找到一组对应的producer // RegistrationDB.LookupRegistrations 根据producer peerInfo.id 找到其对应的Registrations // Registration 唯一标识一个消费单位(topic级别或者channel级别) type Registration struct { // category 表明该Registration是topic级别还是channel级别 Category string // topic name Key string // 如果Category 是channel则subKey 是channel name, 如果Category是topic则SubKey为空 SubKey string } type ProducerMap map[string]*Producer type Producer struct { peerInfo *PeerInfo } // 记录prodcer的信息 type PeerInfo struct { lastUpdate int64 id string RemoteAddress string `json:"remote_address"` Hostname string `json:"hostname"` BroadcastAddress string `json:"broadcast_address"` TCPPort int `json:"tcp_port"` HTTPPort int `json:"http_port"` Version string `json:"version"` } type Registrations []Registration
源码分析流程
NSQLookupd.Main
nsqLookupd
启动主流程入口func (l *NSQLookupd) Main() error { ctx := &Context{l} exitCh := make(chan error) var once sync.Once exitFunc := func(err error) { once.Do(func() { if err != nil { l.logf(LOG_FATAL, "%s", err) } exitCh <- err }) } l.tcpServer = &tcpServer{ctx: ctx} // 启动TCPServer 监听响应nsqd register/unregister/ping 请求 l.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf)) }) // 启动HTTPServer 监听响应HTTP 请求(主要是topic/channel 的管理以及一些供debug 的信息) httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf)) }) err := <-exitCh return err }
nsqlookupd TCPServer
TCPServer
负责监听和处理TCP
请求。func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error { logf(lg.INFO, "TCP: listening on %s", listener.Addr()) var wg sync.WaitGroup for { // accept TCP conn clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { logf(lg.WARN, "temporary Accept() failure - %s", err) runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { return fmt.Errorf("listener.Accept() error - %s", err) } break } wg.Add(1) go func() { // go TCP handler.Handle handler.Handle(clientConn) wg.Done() }() } // wait to return until all handler goroutines complete wg.Wait() logf(lg.INFO, "TCP: closing %s", listener.Addr()) return nil }
这个
TCPServer
函数和nsqd
的是共用的,但handler 不是同一个handler,nsqlookupd TCP Handle
方法如下:func (p *tcpServer) Handle(clientConn net.Conn) { p.ctx.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) /** 读取并解析 protocolMagic */ buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { clientConn.Close() return } protocolMagic := string(buf) var prot protocol.Protocol switch protocolMagic { case " V1": prot = &LookupProtocolV1{ctx: p.ctx} default: // 返回协议错误 protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) clientConn.Close() return } p.conns.Store(clientConn.RemoteAddr(), clientConn) // LookupProtocolV1.IOLoop 处理请求 err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clien
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> <span style="font-size:14px;">本专刊是Go开源项目源码分析专栏,共 17 篇文章,挑选了Go 开源界知名的 4 个开源项目gnet(高效的网络库)、gin(知名的Go微型web框架)、fasthttp(高性能web框架)、nsq(Go消息队列)来对它们进行源码分析,分析它们的设计思想和代码实现。每个项目的讲解都是由浅入深,由设计思想的剖析到源码实现的分析,更易于读者理解。</span> </p> <p> <br /> </p> <h2> <b><span style="font-size:16px;line-height:1;">购买须知:</span></b> </h2> <span style="font-size:14px;">订阅成功后,用户即可通过牛客网 PC 端、App 端享有永久阅读的权限;</span><br /> <span style="font-size:14px;">牛客专刊为虚拟内容服务,订阅成功后概不退款;</span><br /> <span style="font-size:14px;line-height:1;">在专刊阅</span><span style="font-size:14px;line-height:1;">读过程中,如有任何问题,可在文章评论区底部留言,或添加牛客导师,加入读者交流群;</span><br /> <span style="font-size:14px;">想成为牛客作者,请邮件联系yinxiaoxiao@nowcoder.com,邮件主题【牛客作者+写作方向】,并附上个人简历一份及近期作品一份;</span><br /> <p> <span style="font-size:14px;">牛客专刊版权归本平台所有,任何机构、媒体、网站或个人未经本网协议授权不得转载、链接、转贴或以其他方式复制发布 / 发表,违者将依法追究责任</span><span style="font-size:14px;">。</span> </p> <p> <br /> </p>