JDK 源码剖析 —— ThreadPoolExecutor
Java 中的线程池,一般都是围绕 ThreadPoolExecutor 展开的,其他的实现要么是基于它,要么是模仿它的思想。所以只要理解 ThreadPoolExecutor,就相当于完全理解了 Java 线程池的精髓。
我们可以提前给线程池下一个定义:提供预定义好的线程,供调用者直接执行任务的工具。
本章中的源码基于 JDK 1.8。
线程池优点
也可以说是池化的优点,可类推到各种如连接池、内存池等各种 “池” 的优点。
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
类签名及继承关系
ThreadPoolExecutor 类签名如下:
public class ThreadPoolExecutor extends AbstractExecutorService
ThreadPoolExecutor
类继承了 AbstractExecutorService
类,再向上,一个整体的继承关系如下 UML 类图所示:
![](https://uploadfiles.nowcoder.com/files/20210224/6796629_1614145596874/008eGmZEly1gnyaq6j7atj30fo0u03za.jpg)
ThreadPoolExecutor 实现的顶层接口是 Executor,顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。ExecutorService 接口增加了一些能力:
- 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法
- 提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
我们可以在这里就先看一下 ThreadPoolExecutor 的运行模型,如下图(图源美团技术团队):
ThreadPoolExecutor 在内部实际上构建了一个生产者消费者模型,将任务看作产品,将任务提交和线程执行解耦。ThreadPoolExecutor 可以在逻辑上分成两个部分:任务管理和线程管理。任务管理充当生产者的角色,当有任务提交后,由线程池决定后续流转:
- 直接申请线程执行任务
- 缓冲到阻塞队列等待
- 拒绝任务
线程管理部分承担消费者的角色,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
生命周期管理
ThreadPoolExecutor 内部会随着线程池运行自行维护线程池状态。ThreadPoolExecutor 内部同时将线程数量(workerCount)和运行状态(runState)封装在一个变量中统一进行维护:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl
是 AtomicInteger
类型,其高 3 位保存 runState,低 29 位保存 workerCount。在源码中大部分情况都要同时获取这两个变量来判断状态,使用一个变量存储可以避免在改变状态时,不必去为了维护两者一致而占用锁。源码中也提供了一些方法供用户获得线程池当前的运行状态、线程个数。这些方法一般都是通过位运算:
// mask private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池将其运行状态分成五种:
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
这里的左移实际上是因为 ctl 中只有高三位是表示运行状态的。每个状态具体如下:
运行状态 | 描述 |
---|---|
RUNNING | 接受新提交的任务,也能处理阻塞队列中的任务 |
SHUTDOWN | 不再接受新提交任务,但是仍然能继续处理阻塞队列中的任务 |
STOP | 不再接受新任务,也不再处理阻塞队列中的任务,同时中断正在处理任务的线程 |
TIDYING | 所有任务都被终止,workerCount 为 0 |
TERMINATED | TIDYING 状态时会自动调用 terminated 方法,方法调用完成后进入本状态 |
声明周期转换如下图所示:
任务调度
任务调度是线程池的入口,当用户提交了一个任务,接下来这个任务的全部过程(执行或拒绝)都由这个阶段负责。
任务调度依赖于几个很重要的参数,这些参数在线程池构造时就会被设置,ThreadPoolExecutor
最长的构造方法如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
几个参数解释如下:
corePoolSize: 线程池核心线程数(平时保留的线程数),使用时机: 在初始时刻,每次请求进来都会创建一个线程直到达到该size maximumPoolSize: 线程池最大线程数,使用时机: 当workQueue都放不下时,启动新线程,直到最大线程数,此时到达线程池的极限 keepAliveTime/unit: 超出corePoolSize数量的线程的保留时间,unit为时间单位 workQueue: 任务队列,当核心线程数达到或者超出后,会先尝试将任务放入该队列由各线程自行消费; ArrayBlockingQueue: 构造函数一定要传大小 LinkedBlockingQueue: 构造函数不传大小会默认为65536(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。 SynchronousQueue: 同步队列,一个没有存储空间的阻塞队列 ,将任务同步交付给工作线程。 PriorityBlockingQueue: 优先队列 threadFactory:线程工厂,用于线程需要创建时,调用其newThread()生产新线程使用 handler: 饱和策略,当队列已放不下任务,且创建的线程已达到 maximum 后,则不能再处理任务,直接将任务交给饱和策略 AbortPolicy: 直接抛弃(默认) CallerRunsPolicy: 用调用者的线程执行任务 DiscardOldestPolicy: 抛弃队列中最久的任务 DiscardPolicy: 抛弃当前任务
整体的一个任务流转过程可以由下图表示:
总体流程总结如下:
- 判断核心线程池是否已满,如果不是,则创建线程执行任务
- 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中
- 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务
- 如果线程池也满了,则按照拒绝策略对任务进行处理
任务调度策略的入口是 execute()
方法,它主要的工作是,检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 当还没有达到核心线程池的数量时,直接添加 1 个新线程,然后让其执行任务即可 if (workerCountOf(c) < corePoolSize) { // 添加新线程,且执行 command 任务 // 添加成功,即不需要后续操作了,添加失败,则说明外部环境变化了 // addWorker 第二个参数 true 表示使用核心线程 if (addWorker(command, true)) return; c = ctl.get(); } // 当核心线程达到后,则尝试添加到阻塞队列中,具体添加方法由阻塞队列实现 // isRunning => c < SHUTDOWN; if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 添加队列成功后,还要再次检测线程池的运行状态,决定启动线程或者状态过期 // 当线程池已关闭,则将刚刚添加的任务移除,走reject策略 if (! isRunning(recheck) && remove(command)) reject(command); // 当一个 worker 都没有时,则添加 worker else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 当队列满后,则直接再创建新的线程运行,addWorker 的 false 表示使用 maximumPoolSize // 如果不能再创建线程了,则 reject else if (!addWorker(command, false)) reject(command); }
整个过程没有通过锁,而是仅依靠一个 AtomicInteger ctl 就保证了线程安全。
Worker 管理
线程池为了获取线程状态,维护线程生命周期,使用了工作线程 Worker 作为线程的包装,Worker 部分代码如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread;// Worker 持有的线程 Runnable firstTask;// 初始化的任务,可以为 null }
Worker 工作线程,持有了一个线程 thread和一个初始化任务 firstTask,同时 Worker 自身也实现了 Runnable 接口。thread 是由线程池构造中的 threadFactory 创建的,而 firstTask 则在 Worker 创建时传入,如果 firstTask 不为 null,Worker 就会在创建完成后立刻执行该任务;如果 firstTask 是 null,说明该 Worker 是一个非核心线程,这个线程就需要去任务队列(workQueue)中获取任务执行。
Worker 执行任务的模型如下图:
对于非核心线程,在创建完成并且没有任务执行后,需要考虑回收的问题。线程池通过一个 HashSet
来持有 Worker 的引用:
private final HashSet<Worker> workers = new HashSet<Worker>();
这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期,这时就需要判断线程是否正在运行。
Worker 通过继承了 AbstractQueuedSynchronizer
(AQS)来实现独占,实现了一个不可重入锁来反映线程当前的状态(所以没有直接继承 ReentrantLock 可重入锁)。具体如下:
- lock 方法获得独占锁,表示当前线程正在执行
- 当线程执行任务完成后,会调用 unlock 释放锁
- 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方***使用 tryLock 方法来尝试获得锁,以判断线程池中的线程是否是空闲状态
在线程回收的过程中就用到了上述独占锁的特性,回收过程示意如下:
增加 Worker
增加 Worker 线程主要通过 addWorker()
方法,该方法功能很单一,仅仅是增加一个 Worker,并不会判断当前的状态等,判断策略是在上个步骤(如 execute()
方法)完成的。
addWorker()
方法有两个参数:firstTask、core。firstTask 参数用于指定新增的线程执行的第一个任务,该参数可以为空;core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize。addWorker()
流程如下:
addWorker()
方法注释如下:
private boolean addWorker(Runnable firstTask, boolean core) { // 为确保线程安全,进行CAS反复重试 retry: for (;;) { int c = ctl.get(); // 获取runState int rs = runStateOf(c); // 已经shutdown, firstTask 为空的添加并不会成功 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 如果超出最大允许创建的线程数,则直接失败 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 更新worker+1数,成功则说明占位成功退出retry,后续的添加操作将是安全的 // 失败则说明已有其他线程变更该值 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // runState 变更,则退出到 retry 重新循环 if (runStateOf(c) != rs) continue retry; } } // 以下为添加 worker 过程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 使用 Worker 封闭 firstTask 任务,后续运行将由 Worker 接管 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 添加 worker 的过程,需要保证线程安全 mainLock.lock(); try { int rs = runStateOf(ctl.get()); // SHUTDOWN 情况下还是会创建 Worker, 但是后续检测将会失败 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 既然是新添加的线程,就不应该是 alive 状态 if (t.isAlive()) throw new IllegalThreadStateException(); // workers 只是一个工作线程的容器,使用 HashSet 承载,以保持其引用 workers.add(w); int s = workers.size(); // 维护一个全局达到过的最大线程数计数器 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // worker 添加成功后,进行将worker启起来,里面应该是有一个 死循环,一直在获取任务 // 不然怎么运行添加到队列里的任务呢? if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果任务启动失败,则必须进行清理,返回失败 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker 执行任务
Worker 中的 run()
方***调用 runWorker()
来执行任务,方法执行过程如下:
- while 循环不断地通过
getTask()
方法获取任务。 getTask()
方法从阻塞队列中取任务。- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
- 执行任务。
- 如果 getTask 结果为 null 则跳出循环,执行 processWorkerExit() 方法,销毁线程。
整体流程如下图所示:
![](https://uploadfiles.nowcoder.com/files/20210224/6796629_1614145597167/008eGmZEly1gnyi7yphp4j30ti17ctbz.jpg)
runWorker()
方法注释如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 允许打断 boolean completedAbruptly = true; try { // 不停地从 workQueue 中获取任务,然后执行,就是这么个逻辑 // getTask() 会阻塞式获取,所以 Worker 往往不会立即退出 while (task != null || (task = getTask()) != null) { // 执行过程中是不允许并发的,即同时只能一个 task 在运行,此时也不允许进行 interrupt w.lock(); // 检测是否已被线程池是否停止 或者当前 worker 被中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 中断信息传递 wt.interrupt(); try { // 任务开始前 切点,默认为空执行 beforeExecute(wt, task); Throwable thrown = null; try { // 直接调用任务的run方法, 具体的返回结果,会被 FutureTask 封装到 某个变量中 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任务开始后 切点,默认为空执行 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } // 正常退出,有必要的话,可能重新将 Worker 添加进来 completedAbruptly = false; } finally { // 处理退出后下一步操作,可能重新添加 Worker processWorkerExit(w, completedAbruptly); } }
Worker 回收
线程池中线程的销毁依赖 JVM 自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被 JVM 回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker 被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当 Worker 无法获取到任务,也就是获取的任务为空时,循环会结束,Worker 会主动消除自身在线程池内的引用。在上一节 runWorker()
源码中就可以看到。
try { while (task != null || (task = getTask()) != null) { //执行任务 } } finally { processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己 }
线程回收的工作是在 processWorkerExit()
方法完成的。
大致流程如下:
代码注释如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 移出线程池 workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 在 Worker 正常退出的情况下,检查是否超时导致,维持最小线程数 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果满足最小线程要求,则直接返回 if (workerCountOf(c) >= min) return; } // 否则再添加一个 Worker 到线程池中备用 // 非正常退出,会直接再添加一个 Worker addWorker(null, false); } }
事实上,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。
结束语
线程池算是 JDK 源码中综合性很强的部分了,对于很多项目的设计都是很有启发性的。
面试中也主要是针对 ThreadPoolExecutor 的设计理念来提问,甚至可能会扩展到让面试者自行设计。
#Java##学习路径#