Java线程池原理剖析
一、写在前面
1.1 线程池是什么
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
而本文描述线程池是JDK中提供的ThreadPoolExecutor类。
当然,使用线程池可以带来一系列好处:
**降低资源消耗**:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
**提高响应速度**:任务到达时,无需等待线程创建即可立即执行。
**提高线程的可管理性**:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
**提供更多更强大的功能**:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
二、ThreadPoolExecutor核心方法
(结合源码分析,一次没看过的先看看源码和注释,否则没法理解)
1.1 构造方法
七大核心参数,不再详细阐述,想提高的同学需要着重关注的是:不同的阻塞队列/线程工厂/拒绝策略如何根据不同的业务场景进行选择
1.2 基础方法和变量
这些变量和方法是理解线程池最重要的入口
1.3 execute方法
1.3.1 说明
1.3.2 源码分析
1.4 addWorker(Runnable firstTask, boolean core)
1.4.1 说明
firstTask:工作线程要执行的第一个任务
core:是否为核心线程
思想:创建工作线程并执行任务
1.4.2 源码分析
代码比较长,先说概要设计
第一步:通过CAS算法,将线程数+1(失败则自旋,直到成功);思考:为啥用用for(;;)取代while(true)做自旋?
第二步:执行任务(由Worker对象持有,对象维护在HashSet中)
1.5 runWorker(Worker w)
1.5.1 说明
w:持有工作线程和任务的对象,是ThreadPoolExecutor的内部类
思想:1、获取任务(getTask()方法) 2、执行(就是调用Runnable的run方法)
1.5.2 源码分析
这个方法的
1.6 getTask()
1.6.1 说明
思想:不断的从阻塞队列中获取任务
1.6.2 源码分析
四、吐槽点
1.1 牛客的这个编辑器做的真不咋地,感觉灵活度好低啊。。。
1.2 线程池的原理懂了以后,如何做到动态配置和动态扩容/缩容,自己可多了解下,毕竟线上运行的项目,线程池参数不可能写死在代码里
1.3 参考了美团线程池的文章
1.4 文章字数不多,重点都写在了注释里面,会定期维护这篇文章,欢迎指正
五、后期展望
AQS框架的实现原理
#Java开发##Java##学习路径#
1.1 线程池是什么
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
而本文描述线程池是JDK中提供的ThreadPoolExecutor类。
当然,使用线程池可以带来一系列好处:
**降低资源消耗**:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
**提高响应速度**:任务到达时,无需等待线程创建即可立即执行。
**提高线程的可管理性**:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
**提供更多更强大的功能**:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
1.2 Java线程池总体设计图
二、ThreadPoolExecutor核心方法
(结合源码分析,一次没看过的先看看源码和注释,否则没法理解)
1.1 构造方法
七大核心参数,不再详细阐述,想提高的同学需要着重关注的是:不同的阻塞队列/线程工厂/拒绝策略如何根据不同的业务场景进行选择
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,//存活时间-从阻塞队列获取任务的最长等待时间(没有获取到,消除其在Hash表的引用,等待JVM垃圾回收) TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
1.2 基础方法和变量
这些变量和方法是理解线程池最重要的入口
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//利用原子类的CAS算法维护ctl的值 private static final int COUNT_BITS = Integer.SIZE - 3;//32位,高3位保存线程池状态,低29位保存有效线程数(优点需要自己思考一下) // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; }//计算线程池状态 private static int workerCountOf(int c) { return c & CAPACITY; }//计算有效线程数 private static int ctlOf(int rs, int wc) { return rs | wc; }//将线程池状态和有效线程数进行位运算,转成32位
1.3 execute方法
1.3.1 说明
这是线程池的核心执行方法,先来看看总体设计(这张图不完全正确,参考了其他文献)
/* * 该方法宏观上分为三步 */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 第一步:判断线程数是否小于核心线程数,是则执行addWorker(方法解析放在后面,先记下这个方法) */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } /* * 第二步:判断线程池状态是否处在RUNNING(线程池不同状态对任务调度的影响,自己学习,很重要),是则将任务添加到阻塞队列 */ if (isRunning(c) && workQueue.offer(command)) {//这里的逻辑叫double-check,线程池状态可能发生变化或者线程数=0(这种情况出现在allowCoreThreadTimeOut = true的时候,参数含义自己搜索) int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false);//先自己思考为啥有这一步!!! } /* * 第三步:尝试创建非核心线程执行任务 */ else if (!addWorker(command, false))//若失败,说明线程池状态变更或者线程数超出限制 reject(command);//这种情况出现在线程数已经超过了最大线程数,那么执行拒绝策略(4种,默认抛出异常策略) }
1.4 addWorker(Runnable firstTask, boolean core)
1.4.1 说明
firstTask:工作线程要执行的第一个任务
core:是否为核心线程
思想:创建工作线程并执行任务
1.4.2 源码分析
代码比较长,先说概要设计
第一步:通过CAS算法,将线程数+1(失败则自旋,直到成功);思考:为啥用用for(;;)取代while(true)做自旋?
第二步:执行任务(由Worker对象持有,对象维护在HashSet中)
private boolean addWorker(Runnable firstTask, boolean core) { //环节1:CAS算法维护ctl值 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))//执行CAS算法,如果成功,则目的达到,break,进入环节2;失败则自旋转 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } //环节2:执行任务 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//Worker是Runnbale接口的实现类,同时实现了AQS接口 final Thread t = w.thread;//所以这里拿到Worker对象持有的Thread对象,线程启动后,执行的就是Worker对象的run方法,从而引出后面的runWorker()方法 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//同步执行 try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//启动线程,执行Worker对象的runWorker方法 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
1.5 runWorker(Worker w)
1.5.1 说明
w:持有工作线程和任务的对象,是ThreadPoolExecutor的内部类
思想:1、获取任务(getTask()方法) 2、执行(就是调用Runnable的run方法)
1.5.2 源码分析
这个方法的
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 思考这里为什么调一下这个方法 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//不断从队列获取任务:getTask()方法后面解析 w.lock(); // 判断线程池状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);//前置方法,方便扩展、定制(类似Spring的后置处理器BeanPostProcessor) Throwable thrown = null; try { task.run();//执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//后置方法,方便扩展、定制 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly);//获取不到任务了,消除自身引用。还会后置判断队列中是否还有任务(防止最后一个线程消除后,队列中还有任务),如果有,创建线程去执行 } }
1.6 getTask()
1.6.1 说明
思想:不断的从阻塞队列中获取任务
1.6.2 源码分析
private Runnable getTask() { boolean timedOut = false; // 记录最后一次从队列获取任务是否超时 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判断线程池和队列状态 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();//CAS维护ctl值 return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//allowCoreThreadTimeOut=true时,核心线程数在空闲等待时间过后,也是可回收的,默认false if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //从队列中获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;//超时了,说明没获取到任务,记录下来 } catch (InterruptedException retry) { timedOut = false; } } }
三、思考题答案
Q:
A:这一步的总体逻辑是,将任务提交到阻塞队列。最后还要判断一下工作线程数是否为0,是防止allowCoreThreadTimeOut=true的情况,因为这时核心线程获取不到任务之后也会被回收,那么队列中的任务就需要新起一个线程去消费,为什么这里的AddWorker的参数设置为null呢,因为为null时,才会去队列中取任务(getTask方法),
Q:这里为什么要调用一下解锁?
A:是因为这个worker还没开始执行,我们允许它被打断的,否则在自旋获取锁被阻塞的时候被打断会抛出异常,干扰到原来的正常逻辑
1.1 牛客的这个编辑器做的真不咋地,感觉灵活度好低啊。。。
1.2 线程池的原理懂了以后,如何做到动态配置和动态扩容/缩容,自己可多了解下,毕竟线上运行的项目,线程池参数不可能写死在代码里
1.3 参考了美团线程池的文章
1.4 文章字数不多,重点都写在了注释里面,会定期维护这篇文章,欢迎指正
五、后期展望
AQS框架的实现原理
#Java开发##Java##学习路径#