Redis应用—5.Redis相关解决方案

大纲

1.数据库与缓存一致性方案

2.热key探测系统处理热key问题

3.缓存大value监控和切分处理方案

4.Redis内存不足强制回收监控告警方案

5.Redis集群缓存雪崩自动探测 + 限流降级方案

6.缓存击穿的解决方法

线上Redis比较严重的问题排序是:数据库和缓存一致性、热key、大value、缓存雪崩限流降级、内存不足强制回收

1.数据库与缓存一致性方案

(1)数据库与缓存同步双写强一致性方案

(2)数据库与缓存异步同步最终一致性方案

现有的业务场景下,都会涉及到数据库以及缓存双写的问题。⽆论是先删缓存再更新数据,或先更新数据再删缓存,都⽆法保证⼀致性。本身它们就不是⼀个数据源,⽆法通过代码上的谁先谁后去保证顺序。

(1)数据库与缓存同步双写强一致性方案

这是适合中小企业的方案:读数据时自动进行读延期,实现数据冷热分离。在保证数据库和缓存一致性时使用分布式锁,第一个获得分布式锁的线程双写数据库和缓存成功后才释放分布式锁。然后在高并发下,通过锁超时时间,实现"串行等待分布式锁 + 串行读缓存"转"串行读缓存"。

(2)数据库与缓存异步同步最终一致性方案

如果不想对数据库和缓存进行双写,可以监听数据库binlog日志,通过异步来进行数据复制同步,从而保证数据的最终一致性。

这个方案需要先写成功DB,之后才能读到缓存value。这个方案需要确保binlog不能丢失,并且需要使用Canal监听binlog。

一.具体的数据一致性方案设计

⾸先对于所有的DB操作都不去添加具体的删除缓存操作,⽽是待数据确认已提交到数据库后,通过Canal去监听binlog的变化。

Canal会将binlog封装成消息发送到MQ,然后系统消费MQ的消息时,需要过滤出增删改类型的binlog消息。接着根据binlog消息 + 一致性相关的表和字段组装需要进行缓存删除的key,最后组装出key就可以对缓存进行删除了。

二.具体的数据一致性方案流程图

三.处理MQ消息保证最终数据一致性

//处理MQ消息保证最终数据一致性
@Slf4j
@Component
public class CookbookConsistencyListener implements MessageListenerConcurrently {
    @Autowired
    private RedisCache redisCache;

    //处理MySQL的binlog变化,处理需要清理的缓存key
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String msg = new String(messageExt.getBody());
                //解析binlog数据模型,并过滤掉查询
                BinlogDataDTO binlogData = buildBinlogData(msg);
                //获取binlog的模型,获取本次变化的表名称,在本地配置常量类里面匹配对应的缓存key前缀以及缓存标识字段,非配置的表不进行处理
                String cacheKey = filterConsistencyTable(binlogData);
                //删除该key的缓存
                deleteCacheKey(cacheKey);
            }
        } catch (Exception e) {
            log.error("consume error, 缓存清理失败", e);
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    //解析binlog的数据模型,并过滤掉查询的binlog
    private BinlogDataDTO buildBinlogData(String msg) {
        //先解析binlog的对象,转换为模型
        BinlogDataDTO binlogData = BinlogUtils.getBinlogData(msg);
        //模型为null,则直接返回
        if (Objects.isNull(binlogData)) {
            return null;
        }
        Boolean isOperateType = BinlogType.INSERT.getValue().equals(binlogData.getOperateType())
            || BinlogType.DELETE.getValue().equals(binlogData.getOperateType())
            || BinlogType.UPDATE.getValue().equals(binlogData.getOperateType());
        //只保留增删改的binlog对象,如果数据对象为空则也不处理
        if (!isOperateType || CollectionUtils.isEmpty(binlogData.getDataMap())) {
            return null;
        }
        //返回解析好的可用模型
        return binlogData;
    }

    //过滤掉目前不需要处理的表的Binlog,并返回组装所需的缓存key
    private String filterConsistencyTable(BinlogDataDTO binlogData) {
        if (Objects.isNull(binlogData)) {
            return null;
        }
        String tableName = binlogData.getTableName();
        List<Map<String, Object>> dataList = binlogData.getDataMap();
        //获取配置的常量映射的具体配置
        ConsistencyTableEnum consistencyTableEnum = ConsistencyTableEnum.findByEnum(tableName);
        if (Objects.isNull(consistencyTableEnum)) {
            return null;
        }
        String cacheValue = "";
        if (CollectionUtils.isNotEmpty(dataList)) {
            Map<String, Object> dataMap = dataList.get(0);
            cacheValue = dataMap.get(consistencyTableEnum.getCacheField()) + "";
        }
        if (StringUtils.isBlank(cacheValue)) {
            return null;
        }
        //获取配置的缓存前缀key + 当前的标识字段,组装缓存key
        return consistencyTableEnum.getCacheKey() + cacheValue;
    }

    //对缓存进行清理
    private void deleteCacheKey(String cacheKey) {
        if (StringUtils.isBlank(cacheKey)) {
            return;
        }
        redisCache.delete(cacheKey);
    }
}

