并发编程—高并发场景微服务实战(七)
Hello,牛油们好,我是程序员Alan,很高兴遇见你。
在《 需求分析—高并发场景微服务实战(二)》一文的最后,我提了一个问题 “你会用什么方式获取和聚合机票信息?”今天我会详细地讲解解决这类问题的几种常用方法。
问题回顾
在开始讲解问题的解决方法之前,我们再来看一下问题的具体描述。搭建一个订票系统经常会有这样的需求 ,那就是同时获取多家航空公司的航班信息。比如,从深圳到三亚的机票钱是多少?有很多家航空公司都有这样的航班信息,所以应该把所有航空公司的航班、票价等信息都获取到,然后再聚合。由于每个航空公司都有自己的服务器,所以需要分别去请求它们的服务器,如下图所示:
解决方法
1. 串行
我们想获取所有航空公司某个航班信息,要先去访问东航,然后再去访问南航,以此类推。每一个请求发出去之后,等它响应回来以后,我们才能去请求下一个航空公司,这就是串行的方式。
这样做的效率非常低下,如果航空公司比较多,假设每个航空公司都需要 1 秒钟的话,那么用户肯定等不及,所以这种方式是不可取的。
2. 并行
既然串行的方法很慢,那么我们可以并行地去获取这些机票信息,然后再把机票信息给聚合起来,这样的话,效率会成倍的提高。
这种并行虽然提高了效率,但也有一个缺点,那就是会“一直等到所有请求都返回”。如果有一个航空公司的响应特别慢,那么我们的整个服务就会被拖累。所以我们需要再改进一下,增加超时获取的功能。
3. 有超时的并行获取
上图的这种情况,就属于有超时的并行获取,同样也在并行的去请求各个公司的机票信息。但是我们规定了一个超时时间,如果没能在指定时间内响应信息,我们就把这些请求给忽略掉,这样用户体验就比较好了,它最多只需要等固定的时间就能获得机票信息,虽然拿到的信息可能是不全的,但是总比一直等更好。
实现这个目标有多种实现方案,我们一个个的来看看。
3.1 线程池的实现
第一个实现方案是用线程池,我们来看一下代码。
/** * @author alan * @create 2022 - 10 - 05 15:17 */ public class ThreadPoolDemo { ExecutorService threadPool = Executors.newFixedThreadPool(3); public static void main(String[] args) throws InterruptedException { ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo(); System.out.println(threadPoolDemo.getPrices()); } private Set<Integer> getPrices() throws InterruptedException { Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>()); threadPool.submit(new Task(1, prices)); threadPool.submit(new Task(2, prices)); threadPool.submit(new Task(3, prices)); Thread.sleep(3000); return prices; } private class Task implements Runnable { Integer productId; Set<Integer> prices; public Task(Integer productId, Set<Integer> prices) { this.productId = productId; this.prices = prices; } @Override public void run() { int price=0; try { Thread.sleep((long) (Math.random() * 6000)); price= productId; }catch (Exception e){ e.printStackTrace(); } prices.add(price); } } }
在代码中,新建了一个线程安全的 Set,命名为Prices 用它来存储价格信息,然后往线程池中去放任务。线程池是在类的最开始时创建的,是一个固定 3 线程的线程池。
在Task的run方法中,用一个随机的时间取模拟各个航空公司的响应时间,然后再返回我们传入的值作为票价,最后把这个票价放到Set中。
getPrices 函数中,我们新建了三个任务,productId 分别是 1、2、3,为了实现等待固定时间的功能,在这里调用了 Thread 的 sleep 方法来休眠 3 秒钟,它就会在这里等待 3 秒,之后直接返回 prices。
此时,如果 Math.random() * 6000) 的值很小,任务的响应速度快的话,返回的prices 里面最多会有三个值,但是如果每一个响应时间都很慢,那么可能 prices 里面一个值都没有。
这就是用线程池去实现的最基础的方案。
3.2 CountDownLatch
上面的方法有一个优化的空间,比如说网络特别好时,每个航空公司响应速度都特别快,你根本不需要等三秒,有的航空公司可能几百毫秒就返回了,那么我们也不应该让用户等 3 秒。所以需要进行一下这样的改进,看下面这段代码:
/** * @author alan * @create 2022 - 10 - 05 15:32 */ public class CountDownLatchDemo { ExecutorService threadPool = Executors.newFixedThreadPool(3); public static void main(String[] args) throws InterruptedException { CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo(); System.out.println(countDownLatchDemo.getPrices()); } private Set<Integer> getPrices() throws InterruptedException { Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>()); CountDownLatch countDownLatch = new CountDownLatch(3); threadPool.submit(new Task(1, prices, countDownLatch)); threadPool.submit(new Task(2, prices, countDownLatch)); threadPool.submit(new Task(3, prices, countDownLatch)); countDownLatch.await(3, TimeUnit.SECONDS); return prices; } private class Task implements Runnable { Integer productId; Set<Integer> prices; CountDownLatch countDownLatch; public Task(Integer productId, Set<Integer> prices,CountDownLatch countDownLatch) { this.productId = productId; this.prices = prices; this.countDownLatch = countDownLatch; } @Override public void run() { int price = 0; try { Thread.sleep((long) (Math.random() * 6000)); price = productId; } catch (InterruptedException e) { e.printStackTrace(); } prices.add(price); countDownLatch.countDown(); } } }
这段代码使用 CountDownLatch 实现了这个功能,整体思路和之前是一致的,不同点在于我们新增了一个 CountDownLatch,并且把它传入到了 Task 中。在 Task 中,获取完机票信息并且把它添加到 Set 之后,会调用 countDown 方法,相当于把计数减 1。
这样一来,在执行 countDownLatch.await(3, TimeUnit.SECONDS) 这个函数进行等待时,如果三个任务都非常快速地执行完毕了,那么三个线程都已经执行了 countDown 方法,那么这个 await 方法就会立刻返回,不需要傻等到 3 秒钟。
如果有一个请求特别慢,相当于有一个线程没有执行 countDown 方法,来不及在 3 秒钟之内执行完毕,那么这个带超时参数的 await 方法也会在 3 秒钟到了以后,及时地放弃这一次等待,于是就把 prices 给返回了。所以这样一来,我们就利用 CountDownLatch 实现了这个需求,也就是说我们最多等 3 秒钟,但如果在 3 秒之内全都返回了,我们也可以快速地去返回,不会傻等,提高了效率。
3.3 CompletableFuture
我们再来看一下用 CompletableFuture 来实现这个功能的用法,代码如下所示:
** * @author alan * @create 2022 - 10 - 05 15:59 */ public class CompletableFutureDemo { public static void main(String[] args) throws Exception { CompletableFutureDemo completableFutureDemo = new CompletableFutureDemo(); System.out.println(completableFutureDemo.getPrices()); } private Set<Integer> getPrices() { Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>()); CompletableFuture<Void> task1 = CompletableFuture.runAsync(new Task(1, prices)); CompletableFuture<Void> task2 = CompletableFuture.runAsync(new Task(2, prices)); CompletableFuture<Void> task3 = CompletableFuture.runAsync(new Task(3, prices)); CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3); try { allTasks.get(3, TimeUnit.SECONDS); } catch (Exception e) { e.printStackTrace(); } return prices; } private class Task implements Runnable { Integer productId; Set<Integer> prices; public Task(Integer productId, Set<Integer> prices) { this.productId = productId; this.prices = prices; } @Override public void run() { int price = 0; try { Thread.sleep((long) (Math.random() * 6000)); price = productId; } catch (InterruptedException e) { e.printStackTrace(); } prices.add(price); } } }
getPrices 方法中,我们用了 CompletableFuture 的 runAsync 方法,这个方***异步的去执行任务。
我们有三个任务,并且在执行这个代码之后会分别返回一个 CompletableFuture 对象,我们把它们命名为 task 1、task 2、task 3,然后执行 CompletableFuture 的 allOf 方法,并且把 task 1、task 2、task 3 传入。这个方法的作用是把多个 task 汇总,然后可以根据需要去获取到传入参数的这些 task 的返回结果,或者等待它们都执行完毕等。我们就把这个返回值叫作 allTasks,并且在下面调用它的带超时时间的 get 方法,同时传入 3 秒钟的超时参数。
它的效果是,如果在 3 秒钟之内这 3 个任务都可以顺利返回,那么会立即响应结果
但是如果有某一个任务没能来得及在 3 秒钟之内返回,那么这个带超时参数的 get 方法便会抛出 TimeoutException 异常,会被我们给 catch 住。
这样一来它就实现了这样的效果:会尝试等待所有的任务完成,但是最多只会等 3 秒钟,在此之间,如及时完成则及时返回,如果超时则抛出异常丢弃。
站在巨人的肩膀上
- 徐隆曦——《Java 并发编程核心 78 讲》
说到高并发和微服务,你是不是和我一样有很多的困惑? -知道高并发系统开发知识,是获取大厂Offer的利器,可是工作中遇不到高并发的需求场景。 -了解过微服务开发、高并发系统开发理论,苦于没实战经验。 为了帮助其他有这些困惑的朋友,我决定以一个虚拟的高并发场景的微服务系统为主线,一步步将技术点串联起来,系统性从 0 到 1 的创造一个高并发场景的微服务系统。