教你实现简单版的字节RPC优雅重试组件

前段时间在做毕设时,发现自己的机器网络有点差,RPC出现调用失败的次数有点多,因此想寻找比较好的重试方式。在网上找了相关资料后,发现字节有一篇技术文章写了其内部是如何优雅的重试RPC,一开始看的不是很懂,但我看完后大受震撼(在文末贴出了链接)。本文是基于该文章的思路,实现了部分功能:对Feign调用实现了链路级别的重试,以及针对调用链超时情况做了优化。之所以选择Feign,是因为实现起来较为简单,当然在企业中用的比较多应该还是dubbo、thrift等等这些rpc框架,不过思路都是一样的。

为何需要优雅的RPC重试

在微服务系统中,会拆分出多个小服务,每个小服务之间会有大量的 RPC 调用,但由于微服务网络环境较为复杂,经常可能因为网络抖动、存储资源抖动等原因导致 RPC 调用失败。使用重试机制可以提高请求的最终成功率,提升系统的稳定性。
虽然重试能够提高服务的稳定性,但在实际情况中由于微服务系统中调用链路较长,普通的重试存在链路放大的效应,继而可能导致链路上每层的服务被突增的流量打挂而导致系统的雪崩,即重试风暴
链路放大导致重试风暴的示意图如图所示。假设重试场景是服务 A 调用服务 B,服务 B 调用 DB 服务,服务 A 和 B 的重试次数均为 3。如果服务 B 请求 DB 服务 3 次都失败了,这时服务 B 会给服务 A 返回失败的状态码,但由于服务 A 也有重试逻辑,因此最终DB 服务会被请求 9 次,是预想设定的重试次数 3 次的指数倍。

本文基于 Spring AOP 和 GuavaRetryer 对 Feign 的服务调用实现了一个重试组件,该组件限制了链路重试,达到了只有在最靠近错误的那一层服务才可以重试的效果。

实现过程

防止重试风暴的核心是限制每一层的重试,在最理想情况下只有最接近故障的那一层会发生重试。本文使用了特殊错误码的方式来实现,该错误码含义为本层调用失败,但别对本层重试。在任何一层失败后返回此错误码给上层,上层收到后停止重试并继续返回给上层。同时针对在超时情况下,上层接收不到下层返回的错误码的场景添加了其他的错误码来限制重试。此种方式对业务有一定的入侵性,但好在系统内部统一使用 Feign,可以将错误码放在Header 中,避免对现有业务接口进行改动,同时基于 Spring AOP 和 Feign 拦截器对错误码进行解析和传递,可实现当前业务代码的无需改动即可接入。

核心类设计

  1. 基础实体类
    设计 RetryStatus 实体类保存上游传来的信息,包括当前请求是否为重试请求和当前链路中还剩余的可调用时间。若请求为重试请求,则当前层不会发生重试;若剩余的可调用时间为 0,表示整个链路的耗时已经超过了设定的超时时间,那么就不会处理该请求,直接返回,减少无用的重试。
    RetryConfig 实体类用于保存当前层发起 RPC 的配置信息,包括当前的重试次数、请求开始处理的时间、上层传递过来的剩余可调用时间、上层的请求是否为重试请求和下游的响应错误码是否为停止重试错误码。
    RetryConfigContext 是保存了 RetryConfig 的上下文工具类,使用了线程安全的 ThreadLocal 实现,用于在同一线程中的不同处理方法都能过访问和修改同一 RetryConfig。
  2. 切面设计
    @RpcRetry 是注解,作用在需要使用重试机制的 RPC 方法上,可配置的属性有重试次数、超时时间和是否需要为上层返回停止重试的错误码。
    RpcRetryAspect 是该注解的切面类,使用了环绕通知的方式来处理切点,实现了重试机制的主要处理逻辑。
  3. 拦截器设计
    RetryFeignInterceptor 是发起 RPC 前的拦截器,用于向下游传递当前请求是否为重试请求、当前链路剩余的可调用时间。
    FeignOkHttpClientResponseInterceptor 是 RPC 响应结果的拦截器,用于解析下游是否传递了停止重试的错误码,然后使用 RetryConfigContext 更新当前线程的 RetryConfig。

类图

图片说明

详细设计

先看看注解,属性有重试次数、超时时间和是否需要为上层返回停止重试的错误码。

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcRetry {
    int retryTimes() default 3;
    int ddl() default 3000;
    // 返回给上一层的
    boolean noMoreRetry() default true;
}

RPC重试注解的切面的实现过程:

@Aspect
@Component
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE)
public class RpcRetryAspect {

    @Resource
    private RetryConfigContext retryConfigContext;