public enum ConsistencyTableEnum {
    //商品表缓存配置
    SKU_INFO("sku_info", RedisKeyConstants.GOODS_INFO_PREFIX, "id");

    //配置相关的表名称
    private final String tableName;

    //缓存的前缀key
    private final String cacheKey;

    //缓存的标识字段
    private final String cacheField;
    ...
}

2.热key探测系统处理热key问题

(1)什么是热key问题

(2)如何解决热key问题

(3)开源热key探测系统的工作流程图和架构图

当系统大量使用Redis进行开发后,线上必然会遇到热key和大value问题。

(1)什么是热key问题

比如微博大热点,瞬间千万级大流量都会涌入某微博来浏览某个数据。这时如果这个热点数据是存储在一个Redis节点里,那么就会出现每秒百万级的请求都到一个Redis节点去了。如下图示:

(2)如何解决热key问题

为了解决热key问题,我们首先需要一个热key探测系统。热key探测系统会在服务系统(Redis客户端)进行接入统计,一旦热key探测系统在服务系统识别出某个key符合热key的条件,那么就会将这个热key的数据缓存到服务系统的JVM本地缓存里。

所以,热key探测系统具备的两大核心功能:

一.自动探测热key

二.自动缓存热key数据到JVM本地缓存

(3)热key探测系统的工作流程图和架构图

一.热key探测系统的工作流程简图

二.热key探测系统的详细架构图

3.缓存大value监控和切分处理方案

大value,顾名思义,就是value值特别大,几M甚至几十M。如果在一次网络读取里面,频繁读取大value,会导致网络带宽被占用掉。value太大甚至会把带宽打满,导致其他数据读取请求异常。所以对于大value,要进行特殊的切分处理。

一.首先要能够对Redis里的大value进行监控。如果发现超过1MB的大value 的值,就监控和报警。

二.然后进行自动处理,也就是把这个value的值,转换为拆分字符串来缓存。比如将一个大value拆分为10个串,把一个kv拆分为10个kv来存储:test_key => test_key_01、test_key_02...

三.接着进行读取的时候,则依次读取这拆分字符串对应的key,最后将读出来的拆分字符串进行重新拼接还原成原来的大value值。

处理流程如下:

4.Redis内存不足强制回收监控告警方案

(1)背景

(2)方案设计

(3)通知方式

(4)配置环境

(5)Redis回收事件订阅者

(6)Redis回收事件消费者

(1)背景

当Redis内存中的数据达到maxmemory指定的数值时,将会触发执行Redis的回收策略(Eviction Policies)。

如果执⾏的回收策略对key进⾏了回收(Eviction),那么被回收的key应该要通知到业务端,让业务端⾃⾏去判断处理。

(2)方案设计

根据Redis的发布订阅机制(pub/sub)与键空间通知特性,可通过订阅Redis的回收事件去捕捉被回收的key信息,然后通知到业务端。

键空间通知特性:

每当从数据集中删除⼀个具有⼀定⽣存时间的键时,就会⽣成⼀个过期事件。每次执行回收策略要从数据集中回收⼀个键时,就会⽣成⼀个回收事件。

(3)通知方式

方式一:中间层去订阅Redis的回收事件,收到回收消息后就发消息给MQ。不同业务端只和MQ对接,通过tags过滤消费消息,⾃⾏处理被回收的key。

方式二:不具有中间层,由业务端⾃⾏去订阅Redis的淘汰key,然后去处理。

这里使⽤的是第⼀种⽅式。

(4)配置环境

因为开启键空间通知功能需要消耗⼀些CPU,所以在默认配置下, 键空间通知功能处于关闭状态。可以通过修改配置⽂件或使⽤config set命令,可开启或关闭键空间通知。

通过config get notify-keyspace-events查看notify-keyspace-events属性,当notify-keyspace-events属性值为空时,表示功能关闭,否则功能开启。

notify-keyspace-events的参数可以是以下字符的任意组合,notify-keyspace-events指定了服务器该发送哪些类型的通知:

说明:输⼊的参数中⾄少要有⼀个K或者E,否则不会有任何通知被分发。如果要订阅键空间中和列表相关的通知, 那么参数就应该设为Kl。如果要开启键空间中回收事件的通知,那么参数就应该设为Ee:config set notify-keyspace-events Ee

(5)Redis回收事件订阅者

一.接⼝说明

订阅Redis所有数据库的回收事件。

二.代码实现

//com.demo.eshop.monitor.redis.config.RedisListenerConfig#container
@Configuration
public class RedisListenerConfig {
    @Bean
    @Primary
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //订阅淘汰通知,*表示接收所有数据库淘汰key的通知
        container.addMessageListener(listenerAdapter, new PatternTopic("__keyevent@*__:evicted"));
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter();
    }
}

