【详解】Executors框架之CompletionService
Future的缺点
虽然Future可以异步的执行任务,但是还是有很多缺点:
没有办法回调
,需要手动的调用- 执行一组任务需要
等待所有的任务执行完
CompletionService简介
CompletionService的实现目标是任务先完成可优先获取到,即结果按照完成先后顺序排序。
ExecutorCompletionService类,该类只有三个成员变量:
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
....
}
- 可以看到ExecutorCompletionService主要是增强executor线程池的。
- Task包装后被塞入completionQueue,当Task结束,其Future就可以从completionQueue中获取到。
执行流程为:
基本使用
public class CompletionServiceTest {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
//开启3个线程
ExecutorService exs = Executors.newFixedThreadPool(5);
try {
int taskCount = 10;
// 结果集
List<Integer> list = new ArrayList<Integer>();
// 1.定义CompletionService
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs);
// 2.添加任务
for(int i=0;i<taskCount;i++){
completionService.submit(new Task(i+1));
}
// 3.获取结果
for(int i=0;i<taskCount;i++){
Integer result = completionService.take().get();
list.add(result);
}
System.out.println("list="+list);
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭线程池
exs.shutdown();
}
}
static class Task implements Callable<Integer> {
Integer i;
public Task(Integer i) {
super();
this.i=i;
}
@Override
public Integer call() throws Exception {
if(i==5) {
Thread.sleep(5000);
}else{
Thread.sleep(1000);
}
System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!");
return i;
}
}
}
结果:
线程:pool-1-thread-1任务i=1,执行完成!
线程:pool-1-thread-2任务i=2,执行完成!
线程:pool-1-thread-3任务i=3,执行完成!
线程:pool-1-thread-4任务i=4,执行完成!
线程:pool-1-thread-1任务i=7,执行完成!
线程:pool-1-thread-3任务i=8,执行完成!
线程:pool-1-thread-4任务i=9,执行完成!
线程:pool-1-thread-2任务i=6,执行完成!
线程:pool-1-thread-1任务i=10,执行完成!
线程:pool-1-thread-5任务i=5,执行完成!
list=[1, 2, 3, 4, 7, 8, 9, 6, 10, 5]
- 可以发现结果顺序就是任务完成的顺序
阻塞和非阻塞获取
public Future<V> take()throws InterruptedException
public Future<V> poll()
public Future<V> poll(long timeout,TimeUnit unit) throws InterruptedException
阻塞获取
take方法回使调用者阻塞,可以保证一定会有Future取出
非阻塞获取
poll方***去查看是否有任务完成,有则取出;没有,就会返回一个null