繁杂网络IO型业务的分析及探索--协程和响应式
❝ 尝试优化业务中发现,服务的心跳信息中有很多线程都是处于 waiting
,如下图所示:❞
thread count="608"
daemon-count="420"
peak-count="611"
total-started-count="13722"
deadlocked="0" new="0" runnable="169" blocked="0"
waiting="314"
复制代码
❝然后看了CPU的使用率,从左到右分别表示
❞CPU的任务等待数/CPU核数
,CPU的执行时间占比总时间(CPU执行时间+CPU空闲时间+ CPU等待时间)
,当前JAVA进程执行时间占比总时间
❝图中可以清晰地看到,并不是计算型业务导致了线程等待,而是极大可能由于服务到底层数据查询的网络IO等待使得排队的线程增加,因此决定考虑优化这一部分。优化的目标,在保证服务和底层存储的心跳信息在一个安全的范围内,尽可能的增加服务吞吐能力。
❞
思路一协程
当时优化的第一时间就想到了大名鼎鼎的quasar
三方库。quasar
可以理解为轻量级的线程实现,熟悉go语言一定知道goroutine
,我们知道Java语言中不支持协程,业务中很多场景都需要用线程池进行优化,但是使用线程池的成本也很高,无论是内存占用还是线程之间的切换消耗,都限制了一个应用不能无限制的创建线程。
好在社区开源了一款Java coroutine
框架quasar
,容我先吐槽一下,这个框架真的是直男程序员写的(已经被拉去写JDK的协程,十分期待JDK能早点支持协程),文档十分匮乏,导致我本地开始搞得时候就报错了一把,开局体验不是很舒服。
当然优点也十分突出,应用中网络IO耗时占比比较突出的场景中,使用quasar
可以极大的提高CPU
的吞吐率。简单描述就是可以在更短的时间内处理更多的请求。不会因为一个线程中的网络IO
堵塞而让后面的线程处于waiting
中,堵塞的时候CPU
是不干活的,因此将整个系统的吞吐率拉胯。
官网的文档中提供了两种使用方式,为了节约篇幅先用第1种方式示范一下使用方式:
-
Running the Instrumentation Java Agent(加载器织入)
-
Ahead-of-Time (AOT) Instrumentation(预编译织入)
这里我先用Gradle
项目作为🌰来详解一下怎么使用。
一、Gradle
配置模块
configurations {
quasar
}
//
tasks.withType(JavaExec) {
jvmArgs "-javaagent:${configurations.quasar.iterator().next()}"
}
//
dependencies {
compile "org.antlr:antlr4:4.7.2"
compile "co.paralleluniverse:quasar-core:0.7.5"
quasar "co.paralleluniverse:quasar-core:0.7.5:jdk8@jar"
testCompile group: 'junit', name: 'junit', version: '4.12'
}
复制代码
复制代码
二、实现一个耳熟能详的echo
服务器
两个Fiber
(相当于是Java的Thread
)相互通信,increasing
发送一个int
数字给echo
,echo
收到之后返回给increasing
,increasing
接收到echo
返回的消息,先打印,在执行++
操作,然后打印出最后的结果。代码示例如下:
-
increasing
final IntChannel increasingToEcho = Channels.newIntChannel(0);
final IntChannel echoToIncreasing = Channels.newIntChannel(0);
//
Fiber<Integer> increasing = new Fiber<>("INCREASING", new SuspendableCallable<Integer>() {
@Override
public Integer run() throws SuspendExecution, InterruptedException {
int curr = 0;
for (int i = 0; i < 10; ++i) {
Fiber.sleep(10);
System.out.println("INCREASING sending curr = " + curr);
increasingToEcho.send(curr);
curr = echoToIncreasing.receive();
System.out.println("INCREASING received curr = " + curr);
curr++;
System.out.println("INCREASING now curr = " + curr);
}
//
System.out.println("INCREASING closing channel and exiting");
increasingToEcho.close();
return curr;
}
}).start();
复制代码
-
echo
Fiber<Void> echo = new Fiber<Void>("ECHO", new SuspendableRunnable() {
@Override
public void run() throws SuspendExecution, InterruptedException {
Integer curr;
while(true) {
Fiber.sleep(1000);
curr = increasingToEcho.receive();
System.out.println("ECHO received curr = " + curr);
//
if (curr != null) {
System.out.println("ECHO sending curr = " + curr);
echoToIncreasing.send(curr);
} else {
System.out.println("ECHO 检测到关闭channel,closing and existing");
echoToIncreasing.close();
return;
}
}
}
}).start();
复制代码
-
运行
increasing
和increasing
try {
increasing.join();
echo.join();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
复制代码
可以看到,使用起来和Java中的Thread
比较相似,API
的语义清晰明了,减小了使用人员的成本。
三、原理及使用注意事项
1. Running the Instrumentation Java Agent
顾名思义,通过修改javaagent
的方式,原理就是在classloading
阶段动态的修改字节码。比如熟悉的AspectJ
框架,核心就是ajc(编译器)
和织入器(weaver)
达到不修改业务逻辑而修改字节码,ajc
在java编译器的基础上,定义了一些aop语法,将符合这些语法的方法进行重新编译。分为「预编译(CTW)」、「加载期(LTW)」、「后编译期(PTW)」 3种织入方式。
quasar
的jvaagent
就属于「加载期(LTW)」 织入的方式。同样也是在不影响正常编译的情况下,增加一些代码检测,当检测到某一个「方法」需要支持「暂停」功能的时候,进行重新编译,从而达到挂起方法保存上下文,阻塞完成后恢复执行下面的代码。
比如一个方法,在运行的时候需要请求网络,此时这个方法就被阻塞了,需要等网络请求返回在执行下面的代码。那么,当方法被阻塞的时候,就需要交给协程去控制,需要保存此时方法运行的上下文。当网络请求完成的时候,在执行方法的下文。粗略的概括一下协程的工作方式大致就是如此。
但是实际的场景会无比复杂。实际的一段阻塞的代码中,里面可会有多个阻塞的代码块,因此最顶端需要一个调度中心
,只有当里面的阻塞代码块执行完了之后,才会执行外面的代码块,不然最后都乱套了。
这就很像java中的ForkJoinPool
,一个大任务可以Fork出很多子任务,只有当子任务
都完成执行,才会去执行父任务
。quasar
也是如此,运行过程中将需要被挂起的方法和方法内的代码块交给调度中心,调度中心中存储任务之间的父子兄弟
关系,然后按照任务层次关系执行代码。
1.1 quasar「织入」的条件
quasar会将满足下面条件的方法进行织入:
-
方法带有
@Suspendable
注解 -
方法抛出了异常
SuspendExecution
-
classpath下
/META-INF/suspendables
、/META-INF/suspendable-supers
指定了一些类或者接口,quasar
会对这些类或者接口的方法进行分析,符合上面任意一种的方法进行「织入」 -
方法内部通过反射调用的方法,前后也会进行「织入」
-
MethodHandle.invoke
动态调用的方法前后进行「织入」 -
JDK动态代理执行的代码块前后「织入」
-
Java 8 lambdas调用前后「织入」
我们也可以从quasar
的官网文档中看到依赖项,其中就有ASM — Java bytecode manipulation and analysis framework, by the ASM team,因此更多想了解织入的细节,大家可以去了解下ASM
框架。Caffe
有空的时候也会单独出一篇文章科普下,因为这块的东西比较偏虚拟机
底层。
1.2 quasar实际使用中需要对业务怎么改造?
如果业务中一个方法中有很多阻塞性业务,那么就要将这些阻塞性业务放入不同的Fiber
执行,可以看到上文中的echo
和increasing
就属于两个阻塞型业务同时又相互依赖,逻辑上的依赖通过Channels
解决。
1.3 兼容性问题
无论是通过javaagent
还是AOT
(预编译织入)的方式进行织入,本质上都是通过对字节码前后进行插入特定的指令。但是这种很容易带来一些兼容性问题,比如很多大厂都会通过pt-tracer
这种染色技术,来对java线程进行着色,进行全链路的调用监控或者压测流量的区分。所以caffe
思来想去就放弃了使用quasar
这款伟大的协程框架,担心这种织入方式会不兼容线程中的染色
。
不过后面会尝试解决,毕竟quasar
的性能让人看了不得不流口水。
思路二响应式编程
❝这块大家都应该很熟悉了吧,最出名的就数
❞ReactiveX/RxJava
,这款在android
中最为被广泛使用,caffe
在这使用RxJava3
进行举例说明。
一、RxJava简介
❝RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
❞
翻译过来就是使用事件驱动实现异步的一款响应式框架。事件驱动
相信写过前端
很熟悉,这也是node.js社区所吹嘘的高并发
。RxJava
底层利用发布订阅模式
(与node.js底层模式相似)并且支持线程切换
来完成在有限的时间内支持更高的并发。
1.RxJava相关概念介绍
既然是发布订阅
模式,那必不可少的三要素发布者
,订阅者
, 事件类型
。
1.1 事件类型
主要分为下面3种事件类型:
-
Next
,发布者可以发布多个Next
事件,订阅者也可以订阅多个Next
事件; -
Complete
,订阅者接收到Complete
事件便不再订阅发布者的事件; -
Error
,发布者发布Error
事件之后,便不再发布事件。订阅者接受Error
事件也不会继续订阅事件。
1.2 echo服务中的发布、订阅、事件
increasing
充当发布者的角色,每隔一段时间向echo
推送一个数字类型的消息。echo
服务接收到消息之后打印出来。
increasing
// 发布者发送事件
Observable increasing = Observable.create((emitter) -> {
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(0);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(1);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(2);
Thread.sleep(new Random().nextInt(1000));
emitter.onComplete();
});
复制代码
echo
// 创建订阅者
Observer<Integer> echo = new Observer<Integer>() {
private Disposable disposable;
//
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("观察者开始订阅");
disposable = d;
}
//
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("观察者接受到消息: " + integer);
}
//
@Override
public void onError(@NonNull Throwable e) {
System.out.println("观察者接收到报错: " + e.getMessage());
}
//
@Override
public void onComplete() {
System.out.println("观察者订阅完成,不再继续订阅消息");
}
};
复制代码
echo
订阅increasing
increasing.subscribe(echo);
复制代码
可以看到最后的执行结果:
观察者开始订阅
观察者接受到消息: 0
观察者接受到消息: 1
观察者接受到消息: 2
观察者订阅完成,不再继续订阅消息
复制代码
因此,在RxJava中,Observable
扮演发布者,Observer
扮演订阅者,ObservableOnSubscribe.subscribe
方法来完成事件
的发布。发布和订阅之间的关联是通过Observable.subscribe
来完成的。
1.3 发布者和订阅者线程切换
上面的实例代码所有的发布者和订阅者的代码都是在一个主线程中进行的。但是实际的业务场景中需要将发布者的业务,和订阅者的业务用不同的线程去完成,以减小业务的耗时。
线程切换
的代码如下:
increasing
:
CountDownLatch latch = new CountDownLatch(3);
// 发布者发送事件
ObservableOnSubscribe<Integer> onSubscribe = new ObservableOnSubscribe<Integer>() {
//
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
System.out.println("发布者开始发布事件-Thread.currentThread().getName() = " + Thread.currentThread().getName());
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(0);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(1);
Thread.sleep(new Random().nextInt(1000));
emitter.onNext(2);
emitter.onComplete();
}
};
Observable<Integer> increasing = Observable.create(onSubscribe);
复制代码
echo
:
// 订阅者接受事件
Observer<Integer> echo = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("订阅者开始订阅事件-" + Thread.currentThread().getName());
disposable = d;
}
//
@Override
public void onNext(@NonNull Integer integer) {
latch.countDown();
System.out.println("订阅者接收到事件-" + Thread.currentThread().getName() + " " + integer);
}
//
@Override
public void onError(@NonNull Throwable e) {
System.out.println("订阅者接收到报错,停止接受订阅事件-" + Thread.currentThread().getName());
}
//
@Override
public void onComplete() {
System.out.println("订阅者接收到complete事件,停止接受订阅事件-" + Thread.currentThread().getName());
}
};
复制代码
线程切换
:
// 订阅者和发布者切换线程订阅
increasing
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.trampoline())
.subscribe(echo);
//
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
复制代码
最后的结果:
订阅者开始订阅事件-main
发布者开始发布事件-Thread.currentThread().getName() = RxCachedThreadScheduler-1
订阅者接收到事件-RxCachedThreadScheduler-1 0
订阅者接收到事件-RxCachedThreadScheduler-1 1
订阅者接收到事件-RxCachedThreadScheduler-1 2
订阅者接收到complete事件,停止接受订阅事件-RxCachedThreadScheduler-1
复制代码
可以看到,发布者和订阅者分别在不同的线程中执行。其中Observable.subscribeOn(@NonNull Scheduler scheduler)
是定义发布者方法执行的调度器,Observable。observeOn(@NonNull Scheduler scheduler)
定义了订阅者方法的调度器。
而调度器
有很多种类别,比如IoScheduler
、NewThreadScheduler
、SingleScheduler
、ComputationScheduler
等,需要根据不同的业务场景,合理的选择Scheduler
。
因此,需要更深层次的理解RxJava,就需要再去扒Scheduler
的具体实现,这里caffe准备之后的文章中进行深度分析。
手写辛苦,麻烦各位大佬点赞关注留言,非常感谢你们的鼓励和支持。