什么,你还不会用CompletableFuture?

如果对JAVA8中的Supplier、Function等类不熟悉的话,可以先移步我的另外一篇文章快速理解Consumer、Supplier、Predicate与Function

上一篇我们讲了Future机制,有兴趣的可以参考谈谈Runnable、Future、Callable、FutureTask之间的关系

但Future机制,还不那么灵活,比如怎么去利用Future机制描述两个任务串行执行,又或是两个任务并行执行,又或是只关心最先执行结束的任务结果。

Future机制在一定程度上都无法快速地满足以上需求,CompletableFuture便应运而生了。

本片会介绍CompletableFuture的api,并用一些示例演示如何去使用。


1. 创建一个异步任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor); public static CompletableFuture<Void> runAsync(Runnable runnable); public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);

supplyAsync与runAsync的区别在于:supplyAsync有返回值,而runAsync没有返回值

带Executor参数的构造函数,则使用线程池中的线程执行异步任务(线程池可以参考说说线程池

不带Executor参数的构造函数,则使用ForkJoinPool.commonPool()中的线程执行异步任务(Fork/Join框架可以参考谈谈并行流parallelStream


1.1 示例:使用supplyAsync创建一个有返回值的异步任务

public class Case1 { public static void main(String[] args) throws Exception {

        CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{ try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } return 1;
        }); //该方***一直阻塞 Integer result = completableFuture.get();
        System.out.println(result);
    }

}

2. 异步任务的回调

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);

whenComplete开头的方法在计算任务完成(包括正常完成与出现异常)之后会回调

而exceptionally则只会在计算任务出现异常时才会被回调

如何确定哪个线程去回调whenComplete,比较复杂,先略过。

而回调whenCompleteAsync的线程比较简单,随便拿一个空闲的线程即可,后缀是Async的方法同理。


2.1 示例:计算出现异常,使用whenComplete与exceptionally进行处理

package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.IntStream; /**
 * @author qcy
 * @create 2020/09/07 17:40:44
 */ public class Case2 { public static void main(String[] args) throws Exception {

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行supplyAsync的线程:" + Thread.currentThread().getName()); int i = 1 / 0; return 1;
        });

        completableFuture.whenComplete(new BiConsumer<Integer, Throwable>() { @Override public void accept(Integer integer, Throwable throwable) {
                System.out.println("执行whenComplete的线程:" + Thread.currentThread().getName()); if (throwable == null) {
                    System.out.println("计算未出现异常,结果:" + integer);
                }
            }
        });

        completableFuture.exceptionally(new Function<Throwable, Integer>() { @Override public Integer apply(Throwable throwable) { //出现异常时,则返回一个默认值 System.out.println("计算出现异常,信息:" + throwable.getMessage()); return -1;
            }
        });

        System.out.println(completableFuture.get());
    }

}

输出:

当然,CompletableFuture内的各种方法是支持链式调用与Lambda表达式的,我们进行如下改写:

public static void main(String[] args) throws Exception {

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行supplyAsync的线程:" + Thread.currentThread().getName()); int i = 1 / 0; return 1;
        }).whenComplete((integer, throwable) -> {
            System.out.println("执行whenComplete的线程:" + Thread.currentThread().getName()); if (throwable == null) {
                System.out.println("计算未出现异常,结果:" + integer);
            }
        }).exceptionally(throwable -> { //出现异常时,则返回一个默认值 System.out.println("计算出现异常,信息:" + throwable.getMessage()); return -1;
        });

        System.out.println("计算结果:" + completableFuture.get());
    }

3. 任务串行化执行

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); public CompletableFuture<Void> thenRun(Runnable action); public CompletableFuture<Void> thenAccept(Consumer<? super T> action); public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

thenApply,依赖上一次任务执行的结果,参数中的Function<? super T,? extends U>,T代表上一次任务返回值的类型,U代表当前任务返回值的类型,当上一个任务没有出现异常时,thenApply才会被调用

thenRun,不需要知道上一个任务的返回结果,只是在上一个任务执行完成之后开始执行Runnable

thenAccept,依赖上一次任务的执行结果,因为入参是Consumer,所以不返回任何值。

handle和thenApply相似,不过当上一个任务出现异常时,能够执行handle,却不会去执行thenApply

thenCompose,传入一次任务执行的结果,返回一个新的CompleteableFuture对象


3.1 示例:使用串行化任务分解两数相乘并输出

package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; /**
 * @author qcy
 * @create 2020/09/07 17:40:44
 */ public class Case4 { public static void main(String[] args) {
        
        CompletableFuture.supplyAsync(() -> 2)
                .thenApply(num -> num * 3)
                .thenAccept(System.out::print);
    }

}

很显然,输出为6

3.2 示例:使用串行化任务并且模拟出现异常

package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; /**
 * @author qcy
 * @create 2020/09/07 17:40:44
 */ public class Case4 { public static void main(String[] args) {

        CompletableFuture.supplyAsync(() -> 2)
                .thenApply(num -> num / 0)
                .thenApply(result -> result * 3)
                .handle((integer, throwable) -> { if (throwable == null) { return integer;
                    } else {
                        throwable.printStackTrace(); return -1;
                    }
                }).thenAccept(System.out::print);
    }

}

最终会输出-1