    @Pointcut("@annotation(com.coder.common.annotation.RpcRetry)")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws NoMoreRetryException {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        RpcRetry rpcRetryBaseConfig = method.getAnnotation(RpcRetry.class);
        // 解析出上游的重试参数
        RetryStatus retryUpstreamParams = extractRetryUpstreamParams(rpcRetryBaseConfig);
        if(retryUpstreamParams.getDdl()<=0){
            log.error("The request has reached the deadline.");
            throw new BusinessException("The request has reached the deadline.");
        }
        // 生成重试过程的记录器
        RetryConfig retryConfig = buildRetryRecorder(retryUpstreamParams);
        if(rpcRetryBaseConfig.noMoreRetry()){
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            requestAttributes.getResponse().addHeader(RetryStatus.NO_MORE_RETRY_HEADER, String.valueOf(rpcRetryBaseConfig.noMoreRetry()));
        }
        // 当前配置的重试次数
        int retryTimes = rpcRetryBaseConfig.retryTimes();
        if(retryUpstreamParams.isRetryRequest()){
            retryTimes = 1;
        }
        RetryerBuilder<RetryResult> retryerBuilder = RetryerBuilder.newBuilder();
        Retryer<RetryResult> retryer = retryerBuilder
                .withStopStrategy(StopStrategies.stopAfterAttempt(retryTimes))
                .retryIfExceptionOfType(RetryForCaughtException.class)
                .retryIfRuntimeException()
                .withWaitStrategy(WaitStrategies.fixedWait(500, TimeUnit.MILLISECONDS))
                .withRetryListener(new RetryListener() {
                    @Override
                    public <V> void onRetry(Attempt<V> attempt) {
                        retryConfig.setRetryTimes(attempt.getAttemptNumber());
                        retryConfigContext.set(retryConfig);
                    }
                })
                .build();
        AtomicReference<Object> ret = new AtomicReference<>();
        RetryResult retryResult = RetryResult.FAIL;
        try {
            retryResult = retryer.call(()->{
            // 检查下游传来的停止重试标志
                if(retryConfigContext.getRetryConfig().isNoMoreRetry()){
                    log.warn("Received no more retry flag. Stop retrying.");
                    return RetryResult.NO_MORE_RETRY;
                }
                try {
                    ret.set(joinPoint.proceed());
                } catch (Throwable throwable) {
                    log.error("Throwable: ",throwable);
                    throw new RetryForCaughtException(throwable);
                }
                return RetryResult.SUCCESS;
            });
        } catch (ExecutionException e) {
            log.error("Occur ExecutionException",e);
        } catch (RetryException e) {
            log.error("Occur RetryException",e);
        }
        if(retryResult==RetryResult.NO_MORE_RETRY){
            throw new NoMoreRetryException("Received no more retry flag. Stop retrying.");
        }
        return ret.get();
    }

    public RetryConfig buildRetryRecorder(RetryStatus retryUpstreamParams){
        RetryConfig retryConfig = new RetryConfig();
        retryConfig.setLastDdl(retryUpstreamParams.getDdl());
        retryConfig.setRetryRequest(retryUpstreamParams.isRetryRequest());
        retryConfig.setStartTime(System.currentTimeMillis());
        retryConfigContext.set(retryConfig);
        return retryConfig;
    }

    public RetryStatus extractRetryUpstreamParams(RpcRetry rpcRetry){
        RetryStatus retryStatus = new RetryStatus();
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        try {
            retryStatus.setRetryRequest(Boolean.parseBoolean(request.getHeader(RetryStatus.IS_RETRY_REQUEST_HEADER)));
        }catch (Exception e){
            retryStatus.setRetryRequest(false);
        }
        try {
            retryStatus.setDdl(Integer.parseInt(request.getHeader(RetryStatus.DDL_HEADER)));
        }catch (Exception e){
            retryStatus.setDdl(rpcRetry.ddl());
        }
        return retryStatus;
    }

}

接下来看看拦截器的实现过程,分为请求拦截器和响应结果的拦截器。
先看看请求拦截器,实现比较简单,只需要把标志设置到header中去即可。标志有两个,一个是标志当前请求是否为重试的请求;另一个是当前链路剩余的可用时间,如果小于0,下游会直接拒绝执行。

@Slf4j
@Component
public class RetryFeignRequestInterceptor implements RequestInterceptor {

    @Resource
    private RetryConfigContext configContext;

