花5分钟把线程池源码撸完,面试还怕被问线程池?
前言
大家好,我是小海,我们知道不论是面试还是工作当中,都会接触到线程池,线程池更是面试官爱问的知识点,今天我带着大家来看一看线程池的源码,虽然看源码是一件十分枯燥无味的事情,但是你不卷,总有人卷,谁也不甘心被优化!!!跟着我,卷起来!!!
继承体系
线程池的继承体系如下图所示
线程池状态
线程池状态转换如下图所示
-
首先创建出线程池,并且没有调用
shutdown
方法之前,线程池就处于正常的运行状态,即RUNNING
。处于RUNNING
状态的线程池,你提交的任务(调用submit
或者execute
方法)才会被线程池执行,处于RUNNING
状态的线程池能接受新提交的任务,并且也能处理阻塞队列中的任务。 -
处于
RUNNING
状态的线程池调用shutdown
方法之后,线程池的状态就会变成SHUTDOWN
,此时你再提交新的任务(调用submit
或者execute
方法)就会被线程池拒绝掉。但是SHUTDOWN
状态的线程池有一个特殊的点:它不是会将所有的线程立马退出,而是将之前任务队列中的任务都执行完之后线程才会依次退出。 -
处于
STOP
状态的线程池不能接受新提交的任务,也不能处理阻塞队列中的任务,会中断正在处理任务的线程。当线程池处于RUNNING
或者SHUTDOWN
状态时,调用shutdownNow
方法就会线程池就会进入该状态。 -
当所有的任务都终止了,线程池中的线程数
workCount
为0
时,线程池就会进入TIDYING
状态 -
最后一个退出的线程(即执行完任务队列中的最后一个任务后),最后一个退出的线程会把线程池的状态变成
TIDYING
,然后调用terminated
方法,线程池就会变成TERMINATED
状态了。
拒绝策略
四种拒绝策略如下:
AbortPolicy
:抛出一个异常,默认的(比较常用)CallerRunsPolicy
:交给线程池调用所在的线程进行处理DiscardPolicy
:直接丢弃任务DiscardOldestPolicy
:丢弃队列里最老的任务,将当前这个任务继续提交给线程池
属性
//高3位:表示当前线程池运行状态 除去高3位之后的低位(29位):表示当前线程池中所拥有的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//表示在ctl中,低COUNT_BITS位 是用于存放当前线程数量的位。COUNT_BITS的值为29
private static final int COUNT_BITS = Integer.SIZE - 3;
//低COUNT_BITS位 所能表达的最大数值。 000 11111111111111111111 => 5亿多。
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//-1就是32个1来表示的,左移29位,后面补零,即只剩下三个1和29个0
//111 000000000000000000 转换成整数,其实是一个负数
private static final int RUNNING = -1 << COUNT_BITS;
//000 000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001 000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//010 000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//011 000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
//任务队列,当线程池中的线程达到核心线程数量时,再提交任务 就会直接提交到 workQueue
//workQueue instanceOf ArrayBrokingQueue LinkedBrokingQueue 同步队列
private final BlockingQueue<Runnable> workQueue;
//线程池全局锁,增加worker(线程) 减少 worker(线程) 时需要持有mainLock , 修改线程池运行状态时,也需要。
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
//线程池中真正存放 worker->thread 的地方。
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
*/
//当外部线程调用 awaitTermination() 方法时,外部线程会等待当前线程池状态为 Termination 为止。
//等待是如何实现的? 就是将外部线程 封装成 waitNode 放入到 Condition 队列中了, waitNode.Thread 就是外部线程,会被park掉(处于WAITING状态)。
//当线程池 状态 变为 Termination时,会去唤醒这些线程。通过 termination.signalAll() ,唤醒之后这些线程会进入到 阻塞队列,然后头结点会去抢占mainLock。
//抢占到的线程,会继续执行awaitTermination() 后面程序。这些线程最后,都会正常执行。
//简单理解:termination.await() 会将线程阻塞在这。
// termination.signalAll() 会将阻塞在这的线程依次唤醒
private final Condition termination = mainLock.newCondition();
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
//记录线程池生命周期内 线程数最大值
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
//记录线程池所完成任务总数 ,当worker(线程)退出时会将 worker完成的任务累积到completedTaskCount
private long completedTaskCount;
//创建线程时会使用 线程工厂,当我们使用 Executors.newFix... newCache... 创建线程池时,使用的是 DefaultThreadFactory
//一般不建议使用Default线程池,推荐自己实现ThreadFactory
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
*/
//拒绝策略,juc包提供了4中方式,默认采用 Abort..抛出异常的方式。
private volatile RejectedExecutionHandler handler;
//空闲线程存活时间,当allowCoreThreadTimeOut == false 时,会维护核心线程数量内的线程存活,超出核心线程数量的部分会被回收。
//allowCoreThreadTimeOut == true 核心数量内的线程 空闲时 也会被回收。
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
//控制核心线程数量内的线程 是否可以被回收。true 可以,false不可以。
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
//核心线程数量限制。
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
//线程池最大线程数量限制。
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
*/
//缺省拒绝策略,采用的是AbortPolicy 抛出异常的方式。
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
复制代码
-
ctl
:高3位
:表示当前线程池运行状态(runState
),除去高3位之后的低位(29位
):表示当前线程池中所拥有的线程数量(workerCount
)。 -
COUNT_BITS
:值为29
(Integer.SIZE - 3
=32 - 3 = 29
),表示在ctl
中,低COUNT_BITS位
是用于存放当前线程数量的位。 -
CAPACITY
:低COUNT_BITS位
所能表达的最大数值。000 11111111111111111111
=> 5亿多。 -
RUNNING
:表示线程池处于正常运行状态,-1
就是32
个1
来表示的,左移29
位,后面补零,即-1 << COUNT_BITS
只剩下三个1
和29
个0
,即111 000000000000000000
,转换成整数,其实是一个负数。 -
SHUTDOWN
:表示线程池处于SHUTDOWN
状态,同理0 << COUNT_BITS
,即000000000000000000....
,即就是0
-
STOP
:表示线程池处于STOP
状态,同理1 << COUNT_BITS
,即001 000000000000000000....
. -
TIDYING
:表示线程池处于TIDYING
状态,同理2 << COUNT_BITS
,即010 000000000000000000....
. -
TERMINATED
:表示线程池处于TERMINATED
状态,同理3 << COUNT_BITS
,即011000000000000000000....
. -
workQueue
:任务队列,当线程池中的线程达到核心线程数量时,再提交任务 就会直接提交到workQueue
,任务队列的实现有ArrayBrokingQueue
、LinkedBrokingQueue
等等。 -
mainLock
:线程池全局锁,增加worker
(线程) 减少worker
(线程) 时需要持有mainLock
, 修改线程池运行状态时,也需要。 -
workers
:线程池中会把线程都封装成worker
,workers
真正存放所有的worker
,即存放所有的线程。 -
largestPoolSize
:记录线程池生命周期内 线程数最大值。 -
completedTaskCount
:记录线程池所完成任务总数 ,当worker(线程)
退出时会将worker
完成的任务累积到completedTaskCount
-
threadFactory
:创建线程时会使用 线程工厂,一般不建议使用默认的,推荐自己实现ThreadFactory
-
handler
:拒绝策略,juc包提供了4中方式。AbortPolicy
:抛出一个异常,默认的(比较常用)CallerRunsPolicy
:交给线程池调用所在的线程进行处理DiscardPolicy
:直接丢弃任务DiscardOldestPolicy
:丢弃队列里最老的任务,将当前这个任务继续提交给线程池
-
keepAliveTime
:空闲线程存活时间,当allowCoreThreadTimeOut == false
时,会维护核心线程数量内的线程存活,超出核心线程数量的部分会被回收;allowCoreThreadTimeOut == true
核心数量内的线程 空闲时 也会被回收。 -
allowCoreThreadTimeOut
:控制核心线程数量内的线程 是否可以被回收。true 可以,false不可以。 -
corePoolSize
:核心线程数量限制 -
maximumPoolSize
:线程池最大线程数量限制。 -
defaultHandler
:默认的拒绝策略,采用的是AbortPolicy
抛出异常的方式。
小方法
常用的小方法源码如下:
// Packing and unpacking ctl
//获取当前线程池运行状态,即获取ctl的高三位的值+29个0
// 将ctl的值 & ~CAPACITY CAPACITY为高三位为0,低29位为1,即000 11111111111111111111
//~000 11111111111111111111 => 111 000000000000000000000
//c == ctl = 111 000000000000000000111
//111 000000000000000000111
//111 000000000000000000000
//111 000000000000000000000
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取当前线程池线程数量
//c == ctl = 111 000000000000000000111
//111 000000000000000000111
//000 111111111111111111111
//000 000000000000000000111 => 7
private static int workerCountOf(int c) { return c & CAPACITY; }
//用在重置当前线程池ctl值时 会用到
//rs 表示线程池状态 wc 表示当前线程池中worker(线程)数量
//rs:111 000000000000000000
// |
//wc:000 000000000000000111
//=
//111 000000000000000111
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
//比较当前线程池ctl所表示的状态,是否小于某个状态s
//c = 111 000000000000000111 < 000 000000000000000000 == true
//所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//比较当前线程池ctl所表示的状态,是否大于等于某个状态s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
//小于SHUTDOWN 的一定是RUNNING。 SHUTDOWN == 0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//使用CAS方式 让ctl值+1 ,成功返回true, 失败返回false
//将ctl的低29位即线程数量+1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//使用CAS方式 让ctl值-1 ,成功返回true, 失败返回false
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
//将ctl值减一,这个方法一定成功
private void decrementWorkerCount() {
//这里会一直重试,直到成功为止。
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
复制代码
runStateOf
:获取当前线程池运行状态,即获取ctl
的高三位的值 +29
个0
。workerCountOf
:获取当前线程池线程数量,即ctl
的低29
位的值。ctlOf
:用于重置当前线程池ctl
值时会用到,rs
表示线程池状态,wc
表示当前线程池中worker
(线程)数量。runStateLessThan
:比较当前线程池ctl
所表示的状态,是否小于某个状态s
,我们知道RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
。runStateAtLeast
:比较当前线程池ctl
所表示的状态,是否大于等于某个状态s
。isRunning
:小于SHUTDOWN
的一定是RUNNING
。SHUTDOWN == 0
。compareAndIncrementWorkerCount
:使用CAS
方式让ctl
中线程数量加一,成功返回true
, 失败返回false
compareAndDecrementWorkerCount
:使用CAS
方式让ctl
中线程数量减一,成功返回true
, 失败返回false
decrementWorkerCount
:使用CAS
方式让ctl
中线程数量减一,这个方法一定成功。
构造方法
构造方法源码如下:
public ThreadPoolExecutor(int corePoolSize,//核心线程数限制
int maximumPoolSize,//最大线程限制
long keepAliveTime,//空闲线程存活时间
TimeUnit unit,//时间单位 seconds nano..
BlockingQueue<Runnable> workQueue,//任务队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler/*拒绝策略*/) {
//判断参数是否越界
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
//工作队列 和 线程工厂 和 拒绝策略 都不能为空。
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
构造方法中的7
大参数如下:
corePoolSize
:线程池中的核心线程数。- 当线程池中的线程数量小于
corePoolSize
核心线程数时,对于新提交的任务,即使线程池中的其他线程是空闲的,但是线程池也会新创建线程来处理这个新提交的任务(底层调用addWorker
方法新增worker
(线程),并且把新提交的任务作为新创建的worker
(线程)的第一任务,即firstTask
)。 - 当线程池中的线程数量大于等于
corePoolSize
核心线程数并且小于maximumPoolSize
最大线程数时,对于新提交的任务,如果任务队列workQueue
还未满时,则将新提交的任务加入到任务队列workQueue
中。如果任务队列workQueue
已满时,则会创建新线程(底层调用addWorker
方法新增worker
(线程),并且把新提交的任务作为新创建的worker
(线程)的第一任务,即firstTask
)来处理新提交的任务。 - 当线程池中的线程数量大于等于
maximumPoolSize
最大线程数,如果任务队列workQueue
还未满时,则将新提交的任务加入到任务队列workQueue
中。如果任务队列workQueue
已满时,则通过handler
所指定的拒绝策略来处理新提交的任务。
- 当线程池中的线程数量小于
maximumPoolSize
:线程池中的最大线程数。workQueue
:任务队列(阻塞队列)keepAliveTime
:当线程池中的线程数量大于corePoolSize
核心线程数时,并且这些核心线程数外的其他线程处于空闲状态(没有新的任务提交)时,等待keepAliveTime
时间后就会将这部分线程回收。unit
:keepAliveTime
参数的时间单位,例如TimeUnit.SECONDS
表示秒。handler
:拒绝策略。threadFactory
:线程工厂,用于真正创建线程,默认使用DefaultThreadFactory
中的newThread
方法来创建线程池中的线程,该方法中会设置先创建出来的线程为非守护线程并且优先级设置为5,即Thread.NORM_PRIORITY
,最高的线程优先级为10,即Thread.MAX_PRIORITY
。DefaultThreadFactory
类的结构如下:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
/**
* 使用该方法创建线程
*/
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
//线程名
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
//设置为非守护线程
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
//设置线程优先级为5,最高优先级为10
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
复制代码
DefaultThreadFactory
类的newThread
方***在下面的内部类Worker
的构造方法中调用。而内部类Worker
的构造方***在调用addWorker
方法创建worker
(线程)的时候调用。
内部类
比较重要的内部类Worker
,该类用于用于创建线程时封装线程,该类的结构和重要的方法如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
//Worker采用了AQS的独占模式
//独占模式:两个重要属性 state 和 ExclusiveOwnerThread
//state:0时表示未被占用 > 0时表示被占用 < 0 时 表示初始状态,这种情况下不能被抢锁。
//ExclusiveOwnerThread:表示独占锁的线程。
/** Thread this worker is running in. Null if factory fails. */
//worker内部封装的工作线程
final Thread thread;
/** Initial task to run. Possibly null. */
//假设firstTask不为空,那么当worker启动后(内部的线程启动)会优先执行firstTask,当执行完firstTask后,会到queue中去获取下一个任务。
Runnable firstTask;
/** Per-thread task counter */
//记录当前worker所完成任务数量。
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
//firstTask可以为null。为null 启动后会到queue中获取。
Worker(Runnable firstTask) {
//设置AQS独占模式为初始化中状态,这个时候 不能被抢占锁。
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//使用线程工厂创建了一个线程,并且将当前worker 指定为 Runnable,也就是说当thread启动的时候,会以worker.run()为入口。
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
//当worker启动时,会执行run()
public void run() {
//ThreadPoolExecutor->runWorker() 这个是核心方法,等后面分析worker启动后逻辑时会以这里切入。
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
//判断当前worker的独占锁是否被独占。
//0 表示未被占用
//1 表示已占用
protected boolean isHeldExclusively() {
return getState() != 0;
}
//尝试去占用worker的独占锁
//返回值 表示是否抢占成功
protected boolean tryAcquire(int unused) {
//使用CAS修改 AQS中的 state ,期望值为0(0时表示未被占用),修改成功表示当前线程抢占成功
//那么则设置 ExclusiveOwnerThread 为当前线程。
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//外部不会直接调用这个方法 这个方法是AQS 内调用的,外部调用unlock时 ,unlock->AQS.release() ->tryRelease()
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//加锁,加锁失败时,会阻塞当前线程,直到获取到锁位置。
public void lock() { acquire(1); }
//尝试去加锁,如果当前锁是未被持有状态,那么加锁成功后 会返回true,否则不会阻塞当前线程,直接返回false.
public boolean tryLock() { return tryAcquire(1); }
//一般情况下,咱们调用unlock 要保证 当前线程是持有锁的。
//特殊情况,当worker的state == -1 时,调用unlock 表示初始化state 设置state == 0
//启动worker之前会先调用unlock()这个方法。会强制刷新ExclusiveOwnerThread == null State==0
public void unlock() { release(1); }
//就是返回当前worker的lock是否被占用。
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
复制代码
-
属性如下:
thread
:worker
内部封装的工作线程。firstTask
:创建Worker
调用构造方法时参数firstTask
不为空,那么当worker
启动后(内部的线程启动)会优先执行firstTask
,当执行完firstTask
后,会调用getTask
方法到任务队列workQueue
中去获取下一个任务来执行;如果firstTask
为空,则创建出来的worker
直接调用getTask
方法到任务队列workQueue
中去获取任务来执行。completedTasks
:记录当前worker
所完成任务数量。
-
构造方法中首先调用
setState(-1)
设置AQS
中的state
字段的值为-1
(因为Worker
类继承了AbstractQueuedSynchronizer
),接着执行getThreadFactory().newThread(this)
来真正创建线程(如果在创建线程池的时候在构造方法中指定了ThreadFactory
的话就会调用对应的ThreadFactory
的newThread
方法,如果没有指定就用默认的线程工厂DefaultThreadFactory
,注意这里newThread(this)
传入的this
为worker
实例,因为worker
也实现了Runnable
,所以当真正worker
中的线程调用start
方法启动的时候会执行Worker
中的run
方法)。 -
run
方法为当worker
中的线程启动的时候,即调用start
方法的时候就会调用run
方法,而run
方法里面是通过调用核心方法runWorker
。
execute方法
我们在线程池中提交任务时,就会调用execute
方法,该方法的源码如下:
//command 可以是普通的Runnable 实现类,也可以是 FutureTask
public void execute(Runnable command) {
//非空判断..
if (command == null)
throw new NullPointerException();
//获取ctl最新值赋值给c,ctl :高3位 表示线程池状态,低位表示当前线程池线程数量。
int c = ctl.get();
//workerCountOf(c) 获取出当前线程数量
//条件成立:表示当前线程数量小于核心线程数,此次提交任务,直接创建一个新的worker,对应线程池中多了一个新的线程。
if (workerCountOf(c) < corePoolSize) {
//addWorker 即为创建线程的过程,会创建worker对象,并且将command作为firstTask
//core == true 表示采用核心线程数量限制 false表示采用 maximumPoolSize
if (addWorker(command, true))
//创建成功后,直接返回。addWorker方法里面会启动新创建的worker,将firstTask执行。
return;
//执行到这条语句,说明addWorker一定是失败了...
//有几种可能呢??
//1.存在并发现象,execute方法是可能有多个线程同时调用的,当workerCountOf(c) < corePoolSize成立后,
//其它线程可能也成立了,并且向线程池中创建了worker。这个时候线程池中的核心线程数已经达到,所以...
//2.当前线程池状态发生改变了。 RUNNING SHUTDOWN STOP TIDYING TERMINATION
//当线程池状态是非RUNNING状态时,addWorker(firstTask!=null, true|false) 一定会失败。
//SHUTDOWN 状态下,也有可能创建成功。前提 firstTask == null 而且当前 queue 不为空。特殊情况。
c = ctl.get();
}
//执行到这里有几种情况?
//1.当前线程数量已经达到corePoolSize
//2.addWorker失败..
//条件成立:说明当前线程池处于running状态,则尝试将 task 放入到workQueue中。
if (isRunning(c) && workQueue.offer(command)) {
//执行到这里,说明offer提交任务成功了..
//再次获取ctl保存到recheck。
int recheck = ctl.get();
//条件一:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改 比如:shutdown() shutdownNow()
//这种情况 需要把刚刚提交的任务删除掉。
//条件二:remove(command) 有可能成功,也有可能失败
//成功:提交之后,线程池中的线程还未消费(处理)
//失败:提交之后,在shutdown() shutdownNow()之前,就被线程池中的线程 给处理。
if (! isRunning(recheck) && remove(command))
//提交之后线程池状态为 非running 且 任务出队成功,走个拒绝策略。
reject(command);
//有几种情况会到这里?
//1.当前线程池是running状态(这个概率最大)
//2.线程池状态是非running状态 但是remove提交的任务失败.
//担心 当前线程池是running状态,但是线程池中的存活线程数量是0,这个时候,如果是0的话,会很尴尬,任务没线程去跑了,
//这里其实是一个担保机制,保证线程池在running状态下,最起码得有一个线程在工作。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//执行到这里,有几种情况?
//1.offer方法失败
//2.当前线程池是非running状态
//1.offer失败,需要做什么? 说明当前queue 满了!这个时候 如果当前线程数量尚未达到maximumPoolSize的话,会创建新的worker直接执行command
//假设当前线程数量达到maximumPoolSize的话,这里也会失败,也走拒绝策略。
//2.线程池状态为非running状态,这个时候因为 command != null addWorker 一定是返回false。
else if (!addWorker(command, false))
reject(command);
}
复制代码
execute
方法的执行流程如下:
-
当线程池中的线程数量小于
corePoolSize
核心线程数时,则调用addWorker(command, true)
新创建一个worker
(线程)来执行新提交的command
(任务),core == true
表示采用核心线程数量限制false
表示采用maximumPoolSize
。 -
当线程池中的线程数量大于等于
corePoolSize
核心线程数时,且让任务队列还没满时,则将调用workQueue.offer(command)
将任务command
添加到任务队列中。- 将任务添加到队列之后,接着判断当前线程池的状态,如果不是
RUNNING
的话(! isRunning(recheck)
成立:说明你提交到队列之后,线程池状态被外部线程给修改 比如:shutdown() ,shutdownNow()
),则需要调用remove(command)
将任务从任务队列中删除,remove(command)
有可能成功,也有可能失败,成功:提交之后,线程池中的线程还未消费(处理);失败:提交之后,在shutdown()
shutdownNow()
之前,就被线程池中的线程 给处理。 - 如果调用
remove(command)
将任务从任务队列中删除成功的话,接着就会调用reject(command)
走拒绝策略了。 - 如果当前线程池的状态是
RUNNING
状态或者上面步骤将任务从任务队列中删除失败的话,接着判断当前线程池中的线程数是否等于0
,如果等于0
,则需要调用addWorker(null, false)
新创建一个worker
,这里其实是一个担保机制,保证线程池在RUNNING
状态下,最起码得有一个线程在工作。
- 将任务添加到队列之后,接着判断当前线程池的状态,如果不是
-
如果线程池中的线程数大于等于
corePoolSize
并且小于最大线程数maximumPoolSize
,且线程池内的阻塞队列已满,则调用addWorker(command, false)
创建并启动一个线程来执行新提交的任务。 -
如果上一个步骤在调用
addWorker(command, false)
发现 线程池中的线程数大于等于maximumPoolSize
,并且线程池内的阻塞队列已满,则addWorker(command, false)
会返回false
接着就会调用reject(command)
走拒绝策略了。
execute
方法的执行流程图如下:
addWorker方法
addWorker
方法主要的作用是创建一个worker
(线程)用于处理新提交的任务firstTask
,firstTask
用于指定先创建的worker
(线程)要执行的第一个任务,如果firstTask
为空,则新创建的worker
(线程会从任务队列中获取任务来执行。core
参数为true
表示在新增worker
(线程)时会判断当前活动线程数是否小于corePoolSize
,false
表示新增worker
(线程)前需要判断当前活动线程数是否小于maximumPoolSize
,该方法的源码如下:
//firstTask 可以为null,表示启动worker之后,worker自动到queue中获取任务.. 如果不是null,则worker优先执行firstTask
//core 采用的线程数限制 如果为true 采用 核心线程数限制 false采用 maximumPoolSize线程数限制.
//返回值总结:
//true 表示创建worker成功,且线程启动
//false 表示创建失败。
//1.线程池状态rs > SHUTDOWN (STOP/TIDYING/TERMINATION)
//2.rs == SHUTDOWN 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN且队列未空,但是firstTask不为null
//3.当前线程池已经达到指定指标(coprePoolSize 或者 maximumPoolSIze)
//4.threadFactory 创建的线程是null
private boolean addWorker(Runnable firstTask, boolean core) {
//自旋 判断当前线程池状态是否允许创建线程的事情。
retry:
for (;;) {
//获取当前ctl值保存到c
int c = ctl.get();
//获取当前线程池运行状态 保存到rs中
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是running状态
//条件二:前置条件,当前的线程池状态不是running状态 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
//rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
//表示:当前线程池状态是SHUTDOWN状态 & 提交的任务是空,addWorker这个方法可能不是execute调用的。 & 当前任务队列不是空
//排除掉这种情况,当前线程池是SHUTDOWN状态,但是队列里面还有任务尚未处理完,这个时候是允许添加worker,但是不允许再次提交task。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//什么情况下回返回false?
//线程池状态 rs > SHUTDOWN
//rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
return false;
//上面这些代码,就是判断 当前线程池状态 是否允许添加线程。
//内部自旋 获取创建线程令牌的过程。
for (;;) {
//获取当前线程池中线程数量 保存到wc中
int wc = workerCountOf(c);
//条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多大的数字
//条件二:wc >= (core ? corePoolSize : maximumPoolSize)
//core == true ,判断当前线程数量是否>=corePoolSize,会拿核心线程数量做限制。
//core == false,判断当前线程数量是否>=maximumPoolSize,会拿最大线程数量做限制。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//执行到这里,说明当前无法添加线程了,已经达到指定限制了
return false;
//条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌。
//条件失败:说明可能有其它线程,修改过ctl这个值了。
//可能发生过什么事?
//1.其它线程execute() 申请过令牌了,在这之前。导致CAS失败(即自己拿到的c已经是旧值了)
//2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生变化了,咱们知道 ctl 高3位表示状态
//状态改变后,cas也会失败。
if (compareAndIncrementWorkerCount(c))
//进入到这里面,一定是cas成功啦!申请到令牌了
//直接跳出了 retry 外部这个for自旋。
break retry;
//CAS失败,没有成功的申请到令牌
//获取最新的ctl值
c = ctl.get(); // Re-read ctl
//判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown. shutdownNow 会导致状态变化。
if (runStateOf(c) != rs)
//状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程。
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//表示创建的worker是否已经启动,false未启动 true启动
boolean workerStarted = false;
//表示创建的worker是否添加到池子中了 默认false 未添加 true是添加。
boolean workerAdded = false;
//w表示后面创建worker的一个引用。
Worker w = null;
try {
//创建Worker,执行完后,线程应该是已经创建好了。
w = new Worker(firstTask);
//将新创建的worker节点的线程 赋值给 t
final Thread t = w.thread;
//为什么要做 t != null 这个判断?
//为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现。
//万一哪个 小哥哥 脑子一热,有bug,创建出来的线程 是null、、
//Doug lea考虑的比较全面。肯定会防止他自己的程序报空指针,所以这里一定要做!
if (t != null) {
//将全局锁的引用保存到mainLock
final ReentrantLock mainLock = this.mainLock;
//持有全局锁,可能会阻塞,直到获取成功为止,同一时刻 操纵 线程池内部相关的操作,都必须持锁。
mainLock.lock();
//从这里加锁之后,其它线程 是无法修改当前线程池状态的。
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取最新线程池运行状态保存到rs中
int rs = runStateOf(ctl.get());
//条件一:rs < SHUTDOWN 成立:最正常状态,当前线程池为RUNNING状态.
//条件二:前置条件:当前线程池状态不是RUNNING状态。
//(rs == SHUTDOWN && firstTask == null) 当前状态为SHUTDOWN状态且firstTask为空。其实判断的就是SHUTDOWN状态下的特殊情况,
//只不过这里不再判断队列是否为空了
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//t.isAlive() 当线程start后,线程isAlive会返回true。
//防止脑子发热的程序员,ThreadFactory创建线程返回给外部之前,将线程start了。。
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将咱们创建的worker添加到线程池中。
workers.add(w);
//获取最新当前线程池线程数量
int s = workers.size();
//条件成立:说明当前线程数量是一个新高。更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
//表示线程已经追加进线程池中了。
workerAdded = true;
}
} finally {
//释放线程池全局锁。
mainLock.unlock();
}
//条件成立:说明 添加worker成功
//条件失败:说明线程池在lock之前,线程池状态发生了变化,导致添加失败。
if (workerAdded) {
//成功后,则将创建的worker启动,线程启动。
t.start();
//启动标记设置为true
workerStarted = true;
}
}
} finally {
//条件成立:! workerStarted 说明启动失败,需要做清理工作。
if (! workerStarted)
//失败时做什么清理工作?
//1.释放令牌
//2.将当前worker清理出workers集合
addWorkerFailed(w);
}
//返回新创建的线程是否启动。
return workerStarted;
}
复制代码
addWorker
方法的执行流程如下:
- 首先根据线程池状态来判断是否需要创建
worker
(线程),当线程池的状态大于SHUTDOWN
,即线程池处于STOP,TIDYING,TERMINATED
时(处于这些状态线程池会中断正在处理任务的线程并且不在接收新的任务和处理任务队列中的任务,会将线程池中的线程依次退出)或者当线程池处于SHUTDOWN
,但是任务队列为空(因为线程池处于SHUTDOWN
状态时,只是不再接收新的提交任务,但是会继续处理完任务队列中的任务,如果任务队列为空的话,也就没有必要增加worker
线程了)或者当线程池处于SHUTDOWN
且firstTask
不为空,判断的代码如下:
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//什么情况下回返回false?
//线程池状态 rs > SHUTDOWN
//rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
return false;
复制代码
- 接着,如果满足如下条件也不会创建
woker
:①当前线程池中的线程数大于CAPACITY
(CAPACITY
等于5亿多,一般这个肯定不会大于);②根据传入的core
参数(是否根据核心线程数来判断),判断是否大于核心线程数corePoolSize
或者最大线程数maximumPoolSize
,代码如下:
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//执行到这里,说明当前无法添加线程了,已经达到指定限制了
return false;
复制代码
- 然后接着通过
cas
修改线程池中的线程数的值加一,即ctl
的低29
位加一,接着通过w = new Worker(firstTask)
(底层通过上面的ThreadFactory
的newThread
方法)创建worker
(线程)。 - 最后获取到全局锁,将新创建的
worker
添加到workers
中,释放全部锁,将新创建的worker
中封装的线程调用start
方法启动。线程启动后,最终会执行worker
实例中的run
方法。
runWorker方法
通过上面我们知道,当worker
中的线程启动后,会执行worker
实例中的run
方法,而run
方法中真正调用的是runWorker
方法,该方法源码如下:
//w 就是启动worker
final void runWorker(Worker w) {
//wt == w.thread
Thread wt = Thread.currentThread();
//将初始执行task赋值给task
Runnable task = w.firstTask;
//清空当前w.firstTask引用
w.firstTask = null;
//这里为什么先调用unlock? 就是为了初始化worker state == 0(一开始为-1) 和 exclusiveOwnerThread ==null
w.unlock(); // allow interrupts
//是否是突然退出,true->发生异常了,当前线程是突然退出,回头需要做一些处理
//false->正常退出。
boolean completedAbruptly = true;
try {
//条件一:task != null 指的就是firstTask是不是null,如果不是null,直接执行循环体里面。
//条件二:(task = getTask()) != null 条件成立:说明当前线程在queue中获取任务成功,getTask这个方法是一个会阻塞线程的方法
//getTask如果返回null,当前线程需要执行结束逻辑。
while (task != null || (task = getTask()) != null) {
//worker设置独占锁 为当前线程
//为什么要设置独占锁呢?shutdown时会判断当前worker状态,根据独占锁是否空闲来判断当前worker是否正在工作。
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//条件一:runStateAtLeast(ctl.get(), STOP) 说明线程池目前处于STOP/TIDYING/TERMINATION 此时线程一定要给它一个中断信号
//条件一成立:runStateAtLeast(ctl.get(), STOP)&& !wt.isInterrupted()
//上面如果成立:说明当前线程池状态是>=STOP 且 当前线程是未设置中断状态的,此时需要进入到if里面,给当前线程一个中断。
//假设:runStateAtLeast(ctl.get(), STOP) == false
// (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) 在干吗呢?
// Thread.interrupted() 获取当前中断状态,且设置中断位为false。连续调用两次,这个interrupted()方法 第二次一定是返回false.
// runStateAtLeast(ctl.get(), STOP) 大概率这里还是false.
// 其实它在强制刷新当前线程的中断标记位 false,因为有可能上一次执行task时,业务代码里面将当前线程的中断标记位 设置为了 true,且没有处理
// 这里一定要强制刷新一下。不会再影响到后面的task了。
//假设:Thread.interrupted() == true 且 runStateAtLeast(ctl.get(), STOP)) == true
//这种情况有发生几率么?
//有可能,因为外部线程在 第一次 (runStateAtLeast(ctl.get(), STOP) == false 后,有机会调用shutdown 、shutdownNow方法,将线程池状态修改
//这个时候,也会将当前线程的中断标记位 再次设置回 中断状态。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//钩子方法,留给子类实现的
beforeExecute(wt, task);
//表示异常情况,如果thrown不为空,表示 task运行过程中 向上层抛出异常了。
Throwable thrown = null;
try {
//task 可能是FutureTask 也可能是 普通的Runnable接口实现类。
//如果前面是通过submit()提交的 runnable/callable 会被封装成 FutureTask。这个不清楚,请看上一期,在b站。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//钩子方法,留给子类实现的
afterExecute(task, thrown);
}
} finally {
//将局部变量task置为Null
task = null;
//更新worker完成任务数量
w.completedTasks++;
//worker处理完一个任务后,会释放掉独占锁
//1.正常情况下,会再次回到getTask()那里获取任务 while(getTask...)
//2.task.run()时内部抛出异常了..
w.unlock();
}
}
//什么情况下,会来到这里?
//getTask()方法返回null时,说明当前线程应该执行退出逻辑了。
completedAbruptly = false;
} finally {
//task.run()内部抛出异常时,直接从 w.unlock() 那里 跳到这一行。
//正常退出 completedAbruptly == false
//异常退出 completedAbruptly == true
processWorkerExit(w, completedAbruptly);
}
}
复制代码
该方法的执行流程如下:
-
首先通过
w.unlock()
将AQS
中的state
的值修改为0
(在创建worker
的时候,通过Worker
构造方法中的setState(-1)
方法将state
的值初始化为-1
)。 -
接着进入
while
循环,如果firstTask
不为空的话,当前worker
先执行firstTask
中的任务,如果firstTask
为空的话,当前worker
就会从任务队列workQueue
中获取任务来执行,直到任务队列中没有任务。 -
执行任务的过程中,如果当前线程池的状态变成
STOP
,则会将会给当前线程一个中断信息,即wt.interrupt()
,真正执行任务是这行代码task.run()
。执行完任务后,会将当前worker
中的已完成任务数completedTasks
加一。 -
最后如果执行任务的时候出现异常,则最终执行
processWorkerExit(w, completedAbruptly)
,下面再分析该方法。
getTask方法
由上面的分析可知getTask
方法是线程用来从阻塞队列workQueue
中获取任务的,源码如下:
//什么情况下会返回null?
//1.rs >= STOP 成立说明:当前的状态最低也是STOP状态,一定要返回null了
//2.前置条件 状态是 SHUTDOWN ,workQueue.isEmpty()
//3.线程池中的线程数量 超过 最大限制时,会有一部分线程返回Null
//4.线程池中的线程数超过corePoolSize时,会有一部分线程 超时后,返回null。
private Runnable getTask() {
//表示当前线程获取任务是否超时 默认false true表示已超时
boolean timedOut = false; // Did the last poll() time out?
//自旋
for (;;) {
//获取最新ctl值保存到c中。
int c = ctl.get();
//获取线程池当前运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//条件一:rs >= SHUTDOWN 条件成立:说明当前线程池是非RUNNING状态,可能是 SHUTDOWN/STOP....
//条件二:(rs >= STOP || workQueue.isEmpty())
//2.1:rs >= STOP 成立说明:当前的状态最低也是STOP状态,一定要返回null了
//2.2:前置条件 状态是 SHUTDOWN ,workQueue.isEmpty()条件成立:说明当前线程池状态为SHUTDOWN状态 且 任务队列已空,此时一定返回null。
//返回null,runWorker方法就会将返回Null的线程执行线程退出线程池的逻辑。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//使用CAS+死循环的方式让 ctl值 -1
decrementWorkerCount();
return null;
}
//执行到这里,有几种情况?
//1.线程池是RUNNING状态
//2.线程池是SHUTDOWN状态 但是队列还未空,此时可以创建线程。
//获取线程池中的线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
//timed == true 表示当前这个线程 获取 task 时 是支持超时机制的,使用queue.poll(xxx,xxx); 当获取task超时的情况下,下一次自旋就可能返回null了。
//timed == false 表示当前这个线程 获取 task 时 是不支持超时机制的,当前线程会使用 queue.take();
//queue.poll和queue.take()都会阻塞当前线程
//情况1:allowCoreThreadTimeOut == true 表示核心线程数量内的线程 也可以被回收。
//所有线程 都是使用queue.poll(xxx,xxx) 超时机制这种方式获取task.
//情况2:allowCoreThreadTimeOut == false 表示当前线程池会维护核心数量内的线程。
//wc > corePoolSize
//条件成立:当前线程池中的线程数量是大于核心线程数的,此时让所有路过这里的线程,都是用poll 支持超时的方式去获取任务,
//这样,就会可能有一部分线程获取不到任务,获取不到任务 返回Null,然后..runWorker会执行线程退出逻辑。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//条件一:(wc > maximumPoolSize || (timed && timedOut))
//1.1:wc > maximumPoolSize 为什么会成立?setMaximumPoolSize()方法,可能外部线程将线程池最大线程数设置为比初始化时的要小
//1.2: (timed && timedOut) 条件成立:前置条件,当前线程使用 poll方式获取task。上一次循环时 使用poll方式获取任务时,超时了
//条件一 为true 表示 线程可以被回收,达到回收标准,当确实需要回收时再回收。
//条件二:(wc > 1 || workQueue.isEmpty())
//2.1: wc > 1 条件成立,说明当前线程池中还有其他线程,当前线程可以直接回收,返回null
//2.2: workQueue.isEmpty() 前置条件 wc == 1, 条件成立:说明当前任务队列 已经空了,最后一个线程,也可以放心的退出。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//使用CAS机制 将 ctl值 -1 ,减1成功的线程,返回null
//CAS成功的,返回Null
//CAS失败? 为什么会CAS失败?
//1.其它线程先你一步退出了
//2.线程池状态发生变化了。
if (compareAndDecrementWorkerCount(c))
return null;
//再次自旋时,timed有可能就是false了,因为当前线程cas失败,很有可能是因为其它线程成功退出导致的,再次咨询时
//检查发现,当前线程 就可能属于 不需要回收范围内了。
continue;
}
try {
//获取任务的逻辑
//从任务队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//条件成立:返回任务
if (r != null)
return r;
//说明当前线程超时了...
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
复制代码
该方法的主要的执行流程如下:
-
首先如果当前线程池的状态为
SHUTDOWN
并且任务队列workQueue
为空的话或者线程池的状态大于等于STOP
状态,则此时会将线程池中线程数量加一,并且返回null
,之后会在runWorker
中通过processWorkerExit
方法处理回收线程逻辑。 -
然后通过
timeed
标识(如果从workQueue
获取任务超过创建线程池时指定的keepAliveTime
的值的话,会返回null
,之后会在runWorker
中通过processWorkerExit
方法处理回收线程逻辑),否则通过workQueue.take()
来获取任务队列中的任务返回。
processWorkerExit方法
由上面我们知道processWorkerExit
方法用于处理回收线程池中的worker
(线程)逻辑,源码如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//条件成立:代表当前w 这个worker是发生异常退出的,task任务执行过程中向上抛出异常了..
//异常退出时,ctl计数,并没有-1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//获取线程池的全局锁引用
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
//将当前worker完成的task数量,汇总到线程池的completedTaskCount
completedTaskCount += w.completedTasks;
//将worker从池子中移除..
workers.remove(w);
} finally {
//释放全局锁
mainLock.unlock();
}
//根据线程池状态进行判断是否结束线程池
tryTerminate();
//获取最新ctl值
int c = ctl.get();
//条件成立:当前线程池状态为 RUNNING 或者 SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
//条件成立:当前线程是正常退出..
if (!completedAbruptly) {
//min表示线程池最低持有的线程数量
//allowCoreThreadTimeOut == true => 说明核心线程数内的线程,也会超时被回收。 min == 0
//allowCoreThreadTimeOut == false => min == corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//线程池状态:RUNNING SHUTDOWN
//条件一:假设min == 0 成立
//条件二:! workQueue.isEmpty() 说明任务队列中还有任务,最起码要留一个线程。
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//条件成立:线程池中还拥有足够的线程。
//考虑一个问题: workerCountOf(c) >= min => (0 >= 0) ?
//有可能!
//什么情况下? 当线程池中的核心线程数是可以被回收的情况下,会出现这种情况,这种情况下,当前线程池中的线程数 会变为0
//下次再提交任务时,会再创建线程。
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//1.当前线程在执行task时 发生异常,这里一定要创建一个新worker顶上去。
//2.!workQueue.isEmpty() 说明任务队列中还有任务,最起码要留一个线程。 当前状态为 RUNNING || SHUTDOWN
//3.当前线程数量 < corePoolSize值,此时会创建线程,维护线程池数量在corePoolSize个。
addWorker(null, false);
}
}
复制代码
该方法的主要执行流程如下:
-
首先获取全局锁,将需要移除的
worker
中已完成的任务数累加到线程池已完成任务数completedTaskCount
中,接着worker
从workers
中移除。 -
接着调用
tryTerminate
方法,根据线程池状态进行判断是否结束线程池。下面再说该方法。 -
最后会加一个担保机制,如果当前线程池小于
STOP
,即处于RUNNING
或者SHUTWODN
时,任务队列中还有任务,此时需要保证线程池中至少一个线程,即执行addWorker(null, false)
该行代码添加worker
。
tryTerminate方法
由上面可知tryTerminate
方法用于线程池状态进行判断是否结束线程池,该方法源码如下:
final void tryTerminate() {
//自旋
for (;;) {
//获取最新ctl值
int c = ctl.get();
//条件一:isRunning(c) 成立,直接返回就行,线程池很正常!
//条件二:runStateAtLeast(c, TIDYING) 说明 已经有其它线程 在执行 TIDYING -> TERMINATED状态了,当前线程直接回去。
//条件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
//SHUTDOWN特殊情况,如果是这种情况,直接回去。得等队列中的任务处理完毕后,再转化状态。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//什么情况会执行到这里?
//1.线程池状态 >= STOP
//2.线程池状态为 SHUTDOWN 且 队列已经空了
//条件成立:当前线程池中的线程数量 > 0
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断一个空闲线程。
//空闲线程,在哪空闲呢? queue.take() | queue.poll()
//1.唤醒后的线程 会在getTask()方法返回null
//2.执行退出逻辑的时候会再次调用tryTerminate() 唤醒下一个空闲线程
//3.因为线程池状态是 (线程池状态 >= STOP || 线程池状态为 SHUTDOWN 且 队列已经空了) 最终调用addWorker时,会失败。
//最终空闲线程都会在这里退出,非空闲线程 当执行完当前task时,也会调用tryTerminate方法,有可能会走到这里。
interruptIdleWorkers(ONLY_ONE);
return;
}
//执行到这里的线程是谁?
//workerCountOf(c) == 0 时,会来到这里。
//最后一个退出的线程。 咱们知道,在 (线程池状态 >= STOP || 线程池状态为 SHUTDOWN 且 队列已经空了)
//线程唤醒后,都会执行退出逻辑,退出过程中 会 先将 workerCount计数 -1 => ctl -1。
//调用tryTerminate 方法之前,已经减过了,所以0时,表示这是最后一个退出的线程了。
final ReentrantLock mainLock = this.mainLock;
//获取线程池全局锁
mainLock.lock();
try {
//设置线程池状态为TIDYING状态。
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//调用钩子方法
terminated();
} finally {
//设置线程池状态为TERMINATED状态。
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用 awaitTermination() 方法的线程。
termination.signalAll();
}
return;
}
} finally {
//释放线程池全局锁。
mainLock.unlock();
}
// else retry on failed CAS
}
}
复制代码
该方法的主要执行流程如下:
-
首先如果满足当前线程池处于
RUNNING
状态,或者大于等于TIDYING
(说明已经有其它线程在执行TIDYING -> TERMINATED
状态了,当前线程直接回去)或者处于SHUTDOWN
并且任务队列workQuue
中还有任务,就直接返回,不执行任何逻辑。 -
如果不满足上面条件的话,说明当前线程池符合需要结束状态,接着如果当前线程池中的线程数不等于
0
,就会调用interruptIdleWorkers(ONLY_ONE)
给所有的线程一个中断信号。 -
当线程池中的线程数量等于
0
时,就会执行ctl.compareAndSet(c, ctlOf(TIDYING, 0))
将线程池的状态改为TIDYING
,线程池的数量改为0
,接着调用钩子方法(空方法)terminated
,最后将线程池的状态修改为TERMINATED
。
shutDown方法
我们知道当线程池处于RUNNING
状态时,调用shutDown
方***将线程池的状态修改为SHUTWODN
状态,该方法源码如下:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//获取线程池全局锁
mainLock.lock();
try {
//不管这行
checkShutdownAccess();
//设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲线程
interruptIdleWorkers();
//空方法,子类可以扩展
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
//释放线程池全局锁
mainLock.unlock();
}
//回头说..
tryTerminate();
}
/**
* 设置线程池状态为SHUTDOWN
*/
private void advanceRunState(int targetState) {
//自旋
for (;;) {
int c = ctl.get();
//条件成立:假设targetState == SHUTDOWN,说明 当前线程池状态是 >= SHUTDOWN
//条件不成立:假设targetState == SHUTDOWN ,说明当前线程池状态是RUNNING。
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//onlyOne == true 说明只中断一个线程 ,false 则中断所有线程
//共同前提,worker是空闲状态。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
//持有全局锁
mainLock.lock();
try {
//迭代所有worker
for (Worker w : workers) {
//获取当前worker的线程 保存到t
Thread t = w.thread;
//条件一:条件成立:!t.isInterrupted() == true 说明当前迭代的这个线程尚未中断。
//条件二:w.tryLock() 条件成立:说明当前worker处于空闲状态,可以去给它一个中断信号。 目前worker内的线程 在 queue.take() | queue.poll()
//阻塞中。因为worker执行task时,是加锁的!
if (!t.isInterrupted() && w.tryLock()) {
try {
//给当前线程中断信号..处于queue阻塞的线程,会被唤醒,唤醒后,进入下一次自旋时,可能会return null。执行退出相关的逻辑。
t.interrupt();
} catch (SecurityException ignore) {
} finally {
//释放worker的独占锁。
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
//释放全局锁。
mainLock.unlock();
}
}
复制代码
直接看上面的注释吧,不再细说了。
shutDownNow方法
我们知道调用shutDownNow
方***将线程池的状态修改为STOP
状态,源码如下:
public List<Runnable> shutdownNow() {
//返回值引用
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//获取线程池全局锁
mainLock.lock();
try {
checkShutdownAccess();
//设置线程池状态为STOP
advanceRunState(STOP);
//中断线程池中所有线程
interruptWorkers();
//导出未处理的task
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
//返回当前任务队列中 未处理的任务。
return tasks;
}
/**
* 将线程池的状态修改为STOP
*/
private void advanceRunState(int targetState) {
//自旋
for (;;) {
int c = ctl.get();
//条件成立:假设targetState == SHUTDOWN,说明 当前线程池状态是 >= SHUTDOWN
//条件不成立:假设targetState == SHUTDOWN ,说明当前线程池状态是RUNNING。
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
/**
* 给线程池中所有的线程,不管式正在处理任务还是空闲的线程一个中断信号
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
//获取线程池全局锁
mainLock.lock();
try {
//遍历所有worker
for (Worker w : workers)
//interruptIfStarted() 如果worker内的thread 是启动状态,则给它一个中断信号。。
w.interruptIfStarted();
} finally {
//释放线程池全局锁
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
复制代码
直接看上面的注释吧,不再细说了。
作者:努力更文的小白
链接:https://juejin.cn/post/7051572669439246367