4. 任务同时执行,且都需要执行完成

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn); public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action); public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

thenCombine,合并两个任务,两个任务可以同时执行,都执行成功后,执行最后的BiFunction操作。其中T代表第一个任务的执行结果类型,U代表第二个任务的执行结果类型,V代表合并的结果类型

thenAcceptBoth,和thenCombine特性用法都及其相似,唯一的区别在于thenAcceptBoth进行一个消费,没有返回值

runAfterBoth,两个任务都执行完成后,但不关心他们的返回结构,然后去执行一个Runnable。

allOf,当所有的任务都执行完成后,返回一个CompletableFuture


4.1 示例:使用thenCombine合并任务

package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /**
 * @author qcy
 * @create 2020/09/07 17:40:44
 */ public class Case5 { public static void main(String[] args) throws Exception {

        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始"); try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1结束"); return 2;
        });

        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2开始"); try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2结束"); return 3;
        });

        CompletableFuture<Integer> completableFuture = cf1.thenCombine(cf2, (result1, result2) -> result1 * result2);
        System.out.println("计算结果:" + completableFuture.get());
    }

}

输出:

可以看到两个任务确实是同时执行的

当然,熟练了之后,直接使用链式操作,代码如下:

package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; /**
 * @author qcy
 * @create 2020/09/07 17:40:44
 */ public class Case6 { public static void main(String[] args) throws Exception {

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始"); try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1结束"); return 2;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2开始"); try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2结束"); return 3;
        }), (result1, result2) -> result1 * result2);

        System.out.println("计算结果:" + completableFuture.get());
    }

}

5. 任务同时执行,且只取最先完成的那个任务

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn); public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action); public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action); public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

applyToEither,最新执行完任务,将其结果执行Function操作,其中T是最先执行完的任务结果类型,U是最后输出的类型

acceptEither,最新执行完的任务,将其结果执行消费操作

runAfterEither,任意一个任务执行完成之后,执行Runnable操作

anyOf,多个任务中,返回最先执行完成的CompletableFuture


5.1 示例:两个任务同时执行,打印最先完成的任务的结果

package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; /**
 * @author qcy
 * @create 2020/09/07 17:40:44
 */ public class Case7 { public static void main(String[] args) throws Exception {

        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始"); try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1结束"); return 2;
        }).acceptEither(CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2开始"); try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2结束"); return 3;
        }), result -> System.out.println(result)); //等待CompletableFuture返回,防止主线程退出 completableFuture.join();
    }

}

输出:

可以看得到,任务2结束后,直接不再执行任务1的剩余代码


5.2 示例:多个任务同时执行,打印最先完成的任务的结果

package com.qcy.testCompleteableFuture; import java.util.concurrent.CompletableFuture; /**
 * @author qcy
 * @create 2020/09/07 17:40:44
 */ public class Case8 { public static void main(String[] args) throws Exception {

        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始"); try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务1结束"); return 2;
        });

        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2开始"); try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2结束"); return 3;
        });

        CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务3开始"); try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务3结束"); return 4;
        });

        CompletableFuture<Object> firstCf = CompletableFuture.anyOf(cf1, cf2, cf3);
        System.out.println(firstCf.get());
    }

}

输出:


全部评论
更多内容请移步我的博客https://blog.csdn.net/qq_33591903
点赞 回复 分享
发布于 2020-09-11 10:42

相关推荐

04-16 10:27
已编辑
美团_Saas_后端开发
今天周一休息,突发奇想写一篇阶段总结。如题,我已经去了一个和Java彻底毫无关联的行业。曾经我以为自己能在计算机行业发光发热,拿到美团offer那会感觉自己天都亮了。没想到刚入行一年多就当了逃兵。从最开始的热爱到现在一看到代码就厌恶,不知道自己经历了什么。所以我去干什么了?答案是:在成都当了租房销售。上班那会压力大了就念叨着去干租房中介,但是一直下不去这个决心,想着自己学了四年多的计算机知识,终究还是不甘心。终于在某一天准备八股文的时候,看着无数篇和工作内容关系不大的理论知识,那一刻下定决心,决定尝试一下销售行业,也算是给自己一个交代。后面阴差阳错的投了成都自如去当租房管家,没想到面试很顺利,在当天一百多个面试的人里面,我成为了为数不多通过的几个幸运儿之一。目前已经培训通过,正式入职,也开了单,有压力但是每天过得很开心,真心喜欢那种和人交流的感觉,哪怕是最后没有选择找我租房。说这些也是想告诉那些大三,大四正在找Java实习而焦虑的同学:你们现在还年轻,选择很多,容错率也很高,可以尽情去尝试自己喜欢的行业和工作。不用因为某一次的面试没通过或者简历石沉大海而焦虑,更不用因为身边人都在挤编程的独木桥就强迫自己跟风。也算是自己的碎碎念吧,也希望自己能在新的领域取得一点小成就。也祝牛油工作顺利!
沉淀小子:干啥都不丢人啊,生存是必须要的,销售很考验一个人综合素质能力的,好的销售人脉和资源可不比写字楼的白领差啊
点赞 评论 收藏
分享
不愿透露姓名的神秘牛友
03-20 12:46
瘦嘟嘟右卫门:百度文库网盘的暑期也没约面吗
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务