从Apollo看长轮询
前言
如果让我设计一个配置中心,最先想到的两个核心功能:一个是如何将配置存储下来,另一个是怎么能够实时的获取到最新的配置;最简单的方式我们可以直接利用现有的一些中间件:Zookeeper、Redis等;
Zookeeper
: 本身提供了持久化功能,同时客户端可以监听某个节点,节点数据变更,可以实时推送给客户端;Redis
: Redis也提供了持久化的方案,同时可以通过psubscribe提供的订阅功能做到配置的实时推动;
以上两个中间件在客户端进行连接的时候,其实建立的是长连接;长连接就是客户端和服务端建立连接后连接是保持的,这样服务端可以很容易的把最新的配置推送给客户端.
数据交互模式
常见的数据交互模式:Push模式和Pull模式
- Push模式:服务端主动推送数据给客户端,上面说到的Zookeeper和Redis就是这种模式; 这种模式实时性很高,对客户端来说也简单,接收处理消息即可;缺点就是服务端不知道客户端处理消息的能力,可能会导致数据积压,同时也增加了服务端的工作量,影响服务端的性能;
- Pull模式: 拉取模式,即客户端主动去服务端拉取数据,主动权在客户端,拉取数据,然后处理数据,再拉取数据,一直循环下去,具体拉取数据的时间间隔不好设定,太短可能会导致大量的连接拉取不到数据,太长导致数据接收不及时;
可以发现两种模式各有优缺点;Apollo既没有使用Push模式也没有使用Pull模式,而是使用了长轮询的数据交互模式;
- 长轮询模式:通过客户端和服务端的配合,达到主动权在客户端,同时也能保证数据的实时性;长轮询本质上也是轮询,只不过对普通的轮询做了优化处理,服务端在没有数据的时候并不是马上返回数据,会 hold 住请求,等待服务端有数据,或者一直没有数据超时处理;
长轮询模式其实在很多中间件中被广泛使用比如:RocketMQ、Kakfa、Nacos等;当然具体每个中间件是如何实现自己的长轮询方案是不一样的,本文重点介绍的是Apollo如何利用Servlet3.0中提供的异步请求处理机制来实现自己的长轮询;下面先看一下Servlet3.0的异步处理机制原理.
Servlet异步处理
Servlet 3.0开始支持异步处理请求, 在接收到请求之后Servlet线程可以将耗时的操作委派给另一个线程来完成, 这样Servlet线程就可以被释放出来,可以去接收其他的请求,可以提高系统的吞吐量;为了更加清楚的了解异步处理,我们需要了解一下线程模型,下面以常用的容器Tomcat为例,来看一下Tomcat的线程模型;
Tomcat线程模型
Unix系统I/O模型主要包含以下五种类型:同步阻塞I/O、同步非阻塞I/O、I/O多路复用、信号驱动I/O和异步I/O;
目前主流使用的是I/O多路复用模型,像以高性能著称的Netty,以及下面要介绍的Tomcat都是用来此模型;此模型依赖非阻塞Channel,可以 通过一个线程来管理多个数据通道(Channel)的状态,极大的提供了性能; 当然此模型下也演变出多个变种包括:单线程模型、多线程模型、主从多线程模型, 这里就不展开讲了。
Tomcat的NioEndpoint主要包括:Acceptor
、Poller
、SocketProcessor
和Executor
几个组件,大致关系如下图所示:
- Acceptor:默认启动一个线程通过
ServerSocketChannel
监听连接请求,同时往Selecotr中注册读事件; - Poller: 内部其实就是一个选择器
Selector
,选择哪些SocketChannel
已经准备好读写了,这里一般会启动多个pollerThread,也就是我们常见的多Selector模型; - Executor:执行器线程池,默认最大可以创建200个线程,是真正处理I/O读写的地方,这个线程很大程度上也影响了系统的吞吐量;其实异步处理也就是释放这里面的线程;
- SocketProcessor:可以理解就是处理I/O读写的任务,被
Executor
来调度;
异步处理
常见的同步处理是Executor
组件分配一个线程,这个线程同时处理消息的读取、消息的处理以及消息的返回;
而异步处理Executor组件会分配一个线程处理消息的读取,读取完线程就被回收了;接下来消息的处理用户可以使用自己的线程去处理;而消息的返回会再次通过Executor分配一个新的线程来处理; 以下是一个简单的实例:
public void doGet(HttpServletRequest request, HttpServletResponse response) { //1.获取asyncContext AsyncContext asyncContext = request.startAsync(); //设置超时时间,不设置默认30秒 asyncContext.setTimeout(5000); //设置监听器 asyncContext.addListener(new AsyncListener() {...}); //2.异步处理 new Thread(() -> { PrintWriter out = asyncContext.getResponse().getWriter(); out.println("test"); //3.任务完成返回结果 asyncContext.complete(); }.start(); }
- 异步开始: 首先生成一个
AsyncContext
类保存了requst、response、超时时间以及监听器等信息;以上doGet方法中的内容会在Executor组件分配的第一个线程中执行,这个线程一般会命名为:http-nio-8080-exec-x; - 异步处理业务逻辑: 自定义一个线程,用来处理耗时的业务逻辑;这时候自定义线程启动完,Executor就回收了http-nio-8080-exec-x线程; 注意这时处理当前
SocketChannel
的SocketProcessor
还在,只是没有线程处理它,导致客户端一直阻塞着; - 任务结束: 重新让
SocketProcessor
参与调度,从Executor
组件中分配新的线程,返回客户端结果;这里可以执行的操作包括:complete
和dispatch
;当然也可能任务太耗时,导致超过设定的超时时间,从而进入超时处理; - 超时处理:Tomcat会专门启动一个
AsyncTimeout
线程,用来检查等待中的SocketProcessor
是否已经超时,如果超时同样会结束阻塞.
大致流程如下图所示:
DeferredResult实现长轮询
Apollo并没有直接使用Servlet异步处理的原生接口,而是使用DeferredResult类,此类其实是对原生异步处理类的包装;从字面意思“延迟结果”也能大概看出来它的作用,同样的也可以在接受到请求之后,处理异步任务,归还主线程,等有了结果通知再次申请主线程,返回结果;
下面首先看一个使用DeferredResult的简短实例:
public static ExecutorService exec = Executors.newCachedThreadPool(); @GetMapping("/DeferredResult") public DeferredResult<String> testDeferredResult() { //1.创建DeferredResult并设置超时时间 DeferredResult<String> deferredResult = new DeferredResult<String>(10 * 1000L); //设置一些监听器 deferredResult.onTimeout(...) deferredResult.onCompletion(...) deferredResult.onError(...) //2.异步处理 new Thread(() -> { //3.异步处理耗时任务 Thread.sleep(2000); //4.给deferredResult设置结果 deferredResult.setResult("异步线程执行完毕"); }.start(); }
具体的使用结构和使用Servlet异步处理基本一致,setResult()其实和complete()功能一样;
原理分析
DeferredResult
使用在Controller中返回的就是一个DeferredResult对象,Spring会根据每种返回的类型有对应的处理器,DeferredResult对应的处理器DeferredResultMethodReturnValueHandler
:
@Override public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { if (returnValue == null) { mavContainer.setRequestHandled(true); return; } DeferredResult<?> result; // 返回类型为DeferredResult if (returnValue instanceof DeferredResult) { result = (DeferredResult<?>) returnValue; } // 返回类型为ListenableFuture做适配转换,可以转换成DeferredResult else if (returnValue instanceof ListenableFuture) { result = adaptListenableFuture((ListenableFuture<?>) returnValue); } //返回类型为CompletionStage做适配转换,可以转换成DeferredResult else if (returnValue instanceof CompletionStage) { result = adaptCompletionStage((CompletionStage<?>) returnValue); } else { // Should not happen... throw new IllegalStateException("Unexpected return value type: " + returnValue); } //处理DeferredResult WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer); }
接下来就是处理DeferredResult,里面包含设置超时时间、设置监听器等操作,看里面的部分重点代码:
public void startDeferredResultProcessing( final DeferredResult<?> deferredResult, Object... processingContext) throws Exception { ...省略部源码(设置超时时间、设置监听器等操作)... //开始异步处理,对request.startAsync()的包装 startAsyncProcessing(processingContext); try { interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult); //设置结果处理器,在接收到result的时候会执行此处理器,这里面其实就是对complete()、dispatch()方法的包装 deferredResult.setResultHandler(result -> { result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result); setConcurrentResultAndDispatch(result); }); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } }
在执行deferredResult.setResult()方法后,就会触发DeferredResultHandler,这里面会请求重新调度,从而返回结果给客户端;
以下是一个大致处理的流程图:
长轮询
在了解了以上这些之后,我们就可以考虑一下Apollo为什么使用DeferredResult来实现长轮询机制;从上面对长轮询的描述中,有几个点很重要分别是:
- 没有结果能hold住请求,能阻塞多久,这里面是存在一个超时时间的;
- 在被hold的这段时间内,需要释放资源,主要是线程资源;
- 有了结果能立即通知,对时效性要求高;
基于以上这三点,使用DeferredResult来实现长轮询是非常合适的:
- 能够将request和response保存到AsyncContext中,不给Channel响应SocketChannel本身就会阻塞住;
- 返回一个DeferredResult对象,处理完上下文信息之后,就释放Executor主线程了;
- 提供了complete和dispatch操作,有结果能实时通知;
Apollo分析
Apollo的长轮询机制完成依赖DeferredResult
,重点可以看两个类:NotificationControllerV2
和ReleaseMessageScanner
- NotificationControllerV2:主要用来接收请求,并且生成DeferredResult,并且统一保存到Map中;
- ReleaseMessageScanner:扫描发布记录,哪些namespace有更新,立即通知Map中等待的DeferredResult;
以下是一个大致处理的流程图:
其他长轮询实现
上面提到长轮询模式其实在很多中间件中被广泛使用比如:RocketMQ、Kakfa、Nacos等;
Nacos本身也是使用的DeferredResult机制来实现的长轮询;RocketMQ和Kafka在客户端消费消息的时候,同意也采用了长轮询的方式,虽然没有直接使用DeferredResult,其实实现原理基本一致,下面重点看一下RocketMQ的长轮询机制;
RocketMQ长轮询
消费者在消费消息的时候,如果发现本轮没有消息,其实并不会立即返回,同样会hold住一段时间等待是否有消息;
在看RocketMQ源码到过程中,同样重点关注Broker中的两个类:PullMessageProcessor
和PullRequestHoldService
PullMessageProcessor
从名字就可以看出这是一个拉起消息的处理类,重点看一下在没有拉取到消息是如何处理的
case ResponseCode.PULL_NOT_FOUND: if(brokerAllowSuspend &&hasSuspendFlag){ long pollingTimeMills = suspendTimeoutMillisLong; if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break; }
返回编码为PULL_NOT_FOUND
,会使用PullRequestHoldService
服务,hold住当前的PullRequest
,其实就是把PullRequest
保存下来;这里面有一个重要的代码就是response 赋值为null,在给SocketChannel发送消息的时候会判断response是否为空,如果我空就返回消息,这时候的SocketChannel其实就处于阻塞状态了,并且当前的线程处理完也释放了;
PullRequestHoldService
一方面保存了没有拉取到消息的PullRequest
,另一方面内部会启动一个线程检查被hold住的PullRequest
释放有新消息,如果有会通知拉取消息,并返回给客户端;
public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp = this.systemClock.now(); //异步检查hold住的请求 this.checkHoldRequest(); long costTime = this.systemClock.now() - beginLockTimestamp; if (costTime > 5 * 1000) { log.info("[NOTIFYME] check hold request cost {} ms.", costTime); } } catch (Throwable e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info("{} service end", this.getServiceName()); } private void checkHoldRequest() { //遍历所有被保存的没有拉取到消息的pullRequest for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); try { //通知拉取最新的消息并且重新分配处理线程,将消息发送给客户端 this.notifyMessageArriving(topic, queueId, offset); } catch (Throwable e) { log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e); } } } }
总结
本文从了解Apollo长轮询机制出发,在了解到使用的是DeferredResult来实现的,深入的过程中发现其底层其实使用的是Servlet的异步处理机制,再深入的过程中发现有释放主线程、申请主线程的过程;这时候就有必要去了解Tomcat的线程模型,这样整个处理流程就串起来了;同时为了了解长轮询机制更多的应用范围,已RocketMQ为例做了一定的分析;这样对整个长轮询机制有更深入的了解.