订阅者具体实现如下:

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
    @Autowired
    private DefaultProducer defaultProducer;

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    //针对Redis数据回收事件,进行数据处理
    @Override
    public void onMessage(Message message, byte[] pattern) {
        //回收的key
        String evictedKey = message.toString();
        //配对模式
        String patternString = new String(pattern, Charsets.UTF_8);
        if (StringUtils.isEmpty(evictedKey)) {
            log.warn("收到evicted事件, 但是key值不正确, key: {}, pattern: {}", evictedKey, patternString);
            return;
        }
        log.info("收到evicted事件, key: {}", evictedKey);

        String[] split = evictedKey.split(":");
        String tags = "";
        if (split.length == 0) {
            tags = evictedKey;
        } else {
            tags = split[0];
        }

        //假设被淘汰的key是goodsInfo:600000011
        //那么消息的tags是goodsInfo
        //消息的key就是goodsInfo:600000011
        defaultProducer.sendMessage(RocketMqConstant.EVICTED_MONITOR_KEY_TOPIC, evictedKey, tags, evictedKey, "evicted");
        log.info("发送消息成功, topic: {}, tags: {}, key: {}, msg: {}", RocketMqConstant.EVICTED_MONITOR_KEY_TOPIC, tags, evictedKey, evictedKey);
    }
}

(6)Redis回收事件消费者

一.接⼝说明

模拟商品服务中,消费tags中包含goodsInfo的消息的消费者。

二.代码实现

