JUC-线程池
使用线程池
jdk默认生成线程池的三大方法
- Executors.newSingleThreadExecturor(); 创建一个大小为1的线程池
- Executors.newFixedThreadPool(int x); 创建一个指定大小的线程池
- Executors.newChacheThreadPoll();创建一个可伸缩的线程池
public class ExecutorServiceTest { public static void main(String[] args) { ExecutorService threadPool1 = Executors.newSingleThreadExecutor(); // 创建一个线程池大小为1的线程池 ExecutorService threadPool2 = Executors.newFixedThreadPool(5); // 创建一个大小固定的线程池 ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 创建一个大小可伸缩的线程池 try { for (int i = 0; i < 30; i++) { threadPool3.execute(()->{ System.out.println(Thread.currentThread().getName() + " ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭线程池 threadPool1.shutdown(); // threadPool2.shutdown(); } } }
出现问题:
如果查看这三个方法的源码,
newSingleThreadExecutor、newFixedThreadPool使用的阻塞队列,默认最大值为Integer.MAX_VALUE,这可能会导致堆积大量请求,导致OOM
newCachedThreadPool,线程的最大值数量可能是Integer.MAX_VALUE,这可能导致堆积大量线程,导致OOM
自定义线程池
七大参数
查看上面三个方法的源码:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
发现这三个方法都 new 了一个ThreadPoolExecutor对象
查看这个方法的构造器:
public ThreadPoolExecutor(int corePoolSize, // 核心线程数量的大小 int maximumPoolSize, // 最大线程数量的大小 long keepAliveTime, // 保活时间 TimeUnit unit, // 保活时间单位 BlockingQueue<Runnable> workQueue, // 阻塞队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler) { // 拒绝策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
查看上面的业务图
理解七大参数
int corePoolSize, // 核心线程数量的大小
常驻在这里,不会被释放,像银行的常驻柜台
int maximumPoolSize, // 最大线程数量的大小
如果银行人来太多了,候客区(阻塞队列)也满了,就会开通这块区域
long keepAliveTime, // 保活时间
如果长时间没有人来,超过这个时间就释放资源
TimeUnit unit, // 保活时间单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
就像银行的候客区
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
如果所有的柜台和候客区都满了,那么新来的客人需要拒绝策略来决定新来的客人去哪
自定义线程池
public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); try { for (int i = 1; i <= 5 ; i++) { threadPool.execute(()-> { System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } } /** * pool-1-thread-1ok * pool-1-thread-2ok * pool-1-thread-2ok * pool-1-thread-2ok * pool-1-thread-1ok */
可以看到,如果这里跑5条线程,那么 5 == 核心线程数+阻塞队列中的线程数,那么只会启用核心的两条线程
测试,跑6条线程,那么6 >= 核心线程数+阻塞队列中的线程数,就会启用其他三条线程中的一个
pool-1-thread-1ok pool-1-thread-2ok pool-1-thread-3ok pool-1-thread-3ok pool-1-thread-1ok pool-1-thread-2ok
测试,跑7条线程,那么7 >= 核心线程数+阻塞队列中的线程数,就会启用其他三条线程中的两个
pool-1-thread-1ok pool-1-thread-3ok pool-1-thread-3ok pool-1-thread-3ok pool-1-thread-2ok pool-1-thread-4ok pool-1-thread-1ok
测试,跑8条线程,那么8 >= 核心线程数+阻塞队列中的线程数,就会启用其他三条线程中的三个
pool-1-thread-1ok pool-1-thread-5ok pool-1-thread-4ok pool-1-thread-2ok pool-1-thread-3ok pool-1-thread-4ok pool-1-thread-5ok pool-1-thread-1ok
测试,跑9条线程,那么9 >= 核心线程数+阻塞队列中的线程数,且9 > 最大线程数(5) + 阻塞队列中的线程数(8),则会触发拒绝策略,这里使用的拒绝策略是AbortPolicy,也就是拒绝处理,并抛出异常
pool-1-thread-1ok pool-1-thread-3ok pool-1-thread-2ok pool-1-thread-3ok pool-1-thread-5ok pool-1-thread-1ok pool-1-thread-4ok pool-1-thread-2ok java.util.concurrent.RejectedExecutionException: Task com.pan.pool.ThreadPoolExecutorTest$$Lambda$1/1831932724@7699a589 rejected from java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 3] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.pan.pool.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:26)
四大拒绝策略
查看源码,发现RejectedExecutionHandler只有四个实现类,那么也就是只有四种拒绝策略。
- AbortPolicy:不处理,并抛出异常
- CallerRunsPolicy:哪来的去哪里
public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); try { for (int i = 1; i <= 9 ; i++) { threadPool.execute(()-> { System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } } /** * pool-1-thread-1ok * pool-1-thread-3ok * pool-1-thread-2ok * pool-1-thread-4ok * pool-1-thread-5ok * pool-1-thread-3ok * pool-1-thread-1ok * mainok * pool-1-thread-2ok */
可以看到,这条线程是从main线程来的,所以最终交由main线程执行
- DiscardOldestPolicy:队列满了,尝试和队列中的第一个线程去竞争,如果竞争到了就执行,否则不执行
public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy() ); try { for (int i = 1; i <= 9 ; i++) { threadPool.execute(()-> { System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } } /** * pool-1-thread-1ok * pool-1-thread-5ok * pool-1-thread-5ok * pool-1-thread-5ok * pool-1-thread-3ok * pool-1-thread-2ok * pool-1-thread-4ok * pool-1-thread-1ok */
- DiscardPolicy:队列满了,直接丢弃任务,不抛出异常
public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() ); try { for (int i = 1; i <= 9 ; i++) { threadPool.execute(()-> { System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } } /** * pool-1-thread-1ok * pool-1-thread-4ok * pool-1-thread-3ok * pool-1-thread-3ok * pool-1-thread-2ok * pool-1-thread-1ok * pool-1-thread-5ok * pool-1-thread-4ok */
线程池参数设置最优解
如何设置线程池线程最大数量?
分为IO密集型和CPU密集型
CPU密集型,主机有多少核CPU,就设置多少条线程
IO密集型,如果有15个大型任务,非常占用IO资源,那么就设置30条线程,或者20条线程,保证其他线程也可以正常运行
使用Runtime.getRuntime().availableProcessors(),可以获取当前电脑CPU核数
#JUC#