JUC-线程池

使用线程池

jdk默认生成线程池的三大方法

  • Executors.newSingleThreadExecturor(); 创建一个大小为1的线程池
  • Executors.newFixedThreadPool(int x); 创建一个指定大小的线程池
  • Executors.newChacheThreadPoll();创建一个可伸缩的线程池
public class ExecutorServiceTest {
    public static void main(String[] args) {
        ExecutorService threadPool1 = Executors.newSingleThreadExecutor(); // 创建一个线程池大小为1的线程池
        ExecutorService threadPool2 = Executors.newFixedThreadPool(5); // 创建一个大小固定的线程池
        ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 创建一个大小可伸缩的线程池


        try {
            for (int i = 0; i < 30; i++) {
                threadPool3.execute(()->{
                    System.out.println(Thread.currentThread().getName() + " ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            threadPool1.shutdown();
            // threadPool2.shutdown();
        }
    }
}

出现问题:

如果查看这三个方法的源码,

newSingleThreadExecutor、newFixedThreadPool使用的阻塞队列,默认最大值为Integer.MAX_VALUE,这可能会导致堆积大量请求,导致OOM

newCachedThreadPool,线程的最大值数量可能是Integer.MAX_VALUE,这可能导致堆积大量线程,导致OOM

自定义线程池

七大参数

查看上面三个方法的源码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

发现这三个方法都 new 了一个ThreadPoolExecutor对象

查看这个方法的构造器:

    public ThreadPoolExecutor(int corePoolSize, // 核心线程数量的大小
                              int maximumPoolSize, // 最大线程数量的大小
                              long keepAliveTime, // 保活时间
                              TimeUnit unit, // 保活时间单位
                              BlockingQueue<Runnable> workQueue, // 阻塞队列
                              ThreadFactory threadFactory, // 线程工厂
                              RejectedExecutionHandler handler) { // 拒绝策略
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

查看上面的业务图

理解七大参数

int corePoolSize, // 核心线程数量的大小

常驻在这里,不会被释放,像银行的常驻柜台

int maximumPoolSize, // 最大线程数量的大小

如果银行人来太多了,候客区(阻塞队列)也满了,就会开通这块区域

long keepAliveTime, // 保活时间

如果长时间没有人来,超过这个时间就释放资源

TimeUnit unit, // 保活时间单位

BlockingQueue<Runnable> workQueue, // 阻塞队列

就像银行的候客区

ThreadFactory threadFactory, // 线程工厂

RejectedExecutionHandler handler // 拒绝策略

如果所有的柜台和候客区都满了,那么新来的客人需要拒绝策略来决定新来的客人去哪

自定义线程池

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        try {
            for (int i = 1; i <= 5 ; i++) {
                threadPool.execute(()-> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
/**
 * pool-1-thread-1ok
 * pool-1-thread-2ok
 * pool-1-thread-2ok
 * pool-1-thread-2ok
 * pool-1-thread-1ok
 */

可以看到,如果这里跑5条线程,那么 5 == 核心线程数+阻塞队列中的线程数,那么只会启用核心的两条线程

测试,跑6条线程,那么6 >= 核心线程数+阻塞队列中的线程数,就会启用其他三条线程中的一个

pool-1-thread-1ok
pool-1-thread-2ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-1ok
pool-1-thread-2ok

测试,跑7条线程,那么7 >= 核心线程数+阻塞队列中的线程数,就会启用其他三条线程中的两个

pool-1-thread-1ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-3ok
pool-1-thread-2ok
pool-1-thread-4ok
pool-1-thread-1ok

测试,跑8条线程,那么8 >= 核心线程数+阻塞队列中的线程数,就会启用其他三条线程中的三个

pool-1-thread-1ok
pool-1-thread-5ok
pool-1-thread-4ok
pool-1-thread-2ok
pool-1-thread-3ok
pool-1-thread-4ok
pool-1-thread-5ok
pool-1-thread-1ok

测试,跑9条线程,那么9 >= 核心线程数+阻塞队列中的线程数,且9 > 最大线程数(5) + 阻塞队列中的线程数(8),则会触发拒绝策略,这里使用的拒绝策略是AbortPolicy,也就是拒绝处理,并抛出异常

pool-1-thread-1ok
pool-1-thread-3ok
pool-1-thread-2ok
pool-1-thread-3ok
pool-1-thread-5ok
pool-1-thread-1ok
pool-1-thread-4ok
pool-1-thread-2ok
java.util.concurrent.RejectedExecutionException: Task com.pan.pool.ThreadPoolExecutorTest$$Lambda$1/1831932724@7699a589 rejected from java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 3]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.pan.pool.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:26)

四大拒绝策略

查看源码,发现RejectedExecutionHandler只有四个实现类,那么也就是只有四种拒绝策略。

  • AbortPolicy:不处理,并抛出异常
  • CallerRunsPolicy:哪来的去哪里
public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );

        try {
            for (int i = 1; i <= 9 ; i++) {
                threadPool.execute(()-> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
/**
 * pool-1-thread-1ok
 * pool-1-thread-3ok
 * pool-1-thread-2ok
 * pool-1-thread-4ok
 * pool-1-thread-5ok
 * pool-1-thread-3ok
 * pool-1-thread-1ok
 * mainok
 * pool-1-thread-2ok
 */

可以看到,这条线程是从main线程来的,所以最终交由main线程执行

  • DiscardOldestPolicy:队列满了,尝试和队列中的第一个线程去竞争,如果竞争到了就执行,否则不执行
public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );

        try {
            for (int i = 1; i <= 9 ; i++) {
                threadPool.execute(()-> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
/**
 * pool-1-thread-1ok
 * pool-1-thread-5ok
 * pool-1-thread-5ok
 * pool-1-thread-5ok
 * pool-1-thread-3ok
 * pool-1-thread-2ok
 * pool-1-thread-4ok
 * pool-1-thread-1ok
 */
  • DiscardPolicy:队列满了,直接丢弃任务,不抛出异常
public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy()
        );

        try {
            for (int i = 1; i <= 9 ; i++) {
                threadPool.execute(()-> {
                    System.out.println(Thread.currentThread().getName() + "ok");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}
/**
 * pool-1-thread-1ok
 * pool-1-thread-4ok
 * pool-1-thread-3ok
 * pool-1-thread-3ok
 * pool-1-thread-2ok
 * pool-1-thread-1ok
 * pool-1-thread-5ok
 * pool-1-thread-4ok
 */

线程池参数设置最优解

如何设置线程池线程最大数量?

分为IO密集型和CPU密集型

CPU密集型,主机有多少核CPU,就设置多少条线程

IO密集型,如果有15个大型任务,非常占用IO资源,那么就设置30条线程,或者20条线程,保证其他线程也可以正常运行

使用Runtime.getRuntime().availableProcessors(),可以获取当前电脑CPU核数

#JUC#
全部评论

相关推荐

求个公司要我:接好运
点赞 评论 收藏
分享
2024-12-04 19:53
已编辑
湖南文理学院 产品经理
牛客224543458号:他想找牛马,愿意疯狂加班的,因为要证明自己
点赞 评论 收藏
分享
评论
5
5
分享

创作者周榜

更多
牛客网
牛客企业服务