面试官:说下线程池的实现原理,ThreadPoolExecutor源码的分析
一.概述
线程池,顾名思义就是存放线程的池子,池子里存放了很多可以复用的线程。
如果不用类似线程池的容器,每当我们需要执行用户任务的时候都去创建新的线程,任务执行完之后线程就被回收了,这样频繁地创建和销毁线程会浪费大量的系统资源。
因此,线程池通过线程复用机制,并对线程进行统一管理,具有以下优点:
- 降低系统资源消耗。通过复用已存在的线程,降低线程创建和销毁造成的消耗;
- 提高响应速度。当有任务到达时,无需等待新线程的创建便能立即执行;
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗大量系统资源,还会降低系统的稳定性,使用线程池可以进行对线程进行统一的分配、调优和监控。
ThreadPoolExecutor是线程池框架的一个核心类,本文通过对ThreadPoolExecutor源码的分析(基于JDK 1.8),来深入分析线程池的实现原理。
二.ThreadPoolExecutor类的属性
先从ThreadPoolExecutor类中的字段开始:
// 线程池的控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//值为29,用来表示偏移量
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池的最大容量,其值的二进制为:00011111111111111111111111111111(29个1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的运行状态,总共有5个状态,用高3位来表示
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
//全局锁,对线程池状态等属性修改时需要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();
//线程池中工作线程的集合,访问和修改需要持有全局锁
private final HashSet<Worker> workers = new HashSet<Worker>();
// 终止条件
private final Condition termination = mainLock.newCondition();
//线程池中曾经出现过的最大线程数
private int largestPoolSize;
//已完成任务的数量
private long completedTaskCount;
//线程工厂
private volatile ThreadFactory threadFactory;
//任务拒绝策略
private volatile RejectedExecutionHandler handler;
//线程存活时间
private volatile long keepAliveTime;
//是否允许核心线程超时
private volatile boolean allowCoreThreadTimeOut;
//核心池大小,若allowCoreThreadTimeOut被设置,核心线程全部空闲超时被回收的情况下会为0
private volatile int corePoolSize;
//最大池大小,不得超过CAPACITY
private volatile int maximumPoolSize;
//默认的任务拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
private final AccessControlContext acc;
在ThreadPoolExecutor类的这些属性中,线程池状态是控制线程池生命周期至关重要的属性,这里就以线程池状态为出发点进行研究。
通过上面的源码可知,线程池的运行状态总共有5种,其值和含义分别如下:
- RUNNING: 高3位为111,接受新任务并处理阻塞队列中的任务
- SHUTDOWN: 高3位为000,不接受新任务但会处理阻塞队列中的任务
- STOP:高3位为001,不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务
- TIDYING: 高3位为010,所有任务都已终止,工作线程数量为0,线程池将转化到TIDYING状态,即将要执行terminated()钩子方法
- TERMINATED: 高3位为011,terminated()方法已经执行结束
然而,线程池中并没有使用单独的变量来表示线程池的运行状态,而是使用一个AtomicInteger类型的变量ctl来表示线程池的控制状态,其将线程池运行状态与工作线程的数量打包在一个整型中,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量,对ctl的操作主要参考以下几个函数:
// 通过与的方式,获取ctl的高3位,也就是线程池的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//通过与的方式,获取ctl的低29位,也就是线程池中工作线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
//通过或的方式,将线程池状态和线程池中工作线程的数量打包成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
//SHUTDOWN状态的值是0,比它大的均是线程池停止或清理状态,比它小的是运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
接下来,我们看一下线程池状态的所有转换情况,如下:
- RUNNING -> SHUTDOWN:调用shutdown(),可能在finalize()中隐式调用
- (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()
- SHUTDOWN -> TIDYING:当缓存队列和线程池都为空时
- STOP -> TIDYING:当线程池为空时
- TIDYING -> TERMINATED:当terminated()方法执行结束时
通常情况下,线程池有如下两种状态转换流程:
- RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
- RUNNING -> STOP -> TIDYING -> TERMINATED
三.ThreadPoolExecutor类的构造方法
通常情况下,我们使用线程池的方式就是new一个ThreadPoolExecutor对象来生成一个线程池。接下来,先看ThreadPoolExecutor类的构造函数:
//间接调用最后一个构造函数,采用默认的任务拒绝策略AbortPolicy和默认的线程工厂
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue);
//间接调用最后一个构造函数,采用默认的任务拒绝策略AbortPolicy
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory);
//间接调用最后一个构造函数,采用默认的默认的线程工厂
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);
//前面三个分别调用了最后一个,主要的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
接下来,看下最后一个构造函数的具体实现:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//参数合法性校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
//参数合法性校验
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
//初始化对应的属性
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
下面解释下一下构造器中各个参数的含义:
1.corePoolSize
线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。
2.maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。
3.keepAliveTime
线程空闲时的存活时间。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
4.unit
keepAliveTime参数的时间单位。
5.workQueue
任务缓存队列,用来存放等待执行的任务。如果当前线程数为corePoolSize,继续提交的任务就会被保存到任务缓存队列中,等待被执行。
一般来说,这里的BlockingQueue有以下三种选择:
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。因此,如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无限制的线程增长。
- LinkedBlockingQueue:基于链表结构的阻塞队列,如果不设置初始化容量,其容量为Integer.MAX_VALUE,即为无界队列。因此,如果线程池中线程数达到了corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无限制的增长。
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。
6.threadFactory
线程工厂,创建新线程时使用的线程工厂。
7.handler
任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略:
- AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,默认策略;
- CallerRunsPolicy:由调用execute方法的线程执行该任务;
- DiscardPolicy:丢弃任务,但是不抛出异常;
- DiscardOldestPolicy:丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
四.线程池的实现原理
1.提交任务
线程池框架提供了两种方式提交任务,submit()和execute(),通过submit()方法提交的任务可以返回任务执行的结果,通过execute()方法提交的任务不能获取任务执行的结果。
submit()方法的实现有以下三种:
public Future<?> submit(Runnable task);
public <T> Future<T> submit(Runnable task, T result);
public <T> Future<T> submit(Callable<T> task);
下面以第一个方法为例简单看一下submit()方法的实现:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit()方法是在ThreadPoolExecutor的父类AbstractExecutorService类实现的,最终还是调用的ThreadPoolExecutor类的execute()方法,下面着重看一下execute()方法的实现。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取线程池控制状态
int c = ctl.get();
// (1)
//worker数量小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
//创建worker,addWorker方法boolean参数用来判断是否创建核心线程
if (addWorker(command, true))
//成功则返回
return;
//失败则再次获取线程池控制状态
c = ctl.get();
}
//(2)
//线程池处于RUNNING状态,将任务加入workQueue任务缓存队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查,获取线程池控制状态,防止在任务入队的过程中线程池关闭了或者线程池中没有线程了
int recheck = ctl.get();
//线程池不处于RUNNING状态,且将任务从workQueue移除成功
if (! isRunning(recheck) && remove(command))
//采取任务拒绝策略
reject(command);
//worker数量等于0
else if (workerCountOf(recheck) == 0)
//创建worker
addWorker(null, false);
}
//(3)
else if (!addWorker(command, false)) //创建worker
reject(command); //如果创建worker失败,采取任务拒绝策略
}
execute()方法的执行流程可以总结如下:
- 若线程池工作线程数量小于corePoolSize,则创建新线程来执行任务
- 若工作线程数量大于或等于corePoolSize,则将任务加入BlockingQueue
- 若无法将任务加入BlockingQueue(BlockingQueue已满),且工作线程数量小于maximumPoolSize,则创建新的线程来执行任务
- 若工作线程数量达到maximumPoolSize,则创建线程失败,采取任务拒绝策略
可以结合下面的两张图来理解线程池提交任务的执行流程。
2.创建线程
从execute()方法的实现可以看出,addWorker()方法主要负责创建新的线程并执行任务,代码实现如下:
//addWorker有两个参数:Runnable类型的firstTask,用于指定新增的线程执行的第一个任务;boolean类型的core,表示是否创建核心线程
//该方法的返回值代表是否成功新增一个线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// (1)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//线程数超标,不能再创建线程,直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS操作递增workCount
//如果成功,那么创建线程前的所有条件校验都满足了,准备创建线程执行任务,退出retry循环
//如果失败,说明有其他线程也在尝试往线程池中创建线程(往线程池提交任务可以是并发的),则继续往下执行
if (compareAndIncrementWorkerCount(c))
break retry;
//重新获取线程池控制状态
c = ctl.get();
// 如果线程池的状态发生了变更,如有其他线程关闭了这个线程池,那么需要回到外层的for循环
if (runStateOf(c) != rs)
continue retry;
//如果只是CAS操作失败的话,进入内层的for循环就可以了
}
}
//到这里,创建线程前的所有条件校验都满足了,可以开始创建线程来执行任务
//worker是否已经启动
boolean workerStarted = false;
//是否已将这个worker添加到workers这个HashSet中
boolean workerAdded = false;
Worker w = null;
try {
//创建一个worker,从这里可以看出对线程的包装
w = new Worker(firstTask);
//取出worker中的线程对象,Worker的构造方***调用ThreadFactory来创建一个新的线程
final Thread t = w.thread;
if (t != null) {
//获取全局锁, 并发的访问线程池workers对象必须加锁,持有锁的期间线程池也不会被关闭
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//重新获取线程池的运行状态
int rs = runStateOf(ctl.get());
//小于SHUTTDOWN即RUNNING
//等于SHUTDOWN并且firstTask为null,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//worker里面的thread不能是已启动的
if (t.isAlive())
throw new IllegalThreadStateException();
//将新创建的线程加入到线程池中
workers.add(w);
int s = workers.size();
// 更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//线程添加线程池成功,则启动新创建的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//若线程启动失败,做一些清理工作,例如从workers中移除新添加的worker并递减wokerCount
if (! workerStarted)
addWorkerFailed(w);
}
//返回线程是否启动成功
return workerStarted;
}
因为代码(1)处的逻辑不利于理解,我们通过(1)的等价实现来理解:
if (rs>=SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
//等价实现
rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
其含义为,满足下列条件之一则直接返回false,线程创建失败:
- rs > SHUTDOWN,也就是STOP,TIDYING或TERMINATED,此时不再接受新的任务,且中断正在执行的任务
- rs = SHUTDOWN且firstTask != null,此时不再接受任务,但是仍会处理任务缓存队列中的任务
- rs = SHUTDOWN,队列为空
多说一句,若线程池处于 SHUTDOWN, firstTask 为 null,且 workQueue 非空,那么还得创建线程继续处理任务缓存队列中的任务。
总结一下,addWorker()方法完成了如下几件任务:
- 原子性的增加workerCount
- 将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中
- 启动worker对应的线程
- 若线程启动失败,回滚worker的创建动作,即从workers中移除新添加的worker,并原子性的减少workerCount
限于平台篇幅原因,只能分成两篇发,感兴趣的朋友可以通过查看博主主页来完整阅读