线程池实现的原理
线程池的优点是可总结为以下三点:
- 线程复用
- 控制最大并发数
- 管理线程
1.线程复用过程
理解线程复用原理首先应了解线程生命周期。
在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。
Thread通过new来新建一个线程,这个过程是是初始化一些线程信息,如线程名,id,线程所属group等,可以认为只是个普通的对象。调用Thread的start()后Java虚拟机会为其创建方法调用栈和程序计数器,同时将hasBeenStarted为true,之后调用start方法就会有异常。
处于这个状态中的线程并没有开始运行,只是表示该线程可以运行了。至于该线程何时开始运行,取决于JVM里线程调度器的调度。当线程获取cpu后,run()方法会被调用。不要自己去调用Thread的run()方法。之后根据CPU的调度在就绪——运行——阻塞间切换,直到run()方法结束或其他方式停止线程,进入dead状态。
所以实现线程复用的原理应该就是要保持线程处于存活状态(就绪,运行或阻塞)。接下来来看下ThreadPoolExecutor是怎么实现线程复用的。
在ThreadPoolExecutor主要Worker类来控制线程的复用。看下Worker类简化后的代码,这样方便理解:
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
while (task != null || (task = getTask()) != null){
task.run();
}
}
Worker是一个Runnable,同时拥有一个thread,这个thread就是要开启的线程,在新建Worker对象时同时新建一个Thread对象,同时将Worker自己作为参数传入TThread,这样当Thread的start()方法调用时,运行的实际上是Worker的run()方法,接着到runWorker()中,有个while循环,一直从getTask()里得到Runnable对象,顺序执行。getTask()又是怎么得到Runnable对象的呢?
依旧是简化后的代码:
private Runnable getTask() {
if(一些特殊情况) {
return null;
}
Runnable r = workQueue.take();
return r;
}
这个workQueue就是初始化ThreadPoolExecutor时存放任务的BlockingQueue队列,这个队列里的存放的都是将要执行的Runnable任务。因为BlockingQueue是个阻塞队列,BlockingQueue.take()得到如果是空,则进入等待状态直到BlockingQueue有新的对象被加入时唤醒阻塞的线程。所以一般情况Thread的run()方法就不会结束,而是不断执行从workQueue里的Runnable任务,这就达到了线程复用的原理了。
2.控制最大并发数
那Runnable是什么时候放入workQueue?Worker又是什么时候创建,Worker里的Thread的又是什么时候调用start()开启新线程来执行Worker的run()方法的呢?有上面的分析看出Worker里的runWorker()执行任务时是一个接一个,串行进行的,那并发是怎么体现的呢?
很容易想到是在execute(Runnable runnable)时会做上面的一些任务。看下execute里是怎么做的。
execute:
简化后的代码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 当前线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接启动新的线程。
if (addWorker(command, true))
return;
c = ctl.get();
}
// 活动线程数 >= corePoolSize
// runState为RUNNING && 队列未满
// workQueue.offer(command)表示添加到队列,如果添加成功返回true,否则返回false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检验是否为RUNNING状态
// 非RUNNING状态 则从workQueue中移除任务并拒绝
if (!isRunning(recheck) && remove(command))
reject(command);// 采用线程池指定的策略拒绝任务
// 两种情况:
// 1.非RUNNING状态拒绝新的任务
// 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}
addWorker:
简化后的代码
private boolean addWorker(Runnable firstTask, boolean core) {
int wc = workerCountOf(c);
if (wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}
根据代码再来看上面提到的线程池工作过程中的添加任务的情况:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
- 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。
这就是Android的AsyncTask在并行执行是在超出最大任务数是抛出RejectExecutionException的原因所在,详见基于最新版本的AsyncTask源码解读及AsyncTask的黑暗面
通过addWorker如果成功创建新的线程成功,则通过start()开启新线程,同时将firstTask作为这个Worker里的run()中执行的第一个任务。
虽然每个Worker的任务是串行处理,但如果创建了多个Worker,因为共用一个workQueue,所以就会并行处理了。
所以根据corePoolSize和maximumPoolSize来控制最大并发数。大致过程可用下图表示。
上面的讲解和图来可以很好的理解的这个过程。
如果是做Android开发的,并且对Handler原理比较熟悉,你可能会觉得这个图挺熟悉,其中的一些过程和Handler,Looper,Meaasge使用中,很相似。Handler.send(Message)相当于execute(Runnuble),Looper中维护的Meaasge队列相当于BlockingQueue,只不过需要自己通过同步来维护这个队列,Looper中的loop()函数循环从Meaasge队列取Meaasge和Worker中的runWork()不断从BlockingQueue取Runnable是同样的道理。
3.管理线程
通过线程池可以很好的管理线程的复用,控制并发数,以及销毁等过程,线程的复用和控制并发上面已经讲了,而线程的管理过程已经穿插在其中了,也很好理解。
在ThreadPoolExecutor有个ctl的AtomicInteger变量。通过这一个变量保存了两个内容:
- 所有线程的数量
- 每个线程所处的状态
其中低29位存线程数,高3位存runState,通过位运算来得到不同的值。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//得到线程的状态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
//得到Worker的的数量
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// 判断线程是否在运行
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
这里主要通过shutdown和shutdownNow()来分析线程池的关闭过程。首先线程池有五种状态来控制任务添加与执行。主要介绍以下三种:
- RUNNING状态:线程池正常运行,可以接受新的任务并处理队列中的任务;
- SHUTDOWN状态:不再接受新的任务,但是会执行队列中的任务;
- STOP状态:不再接受新任务,不处理队列中的任务
shutdown这个方法会将runState置为SHUTDOWN,会终止所有空闲的线程,而仍在工作的线程不受影响,所以队列中的任务人会被执行。shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止所有的线程,所以队列中的任务也不会被执行了。