线程池的线程复用原理
前言
线程池有核心线程和非核心线程之分:
- 核心线程是一直存活在线程池中的
- 非核心线程是在执行完任务之后超时销毁的
当Thread执行完Runnable任务之后就会销毁,而且就算执行完任务之后把线程挂起也没有办法再去执行其他任务,那线程池是如何做到核心线程复用的呢?
首先来看一下执行线程任务的方法,实际上就是根据工作线程数量去执行不同的策略
execute()
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); /* * 如果当前活跃线程数小于核心线程数,就会添加一个worker来执行任务; * 具体来说,新建一个核心线程放入线程池中,并把任务添加到该线程中。 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //程序执行到这里,说明要么活跃线程数大于核心线程数;要么addWorker()失败 /* * 如果当前线程池是运行状态,会把任务添加到队列 */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //程序执行到这里,说明要么线程状态不是RUNNING;要么workQueue队列已经满了 //调用addWorker方法去创建非核心线程, //如果当前线程数已经达到 maximumPoolSize,执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
里面分成了3种情况,但是都会执行addWoker()方法
这个方法就和名字一样,添加一个工人来完成任务;而这个工人就是Thread,任务就是Runnable。
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) { ...省略一些不重要的 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //Worker是实现了Runnable接口的包装类 w = new Worker(firstTask); //Thread是在Worker构造方法创建的 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); //检查线程池状态,分为2种情况 //1、线程池处于RUNNING //2、线程池处于SHUTDOWN并且firstTask==null //这2种情况都会创建Worker来执行队列中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //重新设置标识位 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
由代码,大概知道是通过创建Worker来执行任务的,而且线程是在Worker内部创建的,所以想到Thread需要的Runnable应该也在Worker类内部
Worker()
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
Worker是一个Runnable任务,而且Worker的构造方法中创建了Thread对象。
这样的话,在之前的addWorker()方法中调用t.start(); 就会执行到Worker的run()方法。
runWorker()
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //这个就是addWorker传进来的Runnable Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //如果task不为null或从workQueue中获取任务不为null //就会一直执行 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //检查线程池状态,如果线程池处于中断状态,将调用interrupt将线程中断。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //中断线程 wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //线程任务执行啦 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
这里的关键在于这个while()条件判断,当第一次创建Worker时就有任务,当执行完这个任务后,这个方法并没有结束,而是不断地调用getTask()方法从阻塞队列中获取任务然后调用task.run()执行任务。
getTask()
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 1.allowCoreThreadTimeOut表示是否允许核心线程超时销毁,默认是false,也就是说核心线程即使空闲也不会被销毁 //当然,如果设置为true,核心线程是会销毁的 //这样的话,只有正在工作的线程数大于核心线程数才会为true,否则返回false boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //2.如果timed为true,通过poll取任务;如果为false,通过take取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
这个方法是通过一个死循环取任务,取任务的话是通过workQueue这个阻塞队列去完成的,在默认不改变allowCoreThreadTimeOut的前提下,
- 如果工作线程数大于核心线程数,则通过poll()从队列取任务,取完队列后,线程销毁;
- 如果工作线程数小于核心线程数,则通过take()从队列取任务,取完队列后,线程阻塞;
这2个方法的区别是:take()取任务时,如果队列中没有任务了会调用await()阻塞当前线程。
![](https://gitee.com/isgangzi/image-store/raw/master/img/image (1).png)
总结
- 当Thread的run方法执行完一个任务之后,会循环地从阻塞队列中取任务来执行,这样执行完一个任务之后就不会立即销毁了;
- 当工作线程数小于核心线程数,那些空闲的核心线程再去队列取任务的时候,如果队列中的Runnable数量为0,就会阻塞当前线程,这样线程就不会回收了。线程的唤醒是在execute时,当调用workQueue.offer()方法,将任务放入阻塞队列时,会调用Condition.signal()方法唤醒一个之前阻塞的线程。
微信公众号:程序员WeiG
个人博客:wggz.top
语雀主页:yuque.com/weig