【Java多线程】ThreadPoolExecutor
1、线程池是做什么的
/** * An {@link ExecutorService} that executes each submitted task using * one of possibly several pooled threads, normally configured * using {@link Executors} factory methods. * * <p>Thread pools address two different problems: they usually * provide improved performance when executing large numbers of * asynchronous tasks, due to reduced per-task invocation overhead, * and they provide a means of bounding and managing the resources, * including threads, consumed when executing a collection of tasks. * Each {@code ThreadPoolExecutor} also maintains some basic * statistics, such as the number of completed tasks. */根据类继承关系和官方注释,我们可以大致的了解到,ThreadPoolExecutor是一个ExecutorService,他负责将提交的任务通过工厂方法配置给池中的线程去执行。
线程池主要为了解决两个问题:
1.通过减少每个任务的调用上限,提高执行大量异步任务的性能;
2.提供了一种限制和管理资源的手段,包括线程,在执行任务时他们会被消费掉。
2. 线程池的执行流程
相信大家对线程池的执行流程都不陌生了:
当线程池中的线程数小于corePoolSize 时,新提交的任务直接新建一个线程执行任务(不管是否有空闲线程);
当线程池中的线程数等于corePoolSize 时,新提交的任务将会进入阻塞队列(workQueue)中,等待线程的调度;
当阻塞队列满了以后,如果corePoolSize < maximumPoolSize ,则新提交的任务会新建线程执行任务,直至线程数达到maximumPoolSize;
当线程数达到maximumPoolSize 时,新提交的任务会由(拒绝策略)管理。
当线程池中的线程数等于corePoolSize 时,新提交的任务将会进入阻塞队列(workQueue)中,等待线程的调度;
当阻塞队列满了以后,如果corePoolSize < maximumPoolSize ,则新提交的任务会新建线程执行任务,直至线程数达到maximumPoolSize;
当线程数达到maximumPoolSize 时,新提交的任务会由(拒绝策略)管理。
3. 线程池是如何做到资源管理的,源码分析
1. 重要成员变量
线程池通过按位切割来表示线程池的状态和当前线程数。
//记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //线程数量统计位数29 Integer.SIZE=32 private static final int COUNT_BITS = Integer.SIZE - 3; //容量 000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //运行中 111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //关闭 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //停止 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //整理 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //终止 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; //获取运行状态(获取前3位) private static int runStateOf(int c) { return c & ~CAPACITY; } //获取线程个数(获取后29位) private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
- RUNNING:接受新任务并且处理阻塞队列里的任务
- SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
- STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
- TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
- TERMINATED:终止状态。terminated方法调用完成以后的状态
线程池状态转换
RUNNING -> SHUTDOWN 显式调用shutdown()方法, 或者隐式调用了finalize()方法 (RUNNING or SHUTDOWN) -> STOP 显式调用shutdownNow()方法 SHUTDOWN -> TIDYING 当线程池和任务队列都为空的时候 STOP -> TIDYING 当线程池为空的时候 TIDYING -> TERMINATED 当 terminated() hook 方法执行完成时候
其他变量:
// 缓存任务阻塞队列 private final BlockingQueue<Runnable> workQueue; // 线程池主锁 private final ReentrantLock mainLock = new ReentrantLock(); // 工作线程 private final HashSet<Worker> workers = new HashSet<Worker>(); // mainLock上的终止条件量,用于支持awaitTermination private final Condition termination = mainLock.newCondition(); // 记录曾经创建的最大线程数 private int largestPoolSize; // 已经完成任务数 private long completedTaskCount; private volatile ThreadFactory threadFactory; // 设置默认任务拒绝策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
2. 重要内部类:Worker
/** * Class Worker mainly maintains interrupt control state for * threads running tasks, along with other minor bookkeeping. * This class opportunistically extends AbstractQueuedSynchronizer * to simplify acquiring and releasing a lock surrounding each * task execution. This protects against interrupts that are * intended to wake up a worker thread waiting for a task from * instead interrupting a task being run. We implement a simple * non-reentrant mutual exclusion lock rather than use * ReentrantLock because we do not want worker tasks to be able to * reacquire the lock when they invoke pool control methods like * setCorePoolSize. Additionally, to suppress interrupts until * the thread actually starts running tasks, we initialize lock * state to a negative value, and clear it upon start (in * runWorker). */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }从注释中看到:Class Worker主要维护运行任务的线程的中断控制状态。Worker类继承了AQS以简化获取和释放每个任务的锁。 这可以防止意图唤醒等待任务的工作线程而不是中断正在运行的任务的中断。 我们实现了一个简单的非重入互斥锁而不是使用ReentrantLock,因为我们不希望任务在调用setCorePoolSize等池控制方法时能够重新获取锁。 另外,为了在线程实际开始运行任务之前禁止中断,我们将锁状态初始化为负值,并在启动时清除它(在runWorker中)。
3. 重要方法 execute
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { //传进来的线程为null,则抛出空指针异常 if (command == null) throw new NullPointerException(); //获取当前线程池的状态+线程个数变量 int c = ctl.get(); /** * 3个步骤 */ //1.判断当前线程池线程个数是否小于corePoolSize,小于则调用addWorker方法创建新线程运行,且传进来的Runnable当做第一个任务执行。 //如果调用addWorker方法返回false,则直接返回 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2.如果线程池处于RUNNING状态,则添加任务到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { //二次检查 int recheck = ctl.get(); //如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); //否者如果当前线程池线程空,则添加一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3.新增线程,新增失败则执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
4.重要方法addWorker
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 检查当前线程池状态是否是SHUTDOWN、STOP、TIDYING或者TERMINATED // 且!(当前状态为SHUTDOWN、且传入的任务为null,且队列不为null) // 条件都成立则返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //自旋 for (;;) { int wc = workerCountOf(c); //如果当前的线程数量超过最大容量或者大于(根据传入的core决定是核心线程数还是最大线程数)核心线程数 || 最大线程数,则返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS增加c,成功则跳出retry if (compareAndIncrementWorkerCount(c)) break retry; //CAS失败执行下面方法,查看当前线程数是否变化,变化则继续retry循环,没变化则继续内部循环 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } //CAS成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //新建一个线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //重新检查线程池状态 //避免ThreadFactory退出故障或者在锁获取前线程池被关闭 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // 先检查线程是否是可启动的 throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //判断worker是否添加成功,成功则启动线程,然后将workerStarted设置为true if (workerAdded) { t.start(); workerStarted = true; } } } finally { //判断线程有没有启动成功,没有则调用addWorkerFailed方法 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }检查是否可以根据当前池状态和给定边界(核心或最大)添加新工作线程。 如果可以添加,则相应地调整Worker计数,并且如果可能,创建并启动Worker,将firstTask作为其第一任务运行。 如果池已停止或有资格关闭,则此方法返回false。 如果线程工厂在询问时无法创建线程,它也会返回false。 如果线程创建失败,或者由于线程工厂返回null,或者由于异常(通常是thread.start()中的OutOfMemoryError),我们会干净地回滚。
5. 重要方法runWorker
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { //循环获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // 当线程池是处于STOP状态或者TIDYING、TERMINATED状态时,设置当前线程处于中断状态 // 如果不是,当前线程就处于RUNNING或者SHUTDOWN状态,确保当前线程不处于中断状态 // 重新检查当前线程池的状态是否大于等于STOP状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //提供给继承类使用做一些统计之类的事情,在线程运行前调用 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //提供给继承类使用做一些统计之类的事情,在线程运行之后调用 afterExecute(task, thrown); } } finally { task = null; //统计当前worker完成了多少个任务 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //整个线程结束时调用,线程退出操作。统计整个线程池完成的任务个数之类的工作 processWorkerExit(w, completedAbruptly); } }
6. 重要方法processWorkerExit
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * * @param w the worker * @param completedAbruptly if the worker died due to user exception */ private void processWorkerExit(Worker w, boolean completedAbruptly) { /** * 1、worker数量-1 * 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1 * 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了 */ if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 代码和注释正好相反啊 decrementWorkerCount(); /** * 2、从Workers Set中移除worker */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //把worker的完成任务数加到线程池的完成任务数 workers.remove(w); //从HashSet<Worker>中移除 } finally { mainLock.unlock(); } /** * 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池 * 主要是判断线程池是否满足终止的状态 * 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程 * 没有线程了,更新状态为tidying->terminated */ tryTerminate(); /** * 4、是否需要增加worker线程 * 线程池状态是running 或 shutdown * 如果当前线程是突然终止的,addWorker() * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker() * 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程 */ int c = ctl.get(); //如果状态是running、shutdown,即tryTerminate()没有成功终止线程池,尝试再添加一个worker if (runStateLessThan(c, STOP)) { //不是突然完成的,即没有task任务可以获取而完成的,计算min,并根据当前worker数量判断是否需要addWorker() if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默认为false,即min默认为corePoolSize //如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程 if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个 if (workerCountOf(c) >= min) return; // replacement not needed } //添加一个没有firstTask的worker //只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态 addWorker(null, false); } }执行流程:
1、worker数量-1
A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
2、从Workers Set中移除worker,删除时需要上锁mainlock
3、tryTerminate():在对线程池有负效益的操作时,都需要“尝试终止”线程池,大概逻辑:
判断线程池是否满足终止的状态
A、如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
B、没有线程了,更新状态为tidying->terminated
4、是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程
线程池状态是running 或 shutdown
A、如果当前线程是突然终止的,addWorker()
B、如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
7. 重要方法,关闭线程池
关闭线程池
shutdown
当调用shutdown方法时,线程池将不会再接收新的任务,然后将先前放在队列中的任务执行完成。
下面是shutdown方法的源码
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow
立即停止所有的执行任务,并将队列中的任务返回
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdown和shutdownNow区别
shutdown和shutdownNow这两个方法的作用都是关闭线程池,流程大致相同,只有几个步骤不同,如下
- 加锁
- 检查关闭权限
- CAS改变线程池状态
- 设置中断标志(线程池不在接收任务,队列任务会完成)/中断当前执行的线程
- 调用onShutdown方法(给子类提供的方法)/获取队列中的任务
- 解锁
- 尝试将线程池状态变成终止状态TERMINATED
- 结束/返回队列中的任务