gnet重点源码解析
gnet重点源码解析
之前用几篇文章依次讲解了Go 原生网络库
的优势和实现原理、Go 原生网络库
在某些场景下可能稍有的劣势(没有银弹)、gnet
的架构设计介绍、gnet
架构之下相比于Go 原生网络库
的优势、gnet
的基本使用。我相信经过前几篇文章的循序渐进,大家对Go 原生网络库
和 gnet
都有了一定的认识和理解。那么这篇就开始真正的进入gnet
的源码来通过分析gnet
的主要流程代码来进一步深入体会它的设计,我相信从这中间可以学到很多东西,本篇文章可能篇幅会有点长,我尽量挑选和精简重点代码来讲解。大家之后也可以直接拉取一下gnet 的源代码自己分析一下它的流程。
目录结构,代码层次
先来看看gnet
的代码目录层次,阅读一个项目的代码之前弄清楚一个项目的目录代码层次结构是事半功倍的。项目根目录下面的文件数量有点多,实际上因为gnet
兼容多个系统(linux
、mac
、windows
),所以在不同的系统上会有不同的实现,但是实现大同小异,因为我自己在用的是mac
,所以本文选取的是mac os
下的流程来分析,大家也可以自行去分析其他系统下的主要流程,大同小异 。
|-- examples 目录下面是一些使用`gnet` 构建的`server` 例子,上篇文章`gnet基本使用` 的代码基本上也取自该目录下的示例。 |-- poll 目录下面主要是封装的`bytebuffer pool` 和`goroutine pool` 的使用 |-- ringbuffer 目录下封装实现了`Ring-Buffer` 来管理`Tcp` 数据流和管理内存 |-- acceptor_unix.go 实现的是accept过程 |-- codec.go 封装了多种消息编码和解码的实现 |-- connection_unix.go封装实现了TCP connection 和 UDP connection 相关操作,如连接的新建和连接的读写等 |-- errors.go 是一些错误码和错误信息的定义 |-- eventloop_unix.go 封装了事件循环相关处理 |-- listener_unix.go 实现的是对TCP listener 和 UDP listenr 的初始化和关闭 |-- load_balancing.go 实现的是多种负载均衡算法,使得多个goroutine worker 能够相对公平的合作 |-- reactor_linux.go 实现的是gnet 的main reactor 和sub reactor 的启动初始化 |-- server_unix.go 实现的是gnet 整个服务的启动初始化和服务关闭相关操作
上面简单的介绍了目录代码结构和功能,其实这里面有很多东西是次要的,比如消息编解码相关的代码只要知道它定义了一系列的编解码算法,最终提供了encode
和 decode
两个api
来对发送的消息编码和对收到的消息解码即可,不太需要进到源码里看它是怎么实现的。pool
相关的也只要知道它封装提供了goroutine pool
供使用就行了。我认为主要的流程是分析服务启动的初始化流程?数据结构是怎么被初始化的?重要的数据结构之间的联系是怎么建立起来的?连接从被建立->读取->处理->响应->关闭这个流程是怎么走通的?gnet
的main sub reactor
模式在这期间是如何良好工作的?通过理清整个主要的流程体会代码的设计。所以我们会从一个简单的gnet
构建的echo server
作为入口,看看几行代码的背后gnet
是如何做到这些事情的。为了更好的理解,可以配合一下运行时序图:
从echo server
开始
type echoServer struct { // echoServer 实现了event handler interface, 成为了一个gnet event handler *gnet.EventServer } func (es *echoServer) OnInitComplete(srv gnet.Server) (action gnet.Action) { // OnInitComplete 回调实现 log.Printf("Echo server is listening on %s (multi-cores: %t, loops: %d)\n", srv.Addr.String(), srv.Multicore, srv.NumEventLoop) return } func (es *echoServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { // React 回调实现 // Echo synchronously. out = frame return } func main() { var port int var multicore bool // Example command: go run echo.go --port 9000 --multicore=true flag.IntVar(&port, "port", 9000, "--port 9000") // 接收端口配置,默认9000 flag.BoolVar(&multicore, "multicore", false, "--multicore true")// 是否开启多核配置,默认否 flag.Parse() echo := new(echoServer) // event handler log.Fatal(gnet.Serve(echo, fmt.Sprintf("tcp://:%d", port), gnet.WithMulticore(multicore))) // 开启gnet 服务 }
上述代码是一个gnet
构建的echo server
,它做的事情就是把接收到的消息直接原样返回(react
回调实现),重点是gnet.Servre
方法,我们传入了event handler
和一些服务配置项,通过这个方法我们启动了gnet
服务,所以让我们从这个方法入手,一层层地看下去基本上就可以理清楚它的启动流程。
gnet.Serve
gnet.Serve 初始化配置、初始化listener,然后和用户定义的event handler作为参数调用serve启动gnet服务
再看gnet.Serve
方法之前我们先来看一个重要的数据结构,那就是listener
结构,因为gnet.Serve
做的事情就是初始化listener
之后将它作为参数启动:
type listener struct { f *os.File fd int // unix文件描述符 ln net.Listener // Listener是一个用于面向流(TCP)的网络协议的公用的网络监听器接口。多个线程可能会同时调用一个Listener的方法。 once sync.Once // 这里用sync.once 防止对listener多次close pconn net.PacketConn // PacketConn接口代表通用的面向数据包(UDP)的网络连接。多个线程可能会同时调用同一个Conn的方法。 lnaddr net.Addr // Addr代表一个网络终端地址 addr, network string }
弄清listener
结构之后再来看代码就更容易理解gnet.Serve
方法的功能其实就是根据通信协议来初始化listener
结构:
func Serve(eventHandler EventHandler, addr string, opts ...Option) error { var ln listener // listener options := loadOptions(opts...) // 解析配置 ... ... var err error ln.network, ln.addr = parseAddr(addr) if ln.network == "udp" { // udp 连接 初始化 listener.pconn if options.ReusePort && runtime.GOOS != "windows" { ln.pconn, err = netpoll.ReusePortListenPacket(ln.network, ln.addr) } else { ln.pconn, err = net.ListenPacket(ln.network, ln.addr) } } else { // tcp 连接 初始化 listener.ln if options.ReusePort && runtime.GOOS != "windows" { ln.ln, err = netpoll.ReusePortListen(ln.network, ln.addr) } else { ln.ln, err = net.Listen(ln.network, ln.addr) } } if err != nil { return err } if ln.pconn != nil { // UDP 初始化 listenr.lnaddr ln.lnaddr = ln.pconn.LocalAddr() } else { // TCP 初始化 listenr.lnaddr ln.lnaddr = ln.ln.Addr() } if err := ln.system(); err != nil { // 通过ln.system() 初始化 listener.fd return err } return serve(eventHandler, &ln, options) }
初始化完listener
之后作为参数(和开发人员之前定义的eventHandler 和 配置项一起)调用了serve
方法,那接下来看看serve
方法在底层做了什么事情
serve
serve 初始化 server结构体(在这期间会调用一些用户定义的回调方法,比如server结构体初始化后会调用
OnInitComplete
),调用server.start 启动event loops
同样,serve方法中非常重要的一个数据结构是server
,整个方法都是为这个数据结构服务的,所以在看serve
方法的代码之前还是要来看看server
:
type server struct { ln *listener // listener wg sync.WaitGroup // wait group 用于多个event loop的同步 opts *Options // server 配置项 once sync.Once // 确保只 signalShutdown 一次 cond *sync.Cond // shutdown signaler codec ICodec // TCP 流编码解码器 logger Logger // logger for log info ticktock chan time.Duration // 定时器channel mainLoop *eventloop // main evnet loop (对应运行时序图的base IO thread,用于accpet connections) eventHandler EventHandler // 开发人员实现的eventHandler subLoopGroup IEventLoopGroup // 所有 event loop 的集合,IEventLoopGroup 是一个interface,定义了四个方法用于将ac cpet 的connetion均匀投递到各个event loop } type IEventLoopGroup interface { register(*eventloop) // 将event loop 注册到IEventLoopGroup next(int) *eventloop // 调用则在集合中根据选择的负载算法来分配返回一个event loop给该连接(根据选择的负载算法来决定,例如轮询算法、最少连接数算法、源地址hash算法等) iterate(func(int, *eventloop) bool) // 遍历IEventLoopGroup 所有的event loop len() int // 返回event loop 的数量 }
了解了上面重要的数据结构之后就可以对serve
方法的职责和代码更加易于理解了:
func serve(eventHandler EventHandler, listener *listener, options *Options) error { // 根据配置选项或者默认选项来确定需要多少个event loop / goroutines numEventLoop := 1 if options.Multicore { numEventLoop = runtime.NumCPU() } if options.NumEventLoop > 0 { numEventLoop = options.NumEventLoop } svr := new(server) svr.opts = options // 初始化 server.opts svr.eventHandler = eventHandler // 初始化 server.eventHandler svr.ln = listener // 初始化 server.listener switch options.LB { // 根据负载均衡算法配置来对应初始化server.subLoopGroup case RoundRobin: // 轮询负载均衡算法 svr.subLoopGroup = new(roundRobinEventLoopGroup) case LeastConnections: // 最少连接数优先 svr.subLoopGroup = new(leastConnectionsEventLoopGroup) case SourceAddrHash: // 源地址hash(同一个源地址发起的请求会被分到同一个event loop 处理) svr.subLoopGroup = new(sourceAddrHashEventLoopGroup) } svr.cond = sync.NewCond(&sync.Mutex{}) // server.cond 初始化,用于server shutdown 通知 svr.ticktock = make(chan time.Duration, 1) // server 定时channel,用于定时事件 svr.logger = func() Logger { // server.logger 初始化 if options.Logger == nil { return defaultLogger } return options.Logger }() svr.codec = func() ICodec { // server TCP编码解码器初始化 if options
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> <span style="font-size:14px;">本专刊是Go开源项目源码分析专栏,共 17 篇文章,挑选了Go 开源界知名的 4 个开源项目gnet(高效的网络库)、gin(知名的Go微型web框架)、fasthttp(高性能web框架)、nsq(Go消息队列)来对它们进行源码分析,分析它们的设计思想和代码实现。每个项目的讲解都是由浅入深,由设计思想的剖析到源码实现的分析,更易于读者理解。</span> </p> <p> <br /> </p> <h2> <b><span style="font-size:16px;line-height:1;">购买须知:</span></b> </h2> <span style="font-size:14px;">订阅成功后,用户即可通过牛客网 PC 端、App 端享有永久阅读的权限;</span><br /> <span style="font-size:14px;">牛客专刊为虚拟内容服务,订阅成功后概不退款;</span><br /> <span style="font-size:14px;line-height:1;">在专刊阅</span><span style="font-size:14px;line-height:1;">读过程中,如有任何问题,可在文章评论区底部留言,或添加牛客导师,加入读者交流群;</span><br /> <span style="font-size:14px;">想成为牛客作者,请邮件联系yinxiaoxiao@nowcoder.com,邮件主题【牛客作者+写作方向】,并附上个人简历一份及近期作品一份;</span><br /> <p> <span style="font-size:14px;">牛客专刊版权归本平台所有,任何机构、媒体、网站或个人未经本网协议授权不得转载、链接、转贴或以其他方式复制发布 / 发表,违者将依法追究责任</span><span style="font-size:14px;">。</span> </p> <p> <br /> </p>