2024-12-12Cider一面
介绍实习
突然给了我个链接让我写一个实战题
手写并发优化解决思路
我们要调用对方的RPC接口,我们的RPC接口每调用一次对方都会阻塞50ms
但是我们的业务要批量调用RPC,例如我们要批量调用1k次,我们不可能在for循环里面写1k次远程调用,因为我们1次就会阻塞50ms,我们for循环弄1k次那么就要等待1k×50ms
我们还要保证返回的结果是按照我们的请求顺序的
场景介绍:我们这边是C端的,我们不可能修改对方的代码,所以我们只能尽可能优化我们自己的代码提高接口效率
我的评价是,为啥别人都是八股就我遇到了实战题......我还准备了半天的八股,丧失了所有的力气和手段,优化思路全部说对了,但是我们的异步等待结束allOf()忘记了我能怎么办......还是太菜了
面试官评价:《你写代码都不缩进的吗》《我个人是有代码洁癖的,你想想你代码一点不规范让以后接手你代码的人怎么想》
我好像开了个玩笑说《交给后人的智慧》
屁颠屁颠写出了优化,不过写的很丑陋但也是成功异步优化了,但是它让我用Hash模算法来优化推送....我宕机了,继续往下写+不缩进写了一坨,但是勉强能跑 面试官说《你写的这堆是什么东西啊》,看来面试官看我的不缩进代码不爽了#面试#
我已经不想写了,我说写不出来我想让他换一题问我,问问八股也好,他说没事你思路都对了你继续写吧
问了20分钟的实习,40分钟坐牢,写了70%没写好就是没写出来某的一点办法。
第一次成功跑出来结果,但是我才发现他跟我说我们返回的调用结果还必须是按顺序的,于是重写。
第二次写忘记写了allOf()等待,弄了结果都是顺序的但是呢主线程没有等待异步线程执行完就结束了,于是输出不出结果
还是太菜了,我什么时候才能沉淀完然后感谢教练啊
解决思路
然后我后面对我的屎山代码进行了优化
1.通过Hash算法来分批运算,最后把结果存到map<Integer,String>里面然后来取,因为我们的顺序由id从低到高,所以我们可以通过id在map里面根据顺序取出然后放到我们的List里面
2.我们for循环,然后每一次循环都开启一个异步线程将结果存到Map里面,然后我们最终存到List。但我一开始有个问题,就是我没等全部执行完就存到我们的Map里面了,因为我不会写那个全局等待的代码......破防了
解决思路1
每一个线程都开一个异步,最后用allof()函数等待异步线程全部执行完
package com.kira.scaffoldmvc.appender; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; public class RpcBatchRequestTest { static RpcService rpcService = new RpcService(); public static void main(String[] args) throws ExecutionException, InterruptedException { // rpc 请求参数 List<Integer> requestIds = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); // rpc 调用 List<String> results = batchGetDetails(requestIds); // 输出 for (String result : results) { System.out.println(result); } // 预期输出 // details 0 // details 1 // details 2 // ....... // details 999 } /** * 某个 rpc service 的接口只提供单个调用 * 此处需要做一个封装,多次请求后返回 * * 要求按照顺序返回 * * @param ids * @return */ public static List<String> batchGetDetails(List<Integer> ids) throws ExecutionException, InterruptedException { // 单次调用 // RpcService rpcService = new RpcService(); // String rpcResult = rpcService.rpcGetDetailsById(1); List<String> list=new ArrayList<>(); HashMap<Integer,String> map=new HashMap<>(); List<CompletableFuture<Void>> futures = new ArrayList<>(); //for循环里面的每一个都开启一个for for(int i=0;i<ids.size();i++) { int finalI = i; CompletableFuture future=CompletableFuture.supplyAsync(() -> { String s = rpcService.rpcGetDetailsById(ids.get(finalI)); map.put(finalI, s); return s; }); futures.add(future); } //futures.toArray(new CompletableFuture[0])) 将future数组转成CompletableFuture数组 //如果你传入 new CompletableFuture[0],Java 会动态调整数组大小,以适应 futures 中的元素数 //addOf()等待所有Completable异步线程都执行完 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // TODO 在此处实现批量调用 for(int i=0;i<ids.size();i++) { list.add(map.get(i)); } return list; } } class RpcService { public String rpcGetDetailsById(int id) { // 模拟 rpc service 耗时 try { Thread.sleep(50L); } catch (InterruptedException e) { throw new RuntimeException(e); } return "details " + id; } }
解决思路2
分批推送解决思路,我们每批发送500个请求
然后我们allof()等待所有的异步线程都执行完
package com.kira.scaffoldmvc.appender; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.IntStream; public class RpcBatchRequestTest2 { static RpcService rpcService = new RpcService(); public static void main(String[] args) throws ExecutionException, InterruptedException { // rpc 请求参数 List<Integer> requestIds = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); // rpc 调用 List<String> results = batchGetDetails(requestIds); // 输出 for (String result : results) { System.out.println(result); } } /** * 按批次异步调用 RPC 接口,并确保按顺序返回 * * @param ids 请求 ID 列表 * @return 按顺序返回的结果列表 */ public static List<String> batchGetDetails(List<Integer> ids) throws ExecutionException, InterruptedException { int batchSize = 500; // 每批大小 List<CompletableFuture<List<String>>> batchFutures = new ArrayList<>(); // 按批次切分数据 for (int i = 0; i < ids.size(); i += batchSize) { int start = i; int end = Math.min(i + batchSize, ids.size()); List<Integer> batch = ids.subList(start, end); // 异步处理每个批次 CompletableFuture<List<String>> batchFuture = CompletableFuture.supplyAsync(() -> batch.stream() .map(rpcService::rpcGetDetailsById) // 调用 RPC 方法 .collect(Collectors.toList()) ); batchFutures.add(batchFuture); } // 等待所有批次完成并收集结果 List<String> results = new ArrayList<>(); CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).join(); for (CompletableFuture<List<String>> future : batchFutures) { results.addAll(future.get()); } return results; } } class RpcService2 { public String rpcGetDetailsById(int id) { // 模拟 rpc service 耗时 try { Thread.sleep(50L); } catch (InterruptedException e) { throw new RuntimeException(e); } return "details " + id; } }