JUC-线程池

使用线程池

jdk默认生成线程池的三大方法

  • Executors.newSingleThreadExecturor(); 创建一个大小为1的线程池
  • Executors.newFixedThreadPool(int x); 创建一个指定大小的线程池
  • Executors.newChacheThreadPoll();创建一个可伸缩的线程池
public class ExecutorServiceTest {
    public static void main(String[] args) {
        ExecutorService threadPool1 = Executors.newSingleThreadExecutor(); // 创建一个线程池大小为1的线程池
        ExecutorService threadPool2 = Executors.newFixedThreadPool(5); // 创建一个大小固定的线程池
        ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 创建一个大小可伸缩的线程池


        try {
            for (int i = 0; i < 30; i++) {
                threadPool3.execute(()->{
                    System.out.println(Thread.currentThread().getName() + " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            threadPool1.shutdown();
            // threadPool2.shutdown();
        }
    }
}

出现问题:

如果查看这三个方法的源码,

newSingleThreadExecutor、newFixedThreadPool使用的阻塞队列,默认最大值为Integer.MAX_VALUE,这可能会导致堆积大量请求,导致OOM

newCachedThreadPool,线程的最大值数量可能是Integer.MAX_VALUE,这可能导致堆积大量线程,导致OOM

自定义线程池

七大参数

查看上面三个方法的源码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

发现这三个方法都 new 了一个ThreadPoolExecutor对象

查看这个方法的构造器:

    public ThreadPoolExecutor(int corePoolSize, // 核心线程数量的大小
                              int maximumPoolSize, // 最大线程数量的大小
                              long keepAliveTime, // 保活时间
                              TimeUnit unit, // 保活时间单位
                              BlockingQueue<Runnable> workQueue, // 阻塞队列
                              ThreadFactory threadFactory, // 线程工厂
                              RejectedExecutionHandler handler) { // 拒绝策略
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

查看上面的业务图

理解七大参数

int corePoolSize, // 核心线程数量的大小

常驻在这里,不会被释放,像银行的常驻柜台

int maximumPoolSize, // 最大线程数量的大小

如果银行人来太多了,候客区(阻塞队列)也满了,就会开通这块区域

long keepAliveTime, // 保活时间

如果长时间没有人来,超过这个时间就释放资源

TimeUnit unit, // 保活时间单位

BlockingQueue<Runnable> workQueue, // 阻塞队列

就像银行的候客区

ThreadFactory threadFactory, // 线程工厂

RejectedExecutionHandler handler // 拒绝策略

如果所有的柜台和候客区都满了,那么新来的客人需要拒绝策略来决定新来的客人去哪

自定义线程池

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        try {
            for (int i = 1; i <= 5 ; i++) {
                threadPool.execute(()-> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
/**
 * pool-1-thread-1ok
 * pool-1-thread-2ok
 * pool-1-thread-2ok
 * pool-1-thread-2ok
 * pool-1-thread-1ok
 */

可以看到,如果这里跑5条线程,那么 5 == 核心线程数+阻塞队列中的线程数,那么只会启用核心的两条线程

测试,跑6条线程,那么6 >= 核心线程数+阻塞队列中的线程数,就会启用其他三条线程中的一个

pool-1-thread-1ok
pool-1-thread-2ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-1ok
pool-1-thread-2ok

测试,跑7条线程,那么7 >= 核心线程数+阻塞队列中的线程数,就会启用其他三条线程中的两个

pool-1-thread-1ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-2ok
pool-1-thread-4ok
pool-1-thread-1ok

测试,跑8条线程,那么8 >= 核心线程数+阻塞队列中的线程数,就会启用其他三条线程中的三个

pool-1-thread-1ok
pool-1-thread-5ok
pool-1-thread-4ok
pool-1-thread-2ok
pool-1-thread-3ok
pool-1-thread-4ok
pool-1-thread-5ok
pool-1-thread-1ok

测试,跑9条线程,那么9 >= 核心线程数+阻塞队列中的线程数,且9 > 最大线程数(5) + 阻塞队列中的线程数(8),则会触发拒绝策略,这里使用的拒绝策略是AbortPolicy,也就是拒绝处理,并抛出异常

pool-1-thread-1ok
pool-1-thread-3ok
pool-1-thread-2ok
pool-1-thread-3ok
pool-1-thread-5ok
pool-1-thread-1ok
pool-1-thread-4ok
pool-1-thread-2ok
java.util.concurrent.RejectedExecutionException: Task com.pan.pool.ThreadPoolExecutorTest$$Lambda$1/1831932724@7699a589 rejected from java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 3]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.pan.pool.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:26)

四大拒绝策略

查看源码,发现RejectedExecutionHandler只有四个实现类,那么也就是只有四种拒绝策略。

  • AbortPolicy:不处理,并抛出异常
  • CallerRunsPolicy:哪来的去哪里
public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        try {
            for (int i = 1; i <= 9 ; i++) {
                threadPool.execute(()-> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
/**
 * pool-1-thread-1ok
 * pool-1-thread-3ok
 * pool-1-thread-2ok
 * pool-1-thread-4ok
 * pool-1-thread-5ok
 * pool-1-thread-3ok
 * pool-1-thread-1ok
 * mainok
 * pool-1-thread-2ok
 */

可以看到,这条线程是从main线程来的,所以最终交由main线程执行

  • DiscardOldestPolicy:队列满了,尝试和队列中的第一个线程去竞争,如果竞争到了就执行,否则不执行
public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );

        try {
            for (int i = 1; i <= 9 ; i++) {
                threadPool.execute(()-> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
/**
 * pool-1-thread-1ok
 * pool-1-thread-5ok
 * pool-1-thread-5ok
 * pool-1-thread-5ok
 * pool-1-thread-3ok
 * pool-1-thread-2ok
 * pool-1-thread-4ok
 * pool-1-thread-1ok
 */
  • DiscardPolicy:队列满了,直接丢弃任务,不抛出异常
public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy()
        );

        try {
            for (int i = 1; i <= 9 ; i++) {
                threadPool.execute(()-> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
/**
 * pool-1-thread-1ok
 * pool-1-thread-4ok
 * pool-1-thread-3ok
 * pool-1-thread-3ok
 * pool-1-thread-2ok
 * pool-1-thread-1ok
 * pool-1-thread-5ok
 * pool-1-thread-4ok
 */

线程池参数设置最优解

如何设置线程池线程最大数量?

分为IO密集型和CPU密集型

CPU密集型,主机有多少核CPU,就设置多少条线程

IO密集型,如果有15个大型任务,非常占用IO资源,那么就设置30条线程,或者20条线程,保证其他线程也可以正常运行

使用Runtime.getRuntime().availableProcessors(),可以获取当前电脑CPU核数

#JUC#
全部评论

相关推荐

喜欢吃蛋糕仰泳鲈鱼是我的神:字节可以找个hr 给你挂了,再放池子捞
点赞 评论 收藏
分享
Yushuu:你的确很厉害,但是有一个小问题:谁问你了?我的意思是,谁在意?我告诉你,根本没人问你,在我们之中0人问了你,我把所有问你的人都请来 party 了,到场人数是0个人,誰问你了?WHO ASKED?谁问汝矣?誰があなたに聞きましたか?누가 물어봤어?我爬上了珠穆朗玛峰也没找到谁问你了,我刚刚潜入了世界上最大的射电望远镜也没开到那个问你的人的盒,在找到谁问你之前我连癌症的解药都发明了出来,我开了最大距离渲染也没找到谁问你了我活在这个被辐射蹂躏了多年的破碎世界的坟墓里目睹全球核战争把人类文明毁灭也没见到谁问你了😆
点赞 评论 收藏
分享
评论
5
5
分享
牛客网
牛客企业服务