老面试官竟问我 Reactor 在 Netty 中是如何实现的

Netty 的 Reactor

我们都知道 Netty 可以有两个线程组,一个是 bossGroup,一个是 workerGroup。
之前也提到了 bossGroup 主要是接待新连接(老板接活),workerGroup 否则负责重新连接后续的一切 I/O (员工干活)
对应到 Reactor 模型中,bossGroup 中的 eventLoop 就是主 Reactor。它的任务就是监听等待连接事件的到来,即 OP_ACCEPT。
然后创建的 channel ,从 workerGroup 中选择一个 eventLoop ,将子 channel 与这个 eventLoop 绑定,之后这个子 channel 对应的 I/O事件,都由这个 eventLoop 负责。
而这个 workerGroup 中的 eventLoop 就是所谓的子 Reactor,它的任务就是负责已经建连完毕的连接之后的所有 I/O 请求。
其实从 eventLoop 从这个名字就能看出,它的作用就是 loop event,说白了就是一个线程,死循环地等待事件的发生,然后根据不同的事件类型进行不一样的后续处理,仅此而已。
正常情况下 bossGroup 只会配置一个 eventLoop,即一个线程,因为一般服务只会暴露一个端口,所以只要一个 eventLoop 监听这个端口,然后 accept 连接。
而 workerGroup 在 Netty 中,默认是 cpu 核心数*2,例如 4 核 CPU ,默认会在 workerGroup 建 8 个 eventLoop,所以就有 8 个子 Reactor。
所以正常 Netty 服务端的配置是,1个主 Reactor,多个从 Reactor,这就是所谓的主从 Reactor。
基本上现在的主流配置都是主从 Reactor。

关于 Reactor 模型的演进

在深入 Netty 的 Reactor 实现之前,我们先来看看,为什么会演变成主从 Reactor?
最开始的模型是 单 Reactor 单线程 ,你可以理解成一个线程来监听新的连接,又要响应老的连接的请求,如果逻辑处理得很快,那没有问题,看看人家 redis 就算够用,但是如果逻辑处理得慢,那就会阻塞其他请求。
所以就有了 单 Reactor 多线程 ,还是由一个线程来监听所有的底层 Socket,但是一些耗时的操作可以分配给线程池进行业务处理,这样就不会因为逻辑处理慢导致 Reactor 的阻塞。
但是这个模型还会有瓶颈,即监听新的连接和响应老的连接的请求都由一个线程处理,积累的老连接多了,有很多事件需要响应,就会影响新连接的接入,这就不太舒服了,况且我们现在都是多核 CPU,还差这么一个线程吗?
所以就又演进成 主从 Reactor ,有一个线程,即主 Reactor 专门等待新连接的建连,然后创建多个线程作为子 Reactor,均匀的负责已经接入的老连接,这样一来既不会影响接待新连接的速度,也能更好的利用多核 CPU 的能力响应老连接的请求。
这就是关于 Reactor 模型的演进了。
好了,接下来我们再看看 Netty 实现 Reactor 核心类,我们现在一般都是用 NIO ,所以我们看 NioEventLoop 这个类。
友情提示,有条件建议在PC端看下面的内容,源码类的手机上看不太舒服

NioEventLoop

前面我们已经提到一个问题 NioEventLoop 就是一个线程,那线程的核心肯定就是它的 run 方法。
基于我们的理解,我们知道这个 run 方法的主基调肯定是死循环等待 I/O 事件产生,然后处理事件。
事实也是如此, NioEventLoop 主要做了三件事:
  1. select 等待 I/O 事件的发生
  2. 处理发生的 I/O 事件
  3. 处理提交至线程中的任务,包括提交的异步任务、定时任务、尾部任务。
首先折叠下代码,可以看到妥妥的死循环,这也是 Reactor 线程的标配,这辈子无限只为了等待事件发生且处理事件。




在 Netty 的实现里,NioEventLoop 线程不仅要处理 I/O 事件,还需要处理提交的异步任务、定时任务和尾部任务,所以这个线程需要平衡 I/O 事件处理和任务处理的时间。
因此有个 selectStrategy 这样的策略,根据判断当前是否有任务在等待被执行,如果有则立即进行一次不会阻塞的 select 来尝试获取 I/O 事件,如果没任务则会选择 SelectStrategy.SELECT 这个策略。




