【详解】Executors框架之ThreadPoolExecutor
目录
引出
摘自阿里巴巴开发手册:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下:
- 1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
- 2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
线程池创建的参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:线程池始终线程数,即使有些是空闲的。设置
allowCoreThreadTimeOut
参数为true,才会进行回收。 - maximumPoolSize:线程池最大线程数,表示在线程池中最多能创建多少个线程。如果当线程池中的数量到达这个数字时,新来的任务会抛出异常。
- keepAliveTime:表示线程没有任务执行时最多能保持多少时间会回收,然后线程池的数目维持在corePoolSize。
- unit:参数keepAliveTime的时间单位
- workQueue:一个阻塞队列,所有的任务都会先放在这里,务;如果对当前对线程的需求超过了corePoolSize大小,会用来存储等待执行的任。
- threadFactory:线程工厂,主要用来创建线程,比如指定线程的名字。
- handler:如果线程池已满,新的任务处理方式。
注意一点:<mark>初始化线程池时,线程数为0</mark>
测试
测试点
- coreSize = 1,MaxSize=2,阻塞队列为1,如果提交三个任务会怎么样?
- coreSize = 1,MaxSize=2,阻塞队列大小为5,如果提交七个任务会怎么样?
- coreSize = 1,MaxSize=2,阻塞队列大小为5,如果提交八个任务会怎么样?
测试代码
public class ThreadPoolExecutorBuild {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = bulidThreadPoolExecutor();
int activeCount = -1;
int queueSize = -1;
while (true){
if (activeCount!=threadPoolExecutor.getActiveCount() || queueSize !=threadPoolExecutor.getQueue().size()){
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
System.out.println("活跃的线程数:"+threadPoolExecutor.getActiveCount());
System.out.println("getCorePoolSize: "+threadPoolExecutor.getCorePoolSize());
System.out.println("阻塞队列的任务数:"+threadPoolExecutor.getQueue().size());
System.out.println("最大的线程数:"+threadPoolExecutor.getMaximumPoolSize());
System.out.println("======================");
}
}
}
private static ThreadPoolExecutor bulidThreadPoolExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,2,
30, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1), Thread::new,new ThreadPoolExecutor.AbortPolicy());
System.out.println("The thread pool creat done.");
threadPoolExecutor.execute(()->sleepSeconds(100));
threadPoolExecutor.execute(()->sleepSeconds(100));
threadPoolExecutor.execute(()->sleepSeconds(100));
return threadPoolExecutor;
}
private static void sleepSeconds(long sec){
try {
System.out.println(" >>> " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(sec);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果
测试一结果:
The thread pool creat done.
>>> Thread-0
>>> Thread-1
活跃的线程数:2
getCorePoolSize: 1
阻塞队列的任务数:1
最大的线程数:2
======================
测试二结果
The thread pool creat done.
>>> Thread-0
>>> Thread-1
活跃的线程数:2
getCorePoolSize: 1
阻塞队列的任务数:5
最大的线程数:2
======================
测试三结果
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ThreadPool.ThreadPoolExecutorBuild$$Lambda$9/2065951873@6acbcfc0 rejected from java.util.concurrent.ThreadPoolExecutor@5f184fc6[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at ThreadPool.ThreadPoolExecutorBuild.bulidThreadPoolExecutor(ThreadPoolExecutorBuild.java:44)
at ThreadPool.ThreadPoolExecutorBuild.main(ThreadPoolExecutorBuild.java:11)
结论:
- 当阻塞队列满时,才会创建新的线程
- 当线程有空闲时,超过一定时间会被回收
- 当线程数达到最大值,且阻塞队列满时,会执行拒绝策略
- 核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
线程池的阻塞队列的选择?
1. ArrayBlockingQueue
2. LinkedBlockingQueue
3. PriorityBlockingQueue
4. SynchronousQueue
- ArrayBlockingQueue:是一个有边界的阻塞队列,它的内部实现是一个数组。
它的容量在初始化时就确定不变。
- LinkedBlockingQueue:阻塞队列
大小的配置是可选的
,其内部实现是一个链表。 - PriorityBlockingQueue:是一个没有边界的队列,所有插入到PriorityBlockingQueue的对象必须实现java.lang.Comparable接口,队
列优先级的排序就是按照我们对这个接口的实现来定义的。
- SynchronousQueue:队列内部
仅允许容纳一个元素
。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。
workQueue常用的是:<mark>java.util.concurrent.ArrayBlockingQueue</mark>
拒绝策略
handler有四个选择:
- ThreadPoolExecutor.AbortPolicy(): 抛出java.util.concurrent.RejectedExecutionException异常
- ThreadPoolExecutor.CallerRunsPolicy():重试添加当前的任务,他会自动重复调用execute()方法
- ThreadPoolExecutor.DiscardOldestPolicy():抛弃旧的任务
- ThreadPoolExecutor.DiscardPolicy(): 抛弃当前的任务
关闭
关闭有两个方法:shutdown
和shutdownNow
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
- 从源码可以看出,本质上执行的是
interrupt
方法 - 如果线程是空闲的,执行的是Condition的await的方***被直接打断,被回收
- 如果正在工作,该线程会被打上一个标记,等任务执行后被回收
shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();//先打断
tasks = drainQueue();//再把任务队列没有执行的任务取出
} finally {
mainLock.unlock();
}
tryTerminate();//不断的打断
return tasks;
}
- 先打断空闲的打断
- 然后清空任务队列
- 然后不断的尝试打断正在执行的线程
- 最后会返回一个List集合,包含还没有执行的任务
守护线程
如果有些任务执行时间很长,想要关闭,可以利用守护线程的方式强制关闭。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,2,
30, TimeUnit.SECONDS,new ArrayBlockingQueue<>(5), r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
},new ThreadPoolExecutor.AbortPolicy());
<mark>被守护的线程是主线程,只要主线程执行完成,线程池就会强制关闭,可以配合awaitTermination
方法使用</mark>
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = bulidThreadPoolExecutor();
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(5,TimeUnit.SECONDS);
System.out.println("强制关闭");
}
一个关闭的陷阱
- 当我们关闭线程时,如果一个线程正在执行,被打断后,也会被系统认为完成了任务
- 当使用
shutdownNow
时,不会返回到集合中,此时该任务就莫名的消失了
API
关闭核心线程
public void allowCoreThreadTimeOut(boolean value)
- 当赋值为true时,核心线程在空闲的时候也会被销毁
更改线程池核心参数
public void setCorePoolSize(int corePoolSize)
public void setMaximumPoolSize(int maximumPoolSize)
public void setKeepAliveTime(long time,
TimeUnit unit)
public void setThreadFactory(ThreadFactory threadFactory)
public void setRejectedExecutionHandler(RejectedExecutionHandler handler)
<mark>除了阻塞队列,可以看出线程池参数都是可以更改的</mark>
创建初始线程
public int prestartAllCoreThreads()
默认线程数是0,使用后,会初始化到核心线程数
测试
public class ExecutorServiceTest {
public static void main(String[] args) {
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
executorService.prestartAllCoreThreads();
int poolSize = executorService.getPoolSize();
System.out.println(poolSize);
}
}
结果
5
清空阻塞队列
public void purge()
- 所有没有执行的任务会被清空