J.U.C并发包(2)
J.U.C并发包(2)
FutureTask
Callable接口和Runnable接口对比
- 二者大致相似,不同的是,Runnable只有一个方法就是Run,而Callable有一个Call方法,功能也更强大,因为被线程执行后有返回值,并且可以抛出异常。
Future接口
- Future对于具体的Runnable和Callable任务进行查询,取消或者获取结果等。可以监视目标线程调用Call的情况,当调用get()方法就会获得线程Call方法的返回值。
FutureTask类
- FutureTask是RunnableFuture的实现类,RunnableFuture的父接口就是Runnable和Future。
- 既可以作为Runnable被线程执行,也可以作为Future得到Call的返回值。
- 因此FutureTask是非常便利的。
示例代码:
public class FutureTaskExample { private static Logger log = LoggerFactory.getLogger(FutureTaskExample.class); public static void main(String[] args) throws Exception { FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() { @Override public String call() throws Exception { log.info("do something in callable"); Thread.sleep(5000); return "Done"; } }); new Thread(futureTask).start(); log.info("do something in main"); Thread.sleep(1000); String result = futureTask.get(); log.info("result:{}", result); } }
Fork/Join框架
- 并行执行任务,把大任务分割为若干个子任务(Fork),然后合并子任务的执行结果(Join)。
- 采用工作窃取算法,为了减少窃取任务线程与被窃取任务线程之间进行竞争,采用双端队列,被窃取任务线程从双端队列头部拿任务,窃取任务线程从双端队列尾部拿任务
- 优点:充分使用线程进行并行计算,减少线程之间的竞争
- 缺陷:还是可能存在浪费系统资源的可能。
- 局限性:任务只能使用Fork/Join作为同步机制,如果使用其他同步机制,线程就不能执行其他任务
- 执行的线程中的任务不可以做IO操作
- 任务不能跑出检查异常,必须额外通过代码处理。
BlockingQueue
- 当一个线程对一个已经填满的阻塞队列进行入队操作便会阻塞,当一个线程对一个空阻塞队列出队操作时也会发生阻塞。
- 优点:线程安全,用于生产者消费者模式。
实现类:
- ArrayBlockingQueue:有界,FIFO
- DelayQueue:元素有序,通常按照元素过期时间排序
- LinkedBlockingQueue:大小可选,可以指定大小,FIFO
- PriorityBlockingQueue:带优先级的阻塞队列,可以插入空值,必须实现Compareable接口。
- SynchronousQueue:只能允许容纳一个元素,当一个线程插入一个元素后就会被阻塞,除非这元素被另一个线程消费,所以也被称为无界非缓存队列。
线程池
优点:
- 重用存在的线程,减少对象的创建、消亡的开销,性能更好
- 可以有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多的资源竞争,避免阻塞。
ThreadPoolExecutor
- corePoolSize:核心线程数量
- maximumPoolSize:线程最大线程数
- workQueue:阻塞队列,存储等待执行的任务,
当一个任务进入线程池,会根据当前线程池中运行中的线程数量决定该任务的处理方式:
- 直接切换:即SynchronousQueue
- 使用无界队列:即LinkedBlockingQueue,线程池中能够运行的最大线程数就是corePoolSize,maximumPoolSize失效。
- 使用有界队列:即ArrayBlockingQueue,线程池中能够运行的最大线程数是maximumPoolSize,这样可以降低资源消耗,但降低了性能。
- keepAliveTime:线程2没执行任务最多保持多久时间终止。
- unit:keepAliveTime的时间单位。
- threadFactory:线程工厂,用来创建线程。
RejectedExecutionHandler:拒绝处理任务时的策略。
- AbortPolicy抛出RejectedExecutionException异常。
- DiscardOldestPolicy将队列最老的任务干掉
- DiscardPolicy也是丢弃任务,但是不抛出异常
- CallerRunsPolicy即不用线程池中的线程执行,而是交给调用方来执行
ThreadPoolExecutor
- execute():提交任务,交给线程池执行
- submit():提交任务,能够返回执行结果,execute加上Future
- shutdown():关闭线程池,等待线程执行完毕,即阻塞队列中的任务也进入线程池执行完毕,不再接受新任务。
- shutdownNow():关闭线程池,不等待任务执行完,暂停正在执行的线程,
Executors框架接口
- Executors.newCachedThreadPool:可缓存的线程池
- Executors.newFixedThreadPool:定长的线程池
- Executors.newScheduledThreadPool:定长的线程池,支持定时执行,这也就有了线程调度的思想,使用schedule方法和scheduleAtFixedRate()方法。
public class ThreadPoolExample4 { public static void main(String[] args) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); executorService.schedule(new Runnable() { @Override public void run() { log.warn("schedule run"); } }, 3, TimeUnit.SECONDS); executorService.shutdown(); } }
public class ThreadPoolExample4 { private static Logger log = LoggerFactory.getLogger(ThreadPoolExample4.class); public static void main(String[] args) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { log.warn("schedule run"); } }, 1, 3, TimeUnit.SECONDS); } }
- Executors.newSingelThreadPool:单线程化线程池,只用唯一的工作线程执行,可以使线程按照顺序执行。
线程池的合理配置
- CPU密集性任务,线程池容量设置为CPU数+1
- IO密集性任务,线程池容量设置为CPU数 * 2