//com.demo.eshop.monitor.mq.consumer.listener.CookbookEvictedKeyMonitorListener#receiveEvictedKeyMonitorConsumer
@Component
public class CookbookEvictedKeyMonitorListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String msg = new String(messageExt.getBody());
                log.info("消费到evicted消息, tags: {}, keys: {}, msg: {}", messageExt.getTags(), messageExt.getKeys(), msg);
                //消费tags中包含goodsInfo的消息,然后按照自己的业务去处理逻辑
                log.info("商品服务进行对淘汰key的业务处理...");
            }
        } catch (Exception e) {
            //本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

5.Redis集群缓存雪崩自动探测 + 限流降级方案

(1)常见的系统限流算法

(2)限流降级背景

(3)通过hotkey实现自动探测缓存雪崩

(4)探测缓存雪崩后的RateLimiter限流降级方案

(5)限流降级后操作数据库时也要通过本地缓存降级

(6)缓存雪崩解决方案总结

(1)常见的系统限流算法

一.计数器算法

计数器算法是限流算法⾥最简单也是最容易实现的⼀种算法。

⽐如规定1分钟访问某接口的次数不能超过100个,那么可以这么做:在⼀开始时设置⼀个计数器counter,每当⼀个请求过来counter就加1。如果该请求与第⼀个请求的时间间隔在1分钟之内,就判断counter值。如果counter的值⼤于100,则说明请求数过多,需要触发限流。如果该请求与第⼀个请求⼤于1分钟,则重置counter + 重置第一个请求。

但是这样的计数器算法有⼀个致命的问题,那就是临界问题。⽐如在0:59时,瞬间有100个请求,且在1:00时⼜来了100个请求。那么其实在这1秒内,是有200个请求的。我们规定的1分钟内最多100个请求,就是平均1秒内不超过2个请求,但是在时间窗⼝的重置节点处突发请求,就会瞬间超过限制速率。

为了解决这个问题,便引⼊了滑动窗⼝算法。

二.滑动窗口算法

在下图中,整个虚线框表示的矩形框就是⼀个时间窗⼝(1分钟)。然后将这个时间窗⼝进⾏划分成6格,所以每格代表的是10秒钟。每过10秒钟,这个时间窗⼝就会往右滑动⼀格。每个小格⼦都有⾃⼰独⽴的计数器counter。⽐如当⼀个请求在0:35秒时到达,那么0:30~0:39对应的counter就会加1。

那么滑动窗⼝怎么解决刚才的临界问题的呢?可以看上图:0:59到达的100个请求会落在左起第6个格⼦中,1:00到达的100个请求会落在第7个格⼦中。那么当时间到达1:00时,由于时间窗⼝会往右滑动⼀格,所以此时时间窗⼝内的总请求数量⼀共是200个,超过了限定的100个,因此此时能够检测出来触发限流。

其实计数器算法就是只有一格的滑动窗⼝算法,只是它没有对时间窗⼝做进⼀步地划分,所以只有1格。由此可⻅,当滑动窗⼝的格⼦划分的越多,限流的统计就会越精确。

三.令牌桶算法

令牌桶算法是⽐较常⻅的限流算法,⼤概描述如下:

说明一:所有的请求在处理前都需要拿到⼀个可⽤的令牌才会被处理

说明二:根据限流⼤⼩,设置按照⼀定的速率往桶⾥添加令牌

说明三:桶设置最⼤的令牌限制,当桶满时、新添加的令牌就会被丢弃或拒绝

说明四:请求达到后⾸先要获取令牌桶中的令牌,拿着令牌才可以进⾏业务逻辑,处理完业务逻辑会将令牌直接删除

说明五:令牌桶有最低限额,当桶中的令牌达到最低限额时,请求处理完之后将不会删除令牌,以此保证⾜够的限流

四.漏桶算法

漏桶算法其实很简单,可以粗略的认为就是注⽔漏⽔过程。往桶中以⼀定速率流出⽔,以任意速率流⼊⽔。当⽔超过桶流量则丢弃,因为桶容量是不变的,保证了整体的速率。

(2)限流降级背景

如果线上⽣产环境Redis集群崩溃了,只有数据库可以访问。此时所有的请求都会打到数据库,这样会导致整个系统都崩溃。

所以系统需要自动识别缓存是否崩溃:如果缓存崩溃了,则应该马上进⾏限流,对数据库进⾏保护和防护,让数据库不要崩溃掉。同时启动接⼝的降级机制,让少部分⽤户还能够继续使⽤系统。

降级就是大量的请求没法请求到MySQL,只能通过JVM本地缓存,提供有限的一些数据缓存默认值。

缓存雪崩和缓存惊群的区别:缓存雪崩是Redis集群崩溃了,无法去读缓存了。缓存惊群是大量缓存集中过期,还可以去读缓存。

(3)通过hotkey实现自动探测缓存雪崩

为了防⽌Redis崩溃后,系统⽆法正常运转,所以需要做降级处理。

一.通过AOP切面统计Redis连接异常

由于系统操作Redis的所有⽅法都是通过RedisCache和RedisLock来处理的,RedisCache提供对Redis缓存key的操作,RedisLock提供分布式锁操作。所以可以通过AOP切⾯的⽅式,对这两个类中的所有⽅法做⼀个切⾯。

当这两个类的方法在执⾏Redis操作时,发现Redis挂了就会抛出异常。所以在切⾯处理⽅法上,如果捕捉到异常就进行记录一下。

二.通过JdHotKey来实现自动识别缓存故障

如果捕捉到Redis连接失败,那么就先不抛出异常,而是返回一个空值,继续用数据库提供服务,避免出现整个服务异常。

如果一分钟之内或者30秒之内出现了几次Redis连接失败,此时再设置一个hotkey表示Redis连接不上,并指定过期时间是1分钟左右。那么下次获取缓存时,就可以先根据hotkey来判断Redis是否异常。hotkey在1分钟之后会删除key,下次有请求过来会去看Redis能否连接,这样就可以简单实现Redis挂掉后直接查数据库的降级机制了。

如何判断Redis掉了还是暂时⽹络波动?

可以在hotkey中配置规则,如30秒内出现了3次Redis连接失败,那么就认为Redis是挂掉了,在本地缓存中设置一个hotkey。

如何⾃动恢复呢?

可以设置hotkey的过期时间是60秒,缓存过期会重新尝试去操作Redis。如果此时Redis恢复了,那么由于hotkey已经失效了,所以就可以正常使⽤回Redis功能。如果Redis还是没有恢复,那么继续往本地缓存中的hotkey设置数据。

以上就实现了⾃动识别缓存故障。当识别出缓存故障后,需要再操作Redis时,就可以直接返回null或者返回false。从而实现在缓存故障时,绕过缓存,继续⽤数据库提供服务。

具体代码如下:

@Aspect
@Slf4j
@Component
public class RedisAspect {
    //切入点,RedisCache的所有方法
    @Pointcut("execution(* com.demo.eshop.common.redis.RedisCache.*(..))")
    public void redisCachePointcut() {
    }

    //切入点,RedisLock的所有方法
    @Pointcut("execution(* com.demo.eshop.common.redis.RedisLock.*(..))")
    public void redisLockPointcut() {
    }

    //环绕通知,在方法执行前后
    @Around("redisCachePointcut() || redisLockPointcut()")
    public Object around(ProceedingJoinPoint point) {
        //签名信息
        Signature signature = point.getSignature();
        //强转为方法信息
        MethodSignature methodSignature = (MethodSignature) signature;
        //参数名称
        String[] parameterNames = methodSignature.getParameterNames();
        //执行的对象
        Object target = point.getTarget();

        log.debug("处理方法:{}.{}", target.getClass().getName(), methodSignature.getMethod().getName());
        Object[] parameterValues = point.getArgs();

        //查看入参
        log.debug("参数名:{},参数值:{}", JSONObject.toJSONString(parameterNames), JSONObject.toJSONString(parameterValues));

        Class returnType = methodSignature.getReturnType();

        //返回类型是否布尔类型
        boolean booleanType = boolean.class.equals(returnType) || Boolean.class.equals(returnType);
        try {
            if (Objects.nonNull(JdHotKeyStore.get(REDIS_CONNECTION_FAILED))) {
                //值不为空表示Redis连接失败,这里就不再继续请求Redis了,直接返回false或者null
                log.error("获取缓存失败,redis连接失败,直接返回false或者null");
                if (booleanType) {
                    return false;
                }
                return null;
            }
            return point.proceed();
        } catch (Throwable throwable) {
            log.error("执行方法:{}失败,异常信息:{}", methodSignature.getMethod().getName(), throwable);

            //Redis连接失败,不抛异常,而是返回空值,继续用数据库提供服务,避免整个服务异常
            //如果一分钟之内或者30秒之内出现了几次Redis连接失败,此时可以设置一个key,告诉JdHotKey此时Redis连接不上了,并指定这个key的过期时间是1分钟左右
            //那么下次获取缓存时,先根据JdHotKey来判断Redis是否异常
            //JdHotKey在1分钟之后,会删除key,下次再有Redis请求过来,重新去看Redis能否连接
            //这样就可以简单的实现Redis挂掉后直接查数据库的降级
            if (JdHotKeyStore.isHotKey(REDIS_CONNECTION_FAILED)) {
                JdHotKeyStore.smartSet(REDIS_CONNECTION_FAILED, CacheSupport.EMPTY_CACHE);
            }

            //让后续操作继续,判断返回类型是Boolean则返回false,其他类型返回null
            log.error("缓存操作失败,直接返回 false 或者 null");
            if (booleanType) {
                return false;
            }
            return null;
        }
    }
}

hotkey中规则配置如下:

[{
    "duration": 60,
    "interval": 3,
    "key": "redis_connection_failed",
    "prefix": false,
    "threshold": 30,
    "desc": "redis连接失败"
}]

(4)探测缓存雪崩后的RateLimiter限流降级方案

一.首先添加拦截器对接口进行拦截

二.然后创建Limiter配置类LimiterProperties

三.最后在LimiterInterceptor拦截器中判断Redis连接是否正常

RateLimiter是Guava的⼀个限流组件,我们的系统就有⽤到这个限流组件。RateLimiter它是基于令牌桶算法的,API⾮常简单。

系统限流分为两种⽅式:Redis正常情况下各个接⼝的限流,Redis异常情况下各个接口的限流。这⾥我们在拦截器中进行实现。

一.首先添加拦截器对接口进行拦截

@Configuration
public class LimiterConfig implements WebMvcConfigurer {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        InterceptorRegistration registration = registry.addInterceptor(getLimiterInterceptor());
        registration.addPathPatterns("/api/**");
    }

    @Bean
    public HandlerInterceptor getLimiterInterceptor() {
        return new LimiterInterceptor();
    }
}