从图中也可以看到,这个策略会根据最近将要发生的定时任务的执行时间来控制 select 最长阻塞的时间。
从下面的代码可以看到,根据定时任务即将执行的时间还预留了 5 微秒的时间窗口,如果 5 微秒内就要到了,那就不阻塞了,直接进行一个非阻塞的 select 立刻尝试获取 I/O 事件。




经过上面的这个操作,select 算是完毕了,最终会把就绪的 I/O 事件个数赋值给 strategy,如果没有的话那 strategy 就是 0 ,接着就该处理 I/O 事件和任务了。




上面代码我把重点几个部分都框出来了,这里有个 selectCnt 来统计 select 的次数,这个用于处理 JDK Selector 空轮询的 bug ,下面会提。
ioRatio 这个参数用来控制 I/O 事件执行的时间和任务执行时间的占比,毕竟一个线程要做多个事情,要做到 雨露均沾 对吧,不能冷落了谁。
可以看到,具体的实现是记录 I/O 事件的执行时间,然后再根据比例算出任务能执行的最长的时间来控制任务的执行。

I/O 事件的处理

我们来看看 I/O 事件具体是如何处理的,也就是 processSelectedKeys 方法。
点进去可以看到,实际上会有两种处理的方法,一种是优化版,一种是普通版。




这两个版本的逻辑都是一样的,区别就在于优化版会替换 selectedKeys 的类型,JDK 实现的 selectedKeys 是 set 类型,而 Netty 认为这个类型的选择还是有优化的余地的。
Netty 用 SelectedSelectionKeySet 类型来替换了 set 类型,其实就是用数组来替换了 set




相比 set 类型而言,数组的遍历更加高效,其次数组尾部添加的效率也高于 set,毕竟 set 还可能会有 hash冲突。当然这是 Netty 为追求底层极致优化所做的,我们平日的代码没必要这般“斤斤计较”,意义不大。
那 Netty 是通过什么办法替换了这个类型呢?
反射 。
看下代码哈,不是很复杂:




这也能给我们提供一些思路,比方你调用三方提供的 jar 包,你无法修改它的源码,但是你又想对它做一些增强,那么就可以仿照 Netty 的做法,通过反射来替换之~
我们打个断点看下替换前后 selectedKey 的类型,之前是 HashSet:




替换了后就变成了 SelectedSelectionKeySet 了。




ok,现在我们再看下优化版的处理 I/O 事件的遍历方法,和普通版逻辑一样的,只是遍历是利用数组罢了。




没啥好说的,就那个帮助 GC 可以提一下,如果你看过很多开源软件你就会发现有很多这样的实现,直接置为 null 的语句,这是为了帮助 GC。
紧接着看下真正处理 I/O 事件的方法 processSelectedKey




可以看到,这个方法本质就是根据不同的事件进行不同的处理,实际上会将事件在对应的 channel 的 pipeline 上面传播,并且触发各种相应的自定义事件,我拿 OP_ACCEPT 事件作为例子分析。
针对 OP_ACCEPT 事件,unsafe.read 实际会调用 NioMessageUnsafe#read 方法。




从上面代码来看,逻辑并不复杂,主要就是循环读取新建立的子 channel,并触发 ChannelRead 和 ChannelReadComplete 事件,使之在 pipeline 中传播,期间就会触发之前添加的 ServerBootstrapAcceptor#channelRead,将其分配给 workerGroup 中的 eventLoop ,即子 Reactor 线程。
当然,我们自定义的 handler 也可以实现这两个事件方法,这样对应的事件到来后,我们能进行相应的逻辑处理。
好了,Netty 的 OP_ACCEPT 事件处理分析到此结束,其他事件也是类似的,都会触发相应的事件,然后在 pipeline 中传递,触发不同 Channelhandler 的方法,进行逻辑处理。
以上,就是 Netty 实现的主从 Reactor 模型。
当然,Netty 也支持单 Reactor,无非就是不要 workerGroup,至于线程数也可以自行配置,十分灵活,不过现在一般用的都是主从 Reactor 模型。
想要了解更多,点击这里即可
全部评论

相关推荐

11-26 22:34
已编辑
重庆邮电大学 Java
快手 客户端开发 (n+5)k*16 公积金12
牛客895077908号:佬 什么双非硕啊
点赞 评论 收藏
分享
点赞 评论 收藏
分享
评论
点赞
收藏
分享
牛客网
牛客企业服务