深度解析NIO底层
文章首发于公众号:松花皮蛋的黑板报
作者就职于京东,在稳定性保障、敏捷开发、高级JAVA、微服务架构有深入的理解
一、BIO、NIO的介绍
我们可以分别启动高版本tomcat-8和低版本tomcat-6,然后模拟连接 Socket socket = new Socket(“localhost”,8080)会发现BIO和NIO最明显的区别
所以NIO在做高并发和高吞吐量服务时比BIO更加适合,并且在长链接的情况下,同一时间对CPU的利用率也更高。
传统的BIO也叫”同步阻塞IO”,还有一种叫做”同步非阻塞I/O”
那么问题来了?我们的NIO 底层用的是那种I/O模型呢?其实是IO多路复用
这里提到了两个概念:
1、select/poll 多路由复用器(这里可以先简单的理解成一个队列,有一个单线程在不断的轮询处理队列里的事件)
2、fd 类似于一种请求事件(Socket描述符).有一点很重要,这里的select轮询是阻塞的。
刚刚说的I/O复用模型可以说是奠定了NIO的模型基础,但是我们的UNIX对这个模型做了进一步的优化,刚刚的图我们可以发现一个明显的问题,什么问题呢?select/poll线程像傻瓜一样的顺序轮询fd列表,而且处理的fd也是有限的。默认是1024个。这个时候 epoll(Event Poll 基于事件驱动的select/poll模型)模型就呼之欲出了。
所以我们说NIO是一个典型的同步非阻塞的I/O。而底层的IO模型是采用的epoll方式的多路复用模型。(在UNIX和Linux下)
总结一下,epoll模型相比select/poll模型的一些优点
select/poll模型
1、每次调用select,都需要把fd集合从用户态拷贝到内核态。
2、每次调用select都需要在内核遍历传递进来的所有fd。
3、select支持的文件描述符数量太小了,默认是1024。
epoll模型
1、初次调用select时,会挂载所有的fd进来,并且没有从用户态到内核态的内存复制,而是通过内核和用户空间mmap同一块内存来实现的。
2、epoll在事件就绪时的触发并没有遍历所有fd,而是遍历就绪态的fd链表,节省了大量的CPU时间。
3、所支持的fd的上限是操作系统的最大文件句柄数,简单理解,也就是可以支持的连接数。一般来说1GB内存的机器上大约是10W个句柄左右
二、NIO的模型介绍和实现原理
NIO这个概念,早在jdk1.4的时候就支持了,我们完全可以通过jdk中的nio功能模块去实现一个NIO框架的服务。下面给出一个简单的例子
public class NioDemo {
public static void main(String[] args)
{
try {
initServer(9999);
listenSelector();
} catch (IOException e) {
e.printStackTrace();
}
}
private static Selector selector;
public static void initServer(int port) throws IOException {
//init一个通道,并且打开对连接的接收
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
//打开多路复用器,监控监听fd
selector = Selector.open();
//注册***,SelectionKey.OP_ACCEPT OP_CONNECT OP_READ OP_WRITE
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功");
}
public static void listenSelector() throws IOException {
while (true) {
System.out.println("select开始阻塞");
selector.select();
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
System.out.println("获取到新的key");
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
//删除已选的key,防止重复处理
keyIterator.remove();
try {
handler(selectionKey,keyIterator);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void handler(SelectionKey selectionKey,Iterator<SelectionKey> keyIterator) throws IOException {
if(selectionKey.isAcceptable()) {
System.out.println("新的客户端连接");
//有新的客户端连接则注册到读就就绪事件
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
channel.register(selector,SelectionKey.OP_READ);
} else if(selectionKey.isReadable()) {
//通道可读说明可以从buffer里取数据
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readData = socketChannel.read(buffer);
if(readData>0) {
String msg = new String(buffer.array(),"GBK").trim();
System.out.println("服务端收到消息"+msg);
ByteBuffer byteBuffer = ByteBuffer.wrap("我收到你的消费了".getBytes("UTF-8"));
socketChannel.write(byteBuffer);
} else {
System.out.println("客户端关闭");
selectionKey.cancel();
}
} else {
System.out.println(selectionKey.isValid());
}
}
}
复制代码
当我telnet 192.168.0.101 9999时,会打印
服务端启动成功
select开始阻塞
获取到新的key
新的客户端连接
select开始阻塞
当我通过浏览器访问时,会打印
服务端启动成功
select开始阻塞
获取到新的key
新的客户端连接
select开始阻塞
获取到新的key
新的客户端连接
select开始阻塞
获取到新的key
新的客户端连接
服务端收到消息GET / HTTP/1.1
Host: 192.168.0.101:9999
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,/;q=0.8,application/signed-exchange;v=b3
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
select开始阻塞
获取到新的key
客户端关闭
select开始阻塞
演示完了上面的代码,可以大概的看到他的执行思路是优先注册链接事件,然后监听这个事件,收到事件后处理完成后,又向select注册接下来的读取就绪、写入就绪事件。我们称这种开发模型为Reactor模型,也叫反应堆模型
事件驱动模型,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers
三、基于NIO实现的Nettty通信框架
Netty是基于NIO实现的网络通信框架,在rpc框架中被广为应用(dubbo、jsf),同时Netty可以支持多种协议的开发,非常适合高并发的网络编程(弹幕、游戏服务器等)
下面是Netty对JDK原生NIO的一些增强
1、实现了事件分发,和业务执行的线程池隔离。(也就是我们说的IO线程、工作线程职责剥离)
2、一个NIO服务端处理网络的闪断、客户端的重复接入、客户端的安全认证、消息的编解码、半包读写等情况。而这些在Netty中都得到了很好的解决。
3、代码编写较复杂,缺少封装,每增加一层处理需要修改的地方有很多,且很难调试。而Netty实现了PipeLine来实现不同的上下行的Handler。
4、需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程。这是因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序。
5、Netty在健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的。
四、Netty案例演示
这里以弹幕为例,介绍一下Netty在实际项目中的应用。
这里我们使用WebSocket+Netty的方式来实践弹幕的推送,设想一下,如果我们不用Netty能不能实现弹幕系统的功能?肯定是可以实现的:
简单暴力的方案:Ajax轮询去服务器取消息。客户端按照某个时间间隔不断地向服务端发送请求,请求服务端的最新数据然后更新客户端显示。这种方式实际上浪费了大量流量并且对服务端造成了很大压力。
以上方案的弊端:
1、Http为半双工超文本协议,也就是说同一时刻,只有一个方向的数据传送。
2、Http消息冗长,包含请求行、请求头、请求体。占用很多的带宽和服务器资源。
3、空轮询问题。
使用WebSocket,可以很好的解决http协议带来的问题。
webSocket特点如下:
1、单一TCP长连接,采用全双工通信模式。这是一个二进制的协议
2、对代理、防火墙透明。
3、无头部信息、消息更精简。
4、通过ping/pong 来保活。
5、服务器可以主动推送消息给客户端,不在需要客户轮询
直接看代码吧
WebsocketDanmuServer.clss
/**
* 基于Websocket的弹幕服务
*/
public class WebsocketDanmuServer {
private static final Integer port = 7777;
public static void main(String[] args) {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebsocketDanmuChannelInitializer());
try {
ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", port).sync();
System.out.println("弹幕服务器启动,网址是 : " + "http://127.0.0.1:" + port);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
复制代码
WebsocketDanmuChannelInitializer
/**
* 弹幕服务的上下行handler
*/
public class WebsocketDanmuChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("http-decode", new HttpRequestDecoder());//解码
channelPipeline.addLast("http-encode", new HttpResponseEncoder());//编码
channelPipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
channelPipeline.addLast("http-chunked", new ChunkedWriteHandler());
channelPipeline.addLast("http-request",new HttpRequestHandler("/ws"));
channelPipeline.addLast("WebSocket-protocol",new WebSocketServerProtocolHandler("/ws"));
channelPipeline.addLast("WebSocket-request",new TextWebSocketFrameHandler());
}
}
复制代码
HttpRequestHandler
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
try {
String path = location.toURI() + "WebsocketDanMu.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
if (wsUri.equalsIgnoreCase(fullHttpRequest.getUri())) {
channelHandlerContext.fireChannelRead(fullHttpRequest.retain());
} else {
if (HttpHeaders.is100ContinueExpected(fullHttpRequest)) {
send100Continue(channelHandlerContext);
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r");
HttpResponse response = new DefaultHttpResponse(fullHttpRequest.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(fullHttpRequest);
if (keepAlive) {
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
channelHandlerContext.write(response);
if (channelHandlerContext.pipeline().get(SslHandler.class) == null) {
channelHandlerContext.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
} else {
channelHandlerContext.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
file.close();
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:" + incoming.remoteAddress() + "异常");
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
复制代码
TextWebSocketFrameHandler
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
Channel incoming = channelHandlerContext.channel();
for (Channel channel : channels) {
if (channel != incoming) {
channel.writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
} else {
channel.writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
channels.add(incoming);
System.out.println("Client:" + incoming.remoteAddress() + "加入");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));
System.err.println("Client:" + incoming.remoteAddress() + "离开");
// 不需要手动remove"channels.remove(ctx.channel());"
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:" + incoming.remoteAddress() + "在线");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:" + incoming.remoteAddress() + "掉线");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel incoming = ctx.channel();
System.err.println("Client:" + incoming.remoteAddress() + "异常");
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}
复制代码
文章来源:www.liangsonghua.me
作者介绍:京东资深工程师-梁松华,长期关注稳定性保障、敏捷开发、JAVA高级、微服务架构