二.然后创建Limiter配置类LimiterProperties

LimiterProperties类⾸先会尝试读取limiter.properties中的配置,如果这⾥没有读取到,则使⽤application.yml中的配置信息。

如果后续需要更改限流配置,则通过application.yml文件中配置的limiter.propertiesPath的⽂件地址,将配置⽂件放在propertiesPath对应的⽂件中,通过limiter配置类LimiterProperties的reloadLimiterProperty()⽅法重新加载配置⽂件,即可实现限流的动态配置。

整体流程如下:

项⽬启动时,会执⾏initLimiter()⽅法先读取limiter.properties中的配置。如果该配置⽂件中有单独配置某个接⼝的限流,则使⽤单独配置的。如果没有,则使⽤defaultLimiter默认的配置。如果没有读取到配置⽂件,则使⽤application.yml中的配置。

项⽬运⾏时,如果需要动态配置,就将配置⽂件放在application.yml中配置的limiter.propertiesPath路径下,接口调⽤limiter配置类的reloadLimiterProperty()⽅法重新加载配置文件。

@Component
public class LimiterProperties {
    //如果需要替换配置文件信息,将配置文件放在这里就可以
    @Value("${limiter.propertiesPath}")
    private String limiterPropertiesPath;

    //初始化时使用的配置信息
    @Value(value = "classpath:/limiter.properties")
    private Resource limiterProperties;

    //默认的Redis宕机之后的限流器
    @Value("${limiter.noRedisLimiter}")
    private Double noRedisLimiter;

    //默认的限流器
    @Value("${limiter.defaultLimiter}")
    private Double defaultLimiter;

    //限流器配置信息
    private Properties properties;

    //各个接口的限流,每个接口限流都可以单独配置
    private Map<String, RateLimiter> apiRateLimiterMap = null;

    //Redis挂掉之后限流
    private RateLimiter noRedisRateLimiter = null;

    @PostConstruct
    private void initLimiter() {
        //初始化限流配置
        properties = new Properties();
        try {
            properties.load(limiterProperties.getInputStream());
        } catch (IOException e) {
            log.error("读取配置文件失败", e);
        }
        this.reloadLimiter();
    }