    @Override
    public void apply(RequestTemplate template) {
        RetryConfig retryConfig = configContext.getRetryConfig();
        if(retryConfig!=null){
            template.header(RetryStatus.IS_RETRY_REQUEST_HEADER, String.valueOf(retryConfig.getRetryTimes()>=1|| retryConfig.isRetryRequest()));
            template.header(RetryStatus.DDL_HEADER, String.valueOf(retryConfig.getLastDdl()-(System.currentTimeMillis()- retryConfig.getStartTime())));
        }
    }
}

然后是响应结果的拦截器,这个主要用来解析下游的标志,然后更新重试的参数。

@Component
@Slf4j
public class FeignOkHttpClientResponseInterceptor implements Interceptor {

        @Resource
        RetryConfigContext configContext;

        Gson gson = new Gson();

        @Override
        public Response intercept(Chain chain) throws IOException {

            Request originalRequest = chain.request();
            Response response = chain.proceed(originalRequest);

            MediaType mediaType = response.body().contentType();
            RetryConfig retryConfig = configContext.getRetryConfig();
            if(retryConfig==null){
                retryConfig = new RetryConfig();
            }
            try {
                // 解析重试的是否要继续重试的标志
retryConfig.setNoMoreRetry(Boolean.parseBoolean(response.header(RetryStatus.NO_MORE_RETRY_HEADER)));
            }catch (Exception e){
            }
            configContext.set(retryConfig);

            String resp = response.body().string();
            try {
                // 解析业务处理结果
                Result result = gson.fromJson(resp, Result.class);
                if(!result.getCode().equals(STATUS_CODE.SUCCESS)){
                    throw new RpcBizErrorException("Rpc biz failed.");
                }
            }catch (Exception e){
                log.error("Rpc result error:{}",resp);
                throw new RpcBizErrorException("Rpc biz failed.");
            }
            //生成新的response返回,网络请求的response如果取出之后,直接返回将会抛出异常
            return response.newBuilder()
                    .body(ResponseBody.create(mediaType, resp))
                    .build();
        }
    }

测试结果

看看这个重试组件究竟能不能发挥它的作用吧。测试场景如下:有三个微服务 A、B 和 C,调用链为 A 调用 B,B 调用 C;微服务 A 和 B 的重试次数都为 3;分别在 A 和 B 的 RPC 接口上使用重试组件;服务 C 模拟业务异常和超时的情况。
预期结果是非超时情况下,C 的调用次数是 3 次;超时情况下,C 的调用次数不超过 5 次。
首先测试在 A 和 B 都不使用重试组件时,观察最后一层服务 C 的被调用次数,如图所示,服务 C 最终是被调用了 9 次,即产生了重试风暴。
图片说明
接下来测试在 A 和 B 都使用重试组件后,调用都不超时的情况。观察服务B 和 C 的被调用次数,以及 A 的重试过程。如图所示,服务 A 收到特殊错误码后停止重试;服务 B 被调用了 1 次;服务 C被调用了 3 次。
图片说明
图片说明
图片说明
接下来测试在 A 和 B 都使用重试组件后,调用超时的情况,且不开启调用链超时停止重试,此时服务 A 接收不到特殊的错误码。观察服务 B 和 C 的被调用次数。服务 A 因收不到错误码而重试了 3 次;服务 B 被调用了 3 次;服务 C 被调用了 5 次,少于 9 次。
图片说明
图片说明
若开启调用链超时停止重试机制,服务 C 的调用次数在时间设置得比较合理的情况下,调用次数会少于 5 次。
图片说明

当然测试还不够全面,就当前结果来看,是符合预期结果的。

改进的地方

  1. 目前基于注解的方式去配置重试参数,会比较死板,参数改动后需要重启项目。后续可以基于Nacos的动态配置方式,在 RPC 调用的时候自动从Nacos上读取最新的RPC重试配置并生效。字节跳动的配置的维度是选定 [调用方服务,调用方集群,被调用服务, 被调用方法] 为一个元组,按照元组来进行配置。
  2. 目前没有实现单点重试,一个服务可能会不受限制地对下游发起重试。
  3. 没有实现 Backup Requests 超时优化方案来降低整体延时。

最后

从整体上来看,实现的过程并不复杂,难点主要在于每个部分如切面逻辑、拦截器等的数据流向应该如何设计。在实现这个组件的过程中,我的动手能力得到了一定提升。后续有时间的话我会接着去实现上面可改进的点。觉得不错的同学三连走起吧

参考文献:字节跳动技术团队公众号——如何优雅地重试

全部评论

相关推荐

2024-12-30 22:49
长沙理工大学 Java
神哥了不得:没什么可以指导的地方了,简历确实牛,我大号分享过投递策略,广投就行
点赞 评论 收藏
分享
评论
3
6
分享

创作者周榜

更多
牛客网
牛客企业服务