【详解】Executors框架之CompleableFuture
目录
引入
使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
<mark>优势如下:</mark>
- 可以利用结果进行级联的执行
- 会自动回调给调用者
- 执行一批任务时,可以按照任务执行的顺序,获得结果
- 可以并行的获取结果,只拿最先获取的结果级联的执行
简介
CompleableFuture依然是对Executor的封装,看构造函数的源码,可以知道一般情况下会创建一个ForkJoinPool,同时ThreadFactory会设置为守护线程。这就意味着:一旦主线程结束,线程池就会关闭。
简单实用
public class CompletableFutureTest {
public static void main(String[] args) throws InterruptedException {
CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).whenComplete((v,t)->{
System.out.println("Done");
});
Thread.currentThread().join();
}
}
结果:
Done
可以发现CompleableFuture,在执行完成后,会自动去调用下个任务的方法,不会受到阻塞。
创建CompleableFuture
创建CompleableFuture
不建议使用构造方法,而是使用静态的工厂方法构建。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
public static <U> CompletableFuture<U> completedFuture(U value)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
-
allOf(CompletableFuture<?>... cfs)
:这个方***返回一个全新的CompletableFuture,同时会把传递进去的所有CompletableFuture执行完才算是执行完成 -
anyOf(CompletableFuture<?>... cfs)
:这个方***返回一个全新的CompletableFuture,只要传递进去的一个CompletableFuture执行完,就算是执行完成 -
completedFuture(U value)
:可以假设一个执行出了一个结果,进行下面的级联操作 -
runAsync
:异步的执行Runnable -
supplyAsync
:异步的执行Supplier实例,会有回调的结果U
supplyAsync举例
public class CompletableFutureTest {
public static void main(String[] args) throws InterruptedException {
supplyAsync();
Thread.currentThread().join();
}
private static void supplyAsync() throws InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello ");
CompletableFuture<String> future2 = future1.thenApplyAsync(obj -> {
try {
System.out.print(obj);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "World! ";
});
future2.thenAccept(System.out::println);
}
}
结果:
Hello World!
分析:
需要给supplyAsync
提供一个Supplier接口,如下所示:
public interface Supplier<T> {
T get();
}
runAsync举例
public class CompletableFutureTest {
public static void main(String[] args) throws InterruptedException {
supplyAsync();
Thread.currentThread().join();
}
private static void supplyAsync() throws InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.print("Hello "));
CompletableFuture<Void> future2 = future1.thenAccept(t -> {
System.out.println("World !");
});
}
}
结果:
Hello World!
分析:
比一般的提交一个Runnable相比,可以更加灵活点使用,级联、并联等操作
anyOf 举例
同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
// 两个CompletableFuture执行异步查询:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://money.163.com/code/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 两个CompletableFuture执行异步查询:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/price/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最终结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(200);
}
static String queryCode(String name, String url) {
System.out.println("query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code, String url) {
System.out.println("query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 100));
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
结果:
query code from https://finance.sina.com.cn/code/...
query code from https://money.163.com/code/...
query price from https://finance.sina.com.cn/price/...
query price from https://money.163.com/price/...
price: 7.50078323021774
分析:
<mark>需要注意一点,虽然是异步的从一个地方取值,但是其他任务依然会执行完成,而并非不再执行了</mark>
组合方法
组合两个任务,同时处理两个结果
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T,? super U> action,
Executor executor)
举例:
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenAcceptBoth(CompletableFuture.supplyAsync(() -> 100),
(s,i)-> System.out.println("s: "+ s + ", i: " + i));
Thread.currentThread().join();
}
}
结果:
s: Hello, i: 100
分析
- 可以看出是两个任务组合,然后同时将两个结果一起处理
组合两个任务,任务完成后做的操作
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action)
分析
- 相当于组合两个任务,执行
thenRun
当两个任务任意一个执行完成后,执行一个操作
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action)
举例
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello");
return "a";
});
CompletableFuture<Void> future = completableFuture.runAfterEitherAsync(CompletableFuture.supplyAsync(() -> {
System.out.println(100);
return 100;
}),
() -> System.out.println("done."));
Thread.currentThread().join();
}
}
结果:
Hello
100
done.
分析
- 相当于一次同步任务
组合两个任务,处理后,返回一个结果
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
举例
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "a");
CompletableFuture<Boolean> future = completableFuture.thenCombine(CompletableFuture.supplyAsync(() -> 100),
(s,i)->{
System.out.println("s: " + s +" , i : " + i);
return true;
});
System.out.println(future.get());
Thread.currentThread().join();
}
}
结果:
s: a , i : 100
true
合并两个任务,第一个任务的输出是第二个任务的输入
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn,
Executor executor)
分析
- 相当于一次级联操作
中转方法
有返回值
当执行完成时执行的操作
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
举例
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture.whenComplete((v, t) -> System.out.println(v + " World !"));
System.out.println(future.get());
Thread.currentThread().join();
}
}
结果
Hello World !
Hello
分析
- 当执行完成时执行的回调方法
- 该方***接收执行的结果以及异常
- 回调完成会,会把任务执行的结果传递回去
- whenCompleteAsync是异步的;whenComplete是同步的,会卡住主线程
- 需要传递一个
BiConsumer
接口,如下所示:
public interface BiConsumer<T, U> {
void accept(T t, U u);
}
- T是执行的结果,U是执行时产生的异常
级联操作
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)
举例
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Integer> future = completableFuture.thenApplyAsync(t -> {
String s = t + " World !";
System.out.println(s);
return s.length();
});
System.out.println(future.get());
Thread.currentThread().join();
}
}
public interface Function<T, R> {
R apply(T t);
}
结果
Hello World !
13
分析
- 是一个级联操作,即拿着上个任务的结果,做下个任务,同时返回一个新的结果
处理结果的操作
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn,Executor executor)
举例
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Integer> future = completableFuture.handleAsync((s,t) -> {
String aaa = t + " World !";
System.out.println(aaa);
return aaa.length();
});
System.out.println(future.get());
Thread.currentThread().join();
}
}
结果:
Hello World !
13
分析:
- 相比于
whenComplete
返回值可以自己处理,相当于一次级联 - 相比于
thenApply
,可以处理异常
无返回值
处理结果
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
举例
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenAccept(t -> {
String aaa = t + " World !";
System.out.println(aaa);
});
System.out.println(future.get());
Thread.currentThread().join();
}
}
结果
Hello World !
null
分析
- 相当于一次级联,但是没有返回值
执行完全部任务
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
分析
- 相较
thenAccept
,不处理任务的执行结果
终结方法
处理异常
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
int a = 1/0;
return "World ";
});
completableFuture.exceptionally(Throwable::getMessage).thenAccept(t->{
System.out.println(t);
});
Thread.currentThread().join();
}
}
立马获取结果
public T getNow(T valueIfAbsent)
举例
public class CompletableFutureTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "World ");
Thread.sleep(1000);
String now = completableFuture.getNow("Hello");
System.out.println(now);
System.out.println(completableFuture.get());
Thread.currentThread().join();
}
}
结果
World
World
分析
- 如果结果完成返回结果,如果未完成,返回传入进去的值
判断结果是否完成,如果未完成则赋予结果
public boolean complete(T value)
判断结果是否完成,如果未完成返回异常
public boolean completeExceptionally(Throwable ex)
后续获取结果会产生异常
public void obtrudeException(Throwable ex)
总结
thenAccept
()处理正常结果;exceptionally
()处理异常结果;thenApplyAsync
()用于串行化另一个CompletableFuture
;anyOf
()和allOf
()用于并行化多个CompletableFuture
。