    private void reloadLimiter() {
        //加载没有Redis时的限流器
        //优先读取限流配置文件中的配置,如果没有则使用项目配置文件中的配置
        String noRedisLimiterProperty = properties.getProperty(CookbookConstants.NO_REDIS_LIMITER);
        noRedisRateLimiter = RateLimiter.create(StringUtils.isEmpty(noRedisLimiterProperty) ? noRedisLimiter : Double.valueOf(noRedisLimiterProperty));

        //加载配置文件中配置的接口对应的限流器
        apiRateLimiterMap = new HashMap<>(16);

        //优先从配置文件中读取默认配置信息
        String defaultLimiterProperty = properties.getProperty(CookbookConstants.DEFAULT_LIMITER);

        //默认限流QPS
        Double defaultProperty;
        if (StringUtils.isEmpty(defaultLimiterProperty)) {
            defaultProperty = defaultLimiter;
        } else {
            defaultProperty = Double.valueOf(defaultLimiterProperty);
        }

        Set<String> propertyNames = properties.stringPropertyNames();
        for (String propertyName : propertyNames) {
            String property = properties.getProperty(propertyName);
            apiRateLimiterMap.put(propertyName, RateLimiter.create(StringUtils.isEmpty(property) ? defaultProperty : Double.valueOf(property)));
        }
    }

    //用于配置文件变更后,使配置信息重新生效
    //可以通过定时任务来触发,也可以通过接口来触发
    public void reloadLimiterProperty() {
        //保存旧的配置信息,如果重新加载配置文件失败,可以用来还原
        Properties newProperties = new Properties();
        try {
            Resource resource = new PathResource(limiterPropertiesPath);
            newProperties.load(resource.getInputStream());
            //配置文件加载成功后生效
            properties = newProperties;
            this.reloadLimiter();
        } catch (IOException e) {
            log.error("读取配置文件失败", e);
        }
    }
}

application.yml配置⽂件中的配置:

# 限流相关配置
limiter:
    # 动态加载时,各个接⼝限流配置存放路径
    propertiesPath: /app/propertie/limiter.properties
    # redis宕机之后限流QPS
    noRedisLimiter: 20
    # 默认的限流QPS
    defaultLimiter: 200

limiter.properties中的配置信息:

# redis宕机之后QPS限流
noRedisLimiter=20
# 默认的限流QPS
defaultLimiter=200
# 接⼝ /api/cookbook/info/{cookbookId} 的QPS限流
/api/cookbook/info/{cookbookId}=500

三.最后在LimiterInterceptor拦截器中判断Redis连接是否正常

如果Redis连接失败,则通过limiterProperties的getNoRedisRateLimiter()方法的tryAcquire()方法去尝试获取令牌。如果获取不到令牌,那么就返回系统繁忙的提示。

如果Redis连接正常,则先获取到当前请求对应的requestMapping,通过limiterProperties的getApiRateLimiterMap()方法的get()方法获取对应接⼝限流器。如果不存在,则创建该接⼝的限流器,并put到map中。

public class LimiterInterceptor implements HandlerInterceptor {
    @Autowired
    private LimiterProperties limiterProperties;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        if (Objects.nonNull(JdHotKeyStore.get(REDIS_CONNECTION_FAILED))) {
            if (!limiterProperties.getNoRedisRateLimiter().tryAcquire()) {
                log.warn("Redis连接失败,全局限流");
                ServletUtil.writeJsonMessage(response, JsonResult.buildError("系统繁忙,请稍后重试"));
                return false;
            }
        } else {
            //获取请求的requestMapping
            String requestMapping = getRequestMapping((HandlerMethod) handler);
            RateLimiter rateLimiter = limiterProperties.getApiRateLimiterMap().get(requestMapping);
            if (rateLimiter == null) {
                String property = limiterProperties.getProperties().getProperty(requestMapping);
                rateLimiter = RateLimiter.create(StringUtils.isEmpty(property) ? limiterProperties.getDefaultLimiter() : Double.valueOf(property));
                limiterProperties.getApiRateLimiterMap().put(requestMapping, rateLimiter);
            }
            if (!rateLimiter.tryAcquire()) {
                log.warn("全局限流");
                ServletUtil.writeJsonMessage(response, JsonResult.buildError("系统繁忙,请稍后重试"));
                return false;
            }
        }
        return true;
    }
    ...
}

(5)限流降级后操作数据库时也要通过本地缓存降级

以商品详情接⼝为例:⾸先通过redisCache.getCache()查询hotkey内存数据和Redis缓存数据。由于Redis连接失败时会返回null,于是就会从数据库中获取数据。

接下来看getCookbookFromDB()⽅法的上半部分的降级处理部分:此时会判断JdHotKeyStore.get(REDIS_CONNECTION_FAILED)是否存在。如果存在则表示Redis连接失败,需要做降级处理。

降级处理时会通过caffeineCache.getIfPresent()获取降级后的本地缓存数据,如果缓存中没有数据,那么就通过获取锁去查询数据库,然后将数据库的查询结果存放在caffeineCache中。

@Service
public class CookbookServiceImpl implements CookbookService {
    @Autowired
    private RedisCache redisCache;

    @Autowired
    private RedisLock redisLock;

    @Autowired
    private Cache<String, Object> caffeineCache;

    private Lock lock = new ReentrantLock();
    ...
    
    @Override
    public CookbookDTO getCookbookInfo(CookbookQueryRequest request) {
        Long cookbookId = request.getCookbookId();
        String cookbookKey = RedisKeyConstants.COOKBOOK_PREFIX + cookbookId;
        //从JdHotKey内存或者Redis缓存中获取数据
        Object cookbookValue = redisCache.getCache(cookbookKey);

        if (Objects.equals(CacheSupport.EMPTY_CACHE, cookbookValue)) {
            //如果是空缓存,则要防止缓存穿透,直接返回null
            return null;
        } else if (cookbookValue instanceof CookbookDTO) {
            //如果是对象,则表示是从内存中获取到的数据,可以直接返回
            return (CookbookDTO) cookbookValue;
        } else if (cookbookValue instanceof String) {
            //如果是字符串,则表示是从Redis缓存中获取到的数据,需要转换成对象之后才能返回
            redisCache.expire(cookbookKey, CacheSupport.generateCacheExpireSecond());
            CookbookDTO dto = JsonUtil.json2Object((String) cookbookValue, CookbookDTO.class);
            return dto;
        }

        //未能在内存和缓存中获取到值,那么就从数据库中获取
        return getCookbookFromDB(cookbookId);
    }
    
    private CookbookDTO getCookbookFromDB(Long cookbookId) {
        String cookbookLockKey = RedisKeyConstants.COOKBOOK_LOCK_PREFIX + cookbookId;
        String cookbookKey = RedisKeyConstants.COOKBOOK_PREFIX + cookbookId;
        //如果发现Redis连接失败,那么降级为先从本地缓存尝试获取 -> 再从本地获取锁后查询DB获取
        if (Objects.nonNull(JdHotKeyStore.get(REDIS_CONNECTION_FAILED))) {
            //先查看本地缓存是否有数据,有就返回,没有则需要获取本地锁,然后查询DB
            Object value = caffeineCache.getIfPresent(cookbookKey);
            log.warn("Redis连接失败,需要降级处理,此时先从本地缓存中尝试获取数据,key {},value {}", cookbookKey, value);
            if (value != null) {
                if (CacheSupport.EMPTY_CACHE.equals(value)) {
                    return null;
                }
                return (CookbookDTO) value;
            }
            //如果本地缓存中没有数据,则需要查询数据库,并将结果放至数据库
            if (lock.tryLock()) {
                try {
                    log.info("缓存数据为空,从数据库中获取数据,cookbookId:{}", cookbookId);
                    CookbookDTO dto = cookbookDAO.getCookbookInfoById(cookbookId);
                    caffeineCache.put(cookbookKey, Objects.isNull(dto) ? CacheSupport.EMPTY_CACHE : dto);
                    return dto;
                } finally {
                    lock.unlock();
                }
            }
            //如果没能拿到锁,直接抛异常返回
            throw new BaseBizException("系统繁忙,请稍后再试");
        }

        //以上是降级流程,这里是正常流程,下面使用Redisson获取分布式锁
        boolean lock = redisLock.lock(cookbookLockKey);

        if (!lock) {
            log.info("缓存数据为空,从数据库查询分享贴信息时获取分布式锁失败,cookbookId:{}", cookbookId);
            throw new BaseBizException("查询失败");
        }
        try {
            log.info("缓存数据为空,从数据库中获取数据,cookbookId:{}", cookbookId);
            CookbookDTO dto = cookbookDAO.getCookbookInfoById(cookbookId);
            if (Objects.isNull(dto)) {
                redisCache.setCache(cookbookKey, CacheSupport.EMPTY_CACHE, CacheSupport.generateCachePenetrationExpireSecond());
                return null;
            }

            redisCache.setCache(cookbookKey, dto, CacheSupport.generateCacheExpireSecond());
            return dto;
        } finally {
            redisLock.unlock(cookbookLockKey);
        }
    }
    ...
}

⾄此整个详情页的接⼝降级处理就处理完了。

(6)缓存雪崩解决方案总结

一.首先通过AOP切面 + JdHotKey实现自动化探测Redis故障,也就是判断出Redis是否故障,并设置hotkey标记Redis已经故障

二.根据hotkey + RateLimiter令牌桶,可以对接口进行故障时的限流

三.限流降级后操作数据库时也需要通过CaffeineCache本地缓存进行降级

6.缓存击穿的解决方法

(1)什么样的数据适合缓存

(2)什么是缓存击穿

(3)缓存击穿的解决方法一后台刷新

(4)缓存击穿的解决方法二检查更新

(5)缓存击穿的解决方法三分级缓存

(6)缓存击穿的解决方法四加锁

(1)什么样的数据适合缓存

要从访问频率、读写比例、数据一致性去分析一个数据是否适合缓存。

(2)什么是缓存击穿

在高并发下,多线程同时查询同一个资源。如果缓存中没有这个资源,那么这些线程都会去数据库查找。这样就会对数据库造成极大压力,缓存失去存在的意义。所以缓存击穿就是缓存中没有但数据库中有 + 并发查,而缓存穿透就是缓存中没有 + 数据库中也没有。

(3)缓存击穿的解决方法一后台刷新

后台定义一个定时任务专门主动更新缓存数据。比如,一个缓存中的数据过期时间是30分钟,那么定时任务每隔29分钟定时刷新数据。

这种方案比较容易理解,但会增加系统复杂度。比较适合那些key相对固定,缓存粒度较大的业务。如果key比较分散的则不太适合,实现起来也比较复杂。

(4)缓存击穿的解决方法二检查更新

首先将缓存key的过期时间和value一起保存到缓存中,可以拼接、可以添加新字段、可以采用单独的key保存,总之就是两者要建立好关联关系。

然后在每次执行get操作后,都将其过期时间与当前系统时间做对比。如果缓存过期时间 - 当前系统时间 <= 1分钟,则主动更新缓存。这样就能保证缓存中的数据始终是最新的。

这种方案在特殊情况下也会有问题。比如缓存过期了都没能等到get操作,然后又突然并发访问。

(5)缓存击穿的解决方法三分级缓存

采用一级缓存L1和二级缓存L2,L1缓存失效时间短,L2缓存失效时间长。

请求优先从L1缓存获取数据,如果L1缓存未命中则加锁。只有1个线程获取到锁,这个线程会读库 + 更新L1缓存 + 更新L2缓存,其他线程则依旧从L2缓存获取数据并返回。

这种方式,主要是通过避免缓存同时失效并结合锁机制实现。所以当更新数据时,只能删除L1缓存,不能将L1和L2中的缓存同时删除。L2缓存中可能会存在脏数据,需要能够容忍这种短时间的不一致,而且这种方案可能会造成额外的缓存空间浪费。

(6)缓存击穿的解决方法四加锁

一.读缓存使用synchronized加重量级锁

这种方式确实能够防止缓存失效时高并发到数据库,但缓存没失效时,从缓存中拿数据需要排队获取锁,这会大大的降低系统的吞吐量和用户体验。

public synchronized List<String> getData01() {
    List<String> result = new ArrayList<String>();
    //从缓存读取数据
    result = gerDataFromCache();
    if (result.isEmpty()) {
        //从数据库查询数据
        result = getDataFromDB();
        //将查询的数据写入缓存
        setDataToCache(result);    
    }
    return result;
}

二.查数据库使用synchronized加重量级锁

这个方法在缓存命中时,系统的吞吐量不会受到影响。但当缓存失效时,请求还是会打到数据库上,只不过不是高并发而是阻塞而已。但是,这样还是会造成用户体验不佳。

static Object lock = new Object();
public List<String> getData02() {
    List<String> result = new ArrayList<String>();
    //从缓存读取数据
    result = getDataFromCache();
    if (result.isEmpty()) {
        synchronized(lock) {
            //从数据库查询数据
            result = getDataFromDB();
            //将查询到的数据写入缓存
            setDataToCache(result);
        } 
    }
    return result;
}

三.synchronized加重量级锁 + 双重判断

双重判断虽然能阻止高并发请求打到数据库,但第二个及之后的请求在命中缓存时,还是排队进行的。

比如有30个请求一起并发过来,在双重判断时:第一个请求去数据库查询并更新缓存数据,剩下的29个请求则还是依次排队去缓存中取数据的。

public List<String> getData03() {
    List<String> result = new ArrayList<String>();
    //从缓存读取数据
    result = getDataFromCache();
    if (result.isEmpty()) {
        synchronized(lock) {
            //双重判断,第二个以及之后的请求不必去找数据库了,直接命中缓存
            //从缓存读取数据
            result = getDataFromCache();
            if (result.isEmpty()) {
                //从数据库查询数据
                result = getDataFromDB();
                //将查询到的数据写入缓存
                setDataToCache(result);
            }
        }
    }
    return result;
}

四.互斥锁 + 重试 + 双重检查可避免前面几种问题

static Lock lock = new ReentrantLock();
public List<String> getData04() throws InterruptedException {
    List<String> result = new ArrayList<String>();
    //从缓存读取数据
    result = getDataFromCache();
    if (result.isEmpty()) {
        if (lock.tryLock()) {
            try {
                System.out.println("拿到了锁,从DB读取数据后写入缓存");
                //从数据库查询数据
                result = getDataFromDB();
                //将查询到的数据写入缓存
                setDataToCache(result);
            } finally {
                //释放锁
                lock.unlock();
            }
        } else {
            //先查一下缓存
            result = getDataFromCache();
            if (result.isEmpty()) {
                System.out.println("没有拿到锁,再次读缓存也没读到数据,先睡眠一下");
                Thread.sleep(100);
                return getData04();//递归重试
            }
        }
    }

    return result;
}

#牛客创作赏金赛#
后端技术栈的基础修养 文章被收录于专栏

详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等

全部评论

相关推荐

评论
2
3
分享
牛客网
牛客企业服务