Java线程池原理剖析
一、写在前面
1.1 线程池是什么
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
而本文描述线程池是JDK中提供的ThreadPoolExecutor类。
当然,使用线程池可以带来一系列好处:
**降低资源消耗**:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
**提高响应速度**:任务到达时,无需等待线程创建即可立即执行。
**提高线程的可管理性**:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
**提供更多更强大的功能**:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

二、ThreadPoolExecutor核心方法
(结合源码分析,一次没看过的先看看源码和注释,否则没法理解)
1.1 构造方法
七大核心参数,不再详细阐述,想提高的同学需要着重关注的是:不同的阻塞队列/线程工厂/拒绝策略如何根据不同的业务场景进行选择
1.2 基础方法和变量
这些变量和方法是理解线程池最重要的入口
1.3 execute方法
1.3.1 说明

1.3.2 源码分析
1.4 addWorker(Runnable firstTask, boolean core)
1.4.1 说明
firstTask:工作线程要执行的第一个任务
core:是否为核心线程
思想:创建工作线程并执行任务
1.4.2 源码分析
代码比较长,先说概要设计
第一步:通过CAS算法,将线程数+1(失败则自旋,直到成功);思考:为啥用用for(;;)取代while(true)做自旋?
第二步:执行任务(由Worker对象持有,对象维护在HashSet中)
1.5 runWorker(Worker w)
1.5.1 说明
w:持有工作线程和任务的对象,是ThreadPoolExecutor的内部类
思想:1、获取任务(getTask()方法) 2、执行(就是调用Runnable的run方法)
1.5.2 源码分析
这个方法的
1.6 getTask()
1.6.1 说明
思想:不断的从阻塞队列中获取任务
1.6.2 源码分析
四、吐槽点
1.1 牛客的这个编辑器做的真不咋地,感觉灵活度好低啊。。。
1.2 线程池的原理懂了以后,如何做到动态配置和动态扩容/缩容,自己可多了解下,毕竟线上运行的项目,线程池参数不可能写死在代码里
1.3 参考了美团线程池的文章
1.4 文章字数不多,重点都写在了注释里面,会定期维护这篇文章,欢迎指正
五、后期展望
AQS框架的实现原理
#Java开发##Java##学习路径#
1.1 线程池是什么
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
而本文描述线程池是JDK中提供的ThreadPoolExecutor类。
当然,使用线程池可以带来一系列好处:
**降低资源消耗**:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
**提高响应速度**:任务到达时,无需等待线程创建即可立即执行。
**提高线程的可管理性**:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
**提供更多更强大的功能**:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
1.2 Java线程池总体设计图
二、ThreadPoolExecutor核心方法
(结合源码分析,一次没看过的先看看源码和注释,否则没法理解)
1.1 构造方法
七大核心参数,不再详细阐述,想提高的同学需要着重关注的是:不同的阻塞队列/线程工厂/拒绝策略如何根据不同的业务场景进行选择
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,//存活时间-从阻塞队列获取任务的最长等待时间(没有获取到,消除其在Hash表的引用,等待JVM垃圾回收)
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
} 1.2 基础方法和变量
这些变量和方法是理解线程池最重要的入口
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//利用原子类的CAS算法维护ctl的值
private static final int COUNT_BITS = Integer.SIZE - 3;//32位,高3位保存线程池状态,低29位保存有效线程数(优点需要自己思考一下)
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }//计算线程池状态
private static int workerCountOf(int c) { return c & CAPACITY; }//计算有效线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }//将线程池状态和有效线程数进行位运算,转成32位 1.3 execute方法
1.3.1 说明
这是线程池的核心执行方法,先来看看总体设计(这张图不完全正确,参考了其他文献)
/*
* 该方法宏观上分为三步
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 第一步:判断线程数是否小于核心线程数,是则执行addWorker(方法解析放在后面,先记下这个方法)
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/*
* 第二步:判断线程池状态是否处在RUNNING(线程池不同状态对任务调度的影响,自己学习,很重要),是则将任务添加到阻塞队列
*/
if (isRunning(c) && workQueue.offer(command)) {//这里的逻辑叫double-check,线程池状态可能发生变化或者线程数=0(这种情况出现在allowCoreThreadTimeOut = true的时候,参数含义自己搜索)
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//先自己思考为啥有这一步!!!
}
/*
* 第三步:尝试创建非核心线程执行任务
*/
else if (!addWorker(command, false))//若失败,说明线程池状态变更或者线程数超出限制
reject(command);//这种情况出现在线程数已经超过了最大线程数,那么执行拒绝策略(4种,默认抛出异常策略)
} 1.4 addWorker(Runnable firstTask, boolean core)
1.4.1 说明
firstTask:工作线程要执行的第一个任务
core:是否为核心线程
思想:创建工作线程并执行任务
1.4.2 源码分析
代码比较长,先说概要设计
第一步:通过CAS算法,将线程数+1(失败则自旋,直到成功);思考:为啥用用for(;;)取代while(true)做自旋?
第二步:执行任务(由Worker对象持有,对象维护在HashSet中)
private boolean addWorker(Runnable firstTask, boolean core) {
//环节1:CAS算法维护ctl值
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//执行CAS算法,如果成功,则目的达到,break,进入环节2;失败则自旋转
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//环节2:执行任务
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//Worker是Runnbale接口的实现类,同时实现了AQS接口
final Thread t = w.thread;//所以这里拿到Worker对象持有的Thread对象,线程启动后,执行的就是Worker对象的run方法,从而引出后面的runWorker()方法
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//同步执行
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//启动线程,执行Worker对象的runWorker方法
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
} 1.5 runWorker(Worker w)
1.5.1 说明
w:持有工作线程和任务的对象,是ThreadPoolExecutor的内部类
思想:1、获取任务(getTask()方法) 2、执行(就是调用Runnable的run方法)
1.5.2 源码分析
这个方法的
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 思考这里为什么调一下这个方法
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//不断从队列获取任务:getTask()方法后面解析
w.lock();
// 判断线程池状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//前置方法,方便扩展、定制(类似Spring的后置处理器BeanPostProcessor)
Throwable thrown = null;
try {
task.run();//执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//后置方法,方便扩展、定制
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务了,消除自身引用。还会后置判断队列中是否还有任务(防止最后一个线程消除后,队列中还有任务),如果有,创建线程去执行
}
} 1.6 getTask()
1.6.1 说明
思想:不断的从阻塞队列中获取任务
1.6.2 源码分析
private Runnable getTask() {
boolean timedOut = false; // 记录最后一次从队列获取任务是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断线程池和队列状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//CAS维护ctl值
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//allowCoreThreadTimeOut=true时,核心线程数在空闲等待时间过后,也是可回收的,默认false
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//从队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;//超时了,说明没获取到任务,记录下来
} catch (InterruptedException retry) {
timedOut = false;
}
}
} 三、思考题答案
Q:
A:这一步的总体逻辑是,将任务提交到阻塞队列。最后还要判断一下工作线程数是否为0,是防止allowCoreThreadTimeOut=true的情况,因为这时核心线程获取不到任务之后也会被回收,那么队列中的任务就需要新起一个线程去消费,为什么这里的AddWorker的参数设置为null呢,因为为null时,才会去队列中取任务(getTask方法),
Q:
这里为什么要调用一下解锁?
A:是因为这个worker还没开始执行,我们允许它被打断的,否则在自旋获取锁被阻塞的时候被打断会抛出异常,干扰到原来的正常逻辑
1.1 牛客的这个编辑器做的真不咋地,感觉灵活度好低啊。。。
1.2 线程池的原理懂了以后,如何做到动态配置和动态扩容/缩容,自己可多了解下,毕竟线上运行的项目,线程池参数不可能写死在代码里
1.3 参考了美团线程池的文章
1.4 文章字数不多,重点都写在了注释里面,会定期维护这篇文章,欢迎指正
五、后期展望
AQS框架的实现原理
#Java开发##Java##学习路径#
查看16道真题和解析