从Apollo看长轮询

前言

如果让我设计一个配置中心,最先想到的两个核心功能:一个是如何将配置存储下来,另一个是怎么能够实时的获取到最新的配置;最简单的方式我们可以直接利用现有的一些中间件:Zookeeper、Redis等;

  • Zookeeper: 本身提供了持久化功能,同时客户端可以监听某个节点,节点数据变更,可以实时推送给客户端;
  • Redis: Redis也提供了持久化的方案,同时可以通过psubscribe提供的订阅功能做到配置的实时推动;

以上两个中间件在客户端进行连接的时候,其实建立的是长连接;长连接就是客户端和服务端建立连接后连接是保持的,这样服务端可以很容易的把最新的配置推送给客户端.

数据交互模式

常见的数据交互模式:Push模式和Pull模式

  • Push模式:服务端主动推送数据给客户端,上面说到的Zookeeper和Redis就是这种模式; 这种模式实时性很高,对客户端来说也简单,接收处理消息即可;缺点就是服务端不知道客户端处理消息的能力,可能会导致数据积压,同时也增加了服务端的工作量,影响服务端的性能;
  • Pull模式: 拉取模式,即客户端主动去服务端拉取数据,主动权在客户端,拉取数据,然后处理数据,再拉取数据,一直循环下去,具体拉取数据的时间间隔不好设定,太短可能会导致大量的连接拉取不到数据,太长导致数据接收不及时;

可以发现两种模式各有优缺点;Apollo既没有使用Push模式也没有使用Pull模式,而是使用了长轮询的数据交互模式;

  • 长轮询模式:通过客户端和服务端的配合,达到主动权在客户端,同时也能保证数据的实时性;长轮询本质上也是轮询,只不过对普通的轮询做了优化处理,服务端在没有数据的时候并不是马上返回数据,会 hold 住请求,等待服务端有数据,或者一直没有数据超时处理;

长轮询模式其实在很多中间件中被广泛使用比如:RocketMQ、Kakfa、Nacos等;当然具体每个中间件是如何实现自己的长轮询方案是不一样的,本文重点介绍的是Apollo如何利用Servlet3.0中提供的异步请求处理机制来实现自己的长轮询;下面先看一下Servlet3.0的异步处理机制原理.

Servlet异步处理

Servlet 3.0开始支持异步处理请求, 在接收到请求之后Servlet线程可以将耗时的操作委派给另一个线程来完成, 这样Servlet线程就可以被释放出来,可以去接收其他的请求,可以提高系统的吞吐量;为了更加清楚的了解异步处理,我们需要了解一下线程模型,下面以常用的容器Tomcat为例,来看一下Tomcat的线程模型;

Tomcat线程模型

Unix系统I/O模型主要包含以下五种类型:同步阻塞I/O、同步非阻塞I/O、I/O多路复用、信号驱动I/O和异步I/O;

目前主流使用的是I/O多路复用模型,像以高性能著称的Netty,以及下面要介绍的Tomcat都是用来此模型;此模型依赖非阻塞Channel,可以 通过一个线程来管理多个数据通道(Channel)的状态,极大的提供了性能; 当然此模型下也演变出多个变种包括:单线程模型、多线程模型、主从多线程模型, 这里就不展开讲了。

Tomcat的NioEndpoint主要包括:AcceptorPollerSocketProcessorExecutor几个组件,大致关系如下图所示:

  • Acceptor:默认启动一个线程通过ServerSocketChannel监听连接请求,同时往Selecotr中注册读事件;
  • Poller: 内部其实就是一个选择器Selector,选择哪些SocketChannel已经准备好读写了,这里一般会启动多个pollerThread,也就是我们常见的多Selector模型;
  • Executor:执行器线程池,默认最大可以创建200个线程,是真正处理I/O读写的地方,这个线程很大程度上也影响了系统的吞吐量;其实异步处理也就是释放这里面的线程;
  • SocketProcessor:可以理解就是处理I/O读写的任务,被Executor来调度;

异步处理

常见的同步处理是Executor组件分配一个线程,这个线程同时处理消息的读取、消息的处理以及消息的返回;

而异步处理Executor组件会分配一个线程处理消息的读取,读取完线程就被回收了;接下来消息的处理用户可以使用自己的线程去处理;而消息的返回会再次通过Executor分配一个新的线程来处理; 以下是一个简单的实例:

 public void doGet(HttpServletRequest request, HttpServletResponse response) {
      //1.获取asyncContext
      AsyncContext asyncContext = request.startAsync();
      //设置超时时间,不设置默认30秒
      asyncContext.setTimeout(5000);
      //设置监听器
      asyncContext.addListener(new AsyncListener() {...});
      //2.异步处理
      new Thread(() -> {
 	PrintWriter out = asyncContext.getResponse().getWriter();
 	out.println("test");
 	//3.任务完成返回结果
 	asyncContext.complete();
      }.start();
 }

  • 异步开始: 首先生成一个AsyncContext类保存了requst、response、超时时间以及监听器等信息;以上doGet方法中的内容会在Executor组件分配的第一个线程中执行,这个线程一般会命名为:http-nio-8080-exec-x;
  • 异步处理业务逻辑: 自定义一个线程,用来处理耗时的业务逻辑;这时候自定义线程启动完,Executor就回收了http-nio-8080-exec-x线程; 注意这时处理当前SocketChannelSocketProcessor还在,只是没有线程处理它,导致客户端一直阻塞着;
  • 任务结束: 重新让SocketProcessor参与调度,从Executor组件中分配新的线程,返回客户端结果;这里可以执行的操作包括:completedispatch;当然也可能任务太耗时,导致超过设定的超时时间,从而进入超时处理;
  • 超时处理:Tomcat会专门启动一个AsyncTimeout线程,用来检查等待中的SocketProcessor是否已经超时,如果超时同样会结束阻塞.

大致流程如下图所示:

DeferredResult实现长轮询

Apollo并没有直接使用Servlet异步处理的原生接口,而是使用DeferredResult类,此类其实是对原生异步处理类的包装;从字面意思“延迟结果”也能大概看出来它的作用,同样的也可以在接受到请求之后,处理异步任务,归还主线程,等有了结果通知再次申请主线程,返回结果;

下面首先看一个使用DeferredResult的简短实例:

 public static ExecutorService exec = Executors.newCachedThreadPool();

    @GetMapping("/DeferredResult")
    public DeferredResult<String> testDeferredResult() {
        //1.创建DeferredResult并设置超时时间
    	DeferredResult<String> deferredResult = new DeferredResult<String>(10 * 1000L);
    	//设置一些监听器
    	deferredResult.onTimeout(...)
    	deferredResult.onCompletion(...)
    	deferredResult.onError(...)
    		
    	//2.异步处理
    	new Thread(() -> {
 		//3.异步处理耗时任务
 		Thread.sleep(2000);
 		//4.给deferredResult设置结果
 		deferredResult.setResult("异步线程执行完毕");
 	}.start();
    }

具体的使用结构和使用Servlet异步处理基本一致,setResult()其实和complete()功能一样;

原理分析

DeferredResult使用在Controller中返回的就是一个DeferredResult对象,Spring会根据每种返回的类型有对应的处理器,DeferredResult对应的处理器DeferredResultMethodReturnValueHandler:

@Override
	public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

		if (returnValue == null) {
			mavContainer.setRequestHandled(true);
			return;
		}

		DeferredResult<?> result;
		// 返回类型为DeferredResult
		if (returnValue instanceof DeferredResult) {
			result = (DeferredResult<?>) returnValue;
		}
                // 返回类型为ListenableFuture做适配转换,可以转换成DeferredResult
		else if (returnValue instanceof ListenableFuture) {
			result = adaptListenableFuture((ListenableFuture<?>) returnValue);
		}
                //返回类型为CompletionStage做适配转换,可以转换成DeferredResult
		else if (returnValue instanceof CompletionStage) {
			result = adaptCompletionStage((CompletionStage<?>) returnValue);
		}
		else {
			// Should not happen...
			throw new IllegalStateException("Unexpected return value type: " + returnValue);
		}
		//处理DeferredResult
		WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
	}

接下来就是处理DeferredResult,里面包含设置超时时间、设置监听器等操作,看里面的部分重点代码:

public void startDeferredResultProcessing(
	final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
	...省略部源码(设置超时时间、设置监听器等操作)...
	//开始异步处理,对request.startAsync()的包装
	startAsyncProcessing(processingContext);
        
        try {
		interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
		//设置结果处理器,在接收到result的时候会执行此处理器,这里面其实就是对complete()、dispatch()方法的包装
	        deferredResult.setResultHandler(result -> {
		result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
		setConcurrentResultAndDispatch(result);
	        });
	}
	catch (Throwable ex) {
		setConcurrentResultAndDispatch(ex);
	}	
}

在执行deferredResult.setResult()方法后,就会触发DeferredResultHandler,这里面会请求重新调度,从而返回结果给客户端;

以下是一个大致处理的流程图:

长轮询

在了解了以上这些之后,我们就可以考虑一下Apollo为什么使用DeferredResult来实现长轮询机制;从上面对长轮询的描述中,有几个点很重要分别是:

  • 没有结果能hold住请求,能阻塞多久,这里面是存在一个超时时间的;
  • 在被hold的这段时间内,需要释放资源,主要是线程资源;
  • 有了结果能立即通知,对时效性要求高;

基于以上这三点,使用DeferredResult来实现长轮询是非常合适的:

  • 能够将request和response保存到AsyncContext中,不给Channel响应SocketChannel本身就会阻塞住;
  • 返回一个DeferredResult对象,处理完上下文信息之后,就释放Executor主线程了;
  • 提供了complete和dispatch操作,有结果能实时通知;

Apollo分析

Apollo的长轮询机制完成依赖DeferredResult,重点可以看两个类:NotificationControllerV2ReleaseMessageScanner

  • NotificationControllerV2:主要用来接收请求,并且生成DeferredResult,并且统一保存到Map中;
  • ReleaseMessageScanner:扫描发布记录,哪些namespace有更新,立即通知Map中等待的DeferredResult;

以下是一个大致处理的流程图:

其他长轮询实现

上面提到长轮询模式其实在很多中间件中被广泛使用比如:RocketMQ、Kakfa、Nacos等;

Nacos本身也是使用的DeferredResult机制来实现的长轮询;RocketMQ和Kafka在客户端消费消息的时候,同意也采用了长轮询的方式,虽然没有直接使用DeferredResult,其实实现原理基本一致,下面重点看一下RocketMQ的长轮询机制;

RocketMQ长轮询

消费者在消费消息的时候,如果发现本轮没有消息,其实并不会立即返回,同样会hold住一段时间等待是否有消息;

在看RocketMQ源码到过程中,同样重点关注Broker中的两个类:PullMessageProcessorPullRequestHoldService

PullMessageProcessor从名字就可以看出这是一个拉起消息的处理类,重点看一下在没有拉取到消息是如何处理的

case ResponseCode.PULL_NOT_FOUND:
     if(brokerAllowSuspend &&hasSuspendFlag){
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }

返回编码为PULL_NOT_FOUND,会使用PullRequestHoldService服务,hold住当前的PullRequest,其实就是把PullRequest保存下来;这里面有一个重要的代码就是response 赋值为null,在给SocketChannel发送消息的时候会判断response是否为空,如果我空就返回消息,这时候的SocketChannel其实就处于阻塞状态了,并且当前的线程处理完也释放了;

PullRequestHoldService一方面保存了没有拉取到消息的PullRequest,另一方面内部会启动一个线程检查被hold住的PullRequest释放有新消息,如果有会通知拉取消息,并返回给客户端;

public void run() {
        log.info("{} service started", this.getServiceName());
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long beginLockTimestamp = this.systemClock.now();
                //异步检查hold住的请求
                this.checkHoldRequest();
                long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info("{} service end", this.getServiceName());
    }

private void checkHoldRequest() {
        //遍历所有被保存的没有拉取到消息的pullRequest
        for (String key : this.pullRequestTable.keySet()) {
            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
            if (2 == kArray.length) {
                String topic = kArray[0];
                int queueId = Integer.parseInt(kArray[1]);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                try {
                    //通知拉取最新的消息并且重新分配处理线程,将消息发送给客户端
                    this.notifyMessageArriving(topic, queueId, offset);
                } catch (Throwable e) {
                    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
                }
            }
        }
    }

总结

本文从了解Apollo长轮询机制出发,在了解到使用的是DeferredResult来实现的,深入的过程中发现其底层其实使用的是Servlet的异步处理机制,再深入的过程中发现有释放主线程、申请主线程的过程;这时候就有必要去了解Tomcat的线程模型,这样整个处理流程就串起来了;同时为了了解长轮询机制更多的应用范围,已RocketMQ为例做了一定的分析;这样对整个长轮询机制有更深入的了解.

全部评论

相关推荐

11-05 07:29
贵州大学 Java
点赞 评论 收藏
分享
我在朝九晚六双休的联想等你:如果我是你,身体素质好我会去参军,然后走士兵计划考研211只需要200多分。
点赞 评论 收藏
分享
评论
点赞
1
分享
牛客网
牛客企业服务