[Day24] Redis 八股文 [2/2]
Redis 八股文2
目录
[toc]
reids 单线程和多线程
Redis 是单线程还多线程?
这要从不同角度和不同版本分析了;我们平常所说 Redis 是单线程是指 Redis 执行核心指令操作的方式是通过串行的方式单线程执行的;在 Redis6 之前是纯单线程模型,但是随着其请求量的增大和硬件平台的升级 redis 也开始逐步引入了多线程,这些多线程主要用于处理网络 IO、数据备份和集群支持等方面,但是核心指令执行依然是单线程;
单线程有哪些好处
单线程的好处:
- 没有线程切换的开销对 CPU 友好
- 没有并发操作不用担心数据并发安全
- 无锁的竞争效率更高
单线程好处这么多为什么需要引入多线程
随着 CPU 多核化还有整体硬件水平的提升,如果依然使用单线程的方式处理那么硬件系统的利用率就会很低,最明显的就是网络 IO 会变成 Redis 的瓶颈,因为 Redis 是内存操作并且数据结构的设计优秀所以瓶颈不会在内存和 CPU,但是高并发的场景下如果一个请求的处理时间过长或者执行了 AOF 刷盘或则 RDB 快照那么就会将串行等待时间变长导致性能变低;
为什么 Redis 设计成单线程也能这么快?
从硬件层面来说:Redis 是工作在内存上的读写操作很快对 CPU 友好; 从数据结构层面来说:Redis 为了追求高性能底层的数据结构都优秀比如 skipList、listpack 都是在有限的内存上做到了极致的性能; 从线程层面上来说:Redis 在核心指令执行是单线程的,这样不会有锁的竞争、不会有上下文的切换也不用考虑并发安全问题;Redis 的网络 IO、数据备份和集群支持等方面采用的是多线程,尤其是在大数据量的请求下 epoll 方式的网络 io 多路复用大大提升了整体的性能;
BigKey
你如何在生产中限制 keys */ flushdb/ flushall
这些危险命令
keys *
会进行全表扫描复杂度 O(n),flushdb 和 flushall 都是危险的删除操作;在生产中应该禁用,可以在 redis 配置文件中将命令进行替换,替换成空串就和禁用效果一样;
Redis 海量数据如何查询固定前缀的 key
生成环境中不能使用
keys *
这样的命令而应该采用更安全的非阻塞的方式遍历 key;scan <cursor> <match> <count>
每次遍历的时候都要指定游标的指,起始为 0,如果返回结果游标也为 0 那么就说明遍历完成了;返回结果分为一个游标和结果集两部分;上一次的返回游标作为下一次 scan 的游标;
SCAN 0 MATCH user:* COUNT 1000
BigKey 问题,多大算大?你是如何发现?如何删除?如何解决?如何避免的?
String 类型不超过 10 kb, hash, set, list 元素不超过 5000 个;是阿里巴巴定义的开发规范; 对于 bigkey 只有 string 类型可以使用 del 删除,其他的都应该配合 scan 进行渐进式删除避免阻塞
bigkey 的危害:
- 内存不均匀,集群数据同步迁移困难
- 超时删除,bigkey 会是瓶颈
- 网络传输流量阻塞
bigkey 的产生:
- 设计时候的缺陷
- 突发的数据剧增,比如某个爆火博主的关注列表
如何发现 bigkey:
redis cli --bigkeys
:redis-cli -h 127.0.0.1 -p 6379 -a <password> -bigkeys
memory usage
: memory usage userid: 2009如何删除 bigkey:
- 只有 String 类型可以使用 del 删除或则用 unlink,其他集合类型应该配合 scan 进行渐进式删除
public class RedisBigKeyDeleter {
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
public void deleteBigSet(String keyName, int batchSize, double sleepTime) {
try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {
String cursor = "0";
long totalDeleted = 0;
while (!cursor.equals("0")) {
// 分批次扫描集合元素
ScanParams scanParams = new ScanParams().match("*").count(batchSize);
ScanResult<String> result = jedis.sscan(keyName, cursor, scanParams);
cursor = result.getCursor();
List<String> elements = result.getResult();
if (!elements.isEmpty()) {
// 删除当前批次的元素
long deleted = jedis.srem(keyName, elements.toArray(new String[0]));
totalDeleted += deleted;
System.out.println("Deleted " + deleted + " elements, total: " + totalDeleted);
// 控制删除速度,避免压力过大
Thread.sleep((long) (sleepTime * 1000));
}
}
// 删除空集合
jedis.del(keyName);
System.out.println("BigKey '" + keyName + "' deleted completely.");
} catch (Exception e) {
System.err.println("Error deleting BigKey: " + e.getMessage());
}
}
public static void main(String[] args) {
RedisBigKeyDeleter deleter = new RedisBigKeyDeleter();
// 用法示例:删除名为 "big_set" 的集合,每批次处理 500 个元素,每次删除后休眠 0.1 秒
deleter.deleteBigSet("big_set", 500, 0.1);
}
}
如何避免 BigKey?
避免将所有的数据存在同一个 key 中,可以采用分片的思想,比如避免将所有用户 id 放到 userIds 这个 key 中,而应该通过对 userid 分片存储到 userIds:{id%100}
选择必要的字段进行缓存而不是将整个对象存储;
为每个 key 设置元素和内存的上限;在插入元素时进行控制
设置合理的过期时间和淘汰机制配合惰性删除;
缓存一致性双写一致性工程落地
双检加锁策略你了解过吗?如何尽量避免缓存击穿?
是指在从 Redis 中查询一个 key 如果 key 不存在时要从 Mysql 中进行查找; 但是为了避免缓存穿透通过单个 key 同时将很多的相同 key 查询打向 Mysql,我们可以通过锁来进行控制,获取锁之前先检查 redis 中是否有 key 的缓存,没有则竞争锁,获取到锁之后再次进行 redis 中 key 查询,两个检查都没有再进行 mysql 的查询,并且对 mysql 的返回结果进行会写或则过滤操作;对于没有获取到锁的请求可以通过等待后再次进行重试;这里的锁应该和业务 key 相关联
String lockKey = key + ":lock";
分布式场景下的话要使用分布式锁
public String getData(String key) {
// 第一次检查:无锁读取缓存
String value = redis.get(key);
if (value == null) {
// 尝试获取分布式锁
String lockKey = key + ":lock";
String lockToken = UUID.randomUUID().toString();
boolean locked = redis.setnx(lockKey, lockToken, 30, TimeUnit.SECONDS);
if (locked) {
try {
// 第二次检查:加锁后再次确认缓存是否已加载
value = redis.get(key);
if (value == null) {
// 从数据库加载数据
value = db.query(key);
if(value == null) {//如果为空那么可以回写空或则将查询key的ip拉入黑名单}
// 写入缓存(设置过期时间)
redis.setex(key, 300, value);
}
} finally {
// 释放锁(需保证原子性,防止误删其他线程的锁)
String currentToken = redis.get(lockKey);
if (lockToken.equals(currentToken)) {
redis.del(lockKey);
}
}
} else {
// 未获取到锁,短暂等待后重试
Thread.sleep(100);
return getData(key); // 递归重试或返回默认值
}
}
return value;
}
什么情况下会出现数据库和缓存不一致的问题?
在对数据进行多级缓存的时候就一定会出现缓存一致性问题;不论是在 CPU 层面的缓存一致性通过 EMSI 实现可见性;还是 JMM 通过读写屏障机制实现工作内存和主内存的数据一致性;只要涉及到了多级缓存我们就必须要保证存储多放的数据一致性问题; 在 Redis 和 Mysql 的层面,线程和线程之间的透明和隔离的,并且无法保证一个线程对 redis 和 mysql 的操作是原子的,也就是在操作 redis 和 mysql 的过程中会被其他的线程进行并发访问操作导致数据的不一致; 比如线程 A 将数据库的值 50修改成了 100,但是还没来得及修改 redis 中的值,此时 B 线程读取到了 redis 中的旧值 50;这样就会导致数据不一致问题;
如何解决一致性问题?
一致性问题的根本原因就是因为 redis 和 mysql 的数据更新操作不是原子性的并且线程和线程之间是隔离的无法感知自己的操作是否会有并发问题; 所以现在的问题就简化成了 redis 和 mysql 应该先操作哪一个,并且如何保证在操作过程中其他线程的并发风险降到最低,并且这保证数据的最终一致性问题;
对于先操作谁的问题,我们一般都规定,当持久化数据库和缓存的数据冲突时以持久化数据库的数据为准,所以我们一定是先修改数据库;
- 那么就很容易想到先删除缓存中的数据,然后更新数据库,这样修改完成之后新的读请求就可以将数据库的值同步到 redis 中;这样只是理想情况,因为在修改线程将 redis 缓存删除修改 Mysql 但是还没有修改完成的过程中,可能有其他的线程查询缓存发现没有数据,从而查询数据库,这样会查询到旧的值,缓存中的值也是旧的;解决这个问题可以通过延迟双删策略,也就是在更新线程更新完成数据库之后延迟一段时间,再次将 redis 中的值进行删除;这样的目的是确保更新线程执行完成后 redis 中不会有旧值的缓存,实现 redis 和 mysql 的最终一致性,过程中其他线程只会读取到没修改之前的数据,影响相对较小;但是这个做法还是存在问题,那就是无法确定应该延迟多久,理论上来说是要悲观的认为我在修改 mysql 的过程中一定有线程重新将旧值读取到了 redis 中,所以要确保第二次删除在这些操作之后,但是具体时间只能根据经验判断;
- 因为删除-更新-再删除的方式有局限性,所以目前主流的解决方案是先更新数据库再删除 redis; 业务修改线程先对数据库进行修改,完成后再将 redis 的数据进行删除;那么过程中其他线程就有可能读取到 redis 中还没被删除和更新的旧值,影响相对较小;那么就只需要思考最后的最终一致性问题,因为修改线程修改完成之后可能会出现 redis 删除失败的情况,主流的做法就是通过监听数据库 binlog 的变化,当数据发生变化时在 redis 中定位到数据涉及的 key,然后对这些 key 进行删除,如果删除成功那么就完成了一致性;如果删除失败那么就将删除操作放入消息队列中,通过消息队列异步通知的方式来保证消息的最终一致性,如果 MQ 中多次删除依然失败那么就可以通过通知机制人为进行介入排查;
延时双删你做过吗?会有哪些问题?
延迟双删是在做 redis 和 mysql 双写一致性时候的一种确保最终一致性的手段;主要流程是当修改线程发现 redis 中有对应的 key 先将 key 删除然后进行数据库数据的修改,修改完成后休眠一段时间再次尝试删除 redis 中的 key;两次删除的目的是避免在第一次删除后,其他线程在修改线程修改 mysql 的过程中讲旧值再次读取到 redis 中; 双写一致性的最大问题就是无法判断修改线程应该延迟多久,这个延迟时间只能通过人为的经验结合实践进行主观的预测,并且在第二次删除的时候也可能发生失败;
你如何保证最终一致性?
我在项目中使用的是旁路缓存的方式通过 canel 监听数据库的 binlog 日志,将数据库的所有更新操作进行监听,然后定位到 redis 中相关联的 key,然后将 key 进行删除,如果 key 删除失败则将删除操作通过 MQ 实现异步重试,如果多次删除依然失败那么可以通过死信队列加通知机制人为进行排查;
介绍一下实现双写一致性的 canel
先回忆一下 mysql 中的主从同步流程;从服务器在认证通过后会会开启一个 IO 线程监听主服务器的 binlog 变化,通过偏移量来判断当前节点和主服务器的同步进度,如果发生了新的 binlog 变动那么从服务器就会发出同步请求,主服务器也会开启一个 dump 线程将数据同步给从节点;从节点将数据存储到 relay log 中,同步阶段就算完成了,然后从节点通过自己的 SQL 线程将 relay log 中的语句在自己数据库中进行再次执行;
canel 的设计是以 mysql 的主从同步做参考的;canel 会用从节点的身份向 mysql 发起请求;然后监听到 binlog 的变化后会通知给不同的消息接收方,相当于一个 MQ 的角色;
如何确保 Redis 主从复制的一致性?
以最简单的主从复制架构为例;一个主服务器有多个从服务器;从节点第一次同步之前会向直接点发起同步请求,主节点进行身份认证后会通过 RDB 快照的方式同步主节点的全量数据;同步完成后就可以实现读写分离,从节点可以为主节点做备份和读取数据的负载均衡;当主节点发生了操作时会通过 AOF 的方式进行增量数据同步给从节点;
案例实战 hyperloglog
解释一下 UV, PV, DAU, MAU
- UV unique visitor:独立访问量,一般以客户端 ip 为单位,需要去重
- PV page view:页面访问量不去重
- DAU,MAU:日活月活
hyperloglog 项目统计
固定 12 kb 大小;记录的是一个去重后集合的元素个数;他只是去重后的数量不记录具体的每个集合元素,并且是一个近似值;
实现思路
- 区分登录用户和匿名访问;登录用户 UserId,匿名访问用 ip+设备信息哈希
- key 的设计:
uv:{yyyyMMdd}
例如 uv:20240331- 核心操作:
pfadd pfcount pfmerge
- 引入 redis 依赖并配置 redis
- service
@Service
public class EnhancedUVStatisticsService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final DateTimeFormatter DATE_FORMATTER =
DateTimeFormatter.ofPattern("yyyyMMdd");
/**
* 多维度UV记录
* @param dimension 维度标识(如"product_detail")
* @param identifier 用户标识
*/
public void recordUV(String dimension, String identifier) {
String key = buildDimensionKey(dimension, LocalDate.now());
redisTemplate.opsForHyperLogLog().add(key, identifier);
}
/**
* 查询维度UV
* @param dimension 维度标识
* @param date 日期
*/
public Long queryDimensionUV(String dimension, LocalDate date) {
return redisTemplate.opsForHyperLogLog()
.size(buildDimensionKey(dimension, date));
}
// 构建维度Key(格式:uv:{dimension}:{date})
private String buildDimensionKey(String dimension, LocalDate date) {
return String.format("uv:%s:%s",
dimension,
DATE_FORMATTER.format(date)
);
}
}
- 用户识别工具类
public class UserIdentifierGenerator {
/**
* 生成去标识化用户ID
* @param request HttpServletRequest对象
* @param userId 登录用户ID(未登录传null)
*/
public static String generateIdentifier(HttpServletRequest request, Long userId) {
if (userId != null) {
return "user:" + userId; // 已登录用户直接使用ID
}
// 未登录用户组合IP+设备信息(SHA256哈希去标识化)
String ip = request.getRemoteAddr();
String userAgent = request.getHeader("User-Agent");
String rawData = ip + userAgent;
return DigestUtils.sha256Hex(rawData);
}
}
对于登录的用户可以从 ThreadLocal 中取出用户的 id 用 userId 作为去重依据;如果是匿名用户如果仅仅用 ip 去重的话可能粒度不够,可以通过 ip+userAgent 的方式进行更细致的去重;
- userAgent 浏览器用户标记当前设备,系统和浏览器版本等信息;然后通过 hash256 进行编码处理
- 通过 AOP 对需要进行 UV 统计的接口做无侵入式增强
自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UVStatistic {
/**
* 统计维度标识
* 示例: "product_detail", "user_center"
*/
String value();
/**
* 是否计入全局统计(默认true)
*/
boolean global() default true;
}
AOP 切面
@Aspect
@Component
public class MultiDimensionUVAspect {
@Autowired
private UVStatisticsService uvStatisticsService;
// 定义切点:所有带有@UVStatistic注解的方法
@Pointcut("@annotation(com.yourpackage.annotation.UVStatistic)")
public void uvStatisticPointcut() {}
@Around("uvStatisticPointcut()")
public Object handleUVStatistic(ProceedingJoinPoint joinPoint) throws Throwable {
// 获取方法上的注解
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
UVStatistic annotation = signature.getMethod().getAnnotation(UVStatistic.class);
// 生成用户标识(复用原有逻辑)
ServletRequestAttributes attributes = (ServletRequestAttributes)
RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
String identifier = UserIdentifierGenerator.generateIdentifier(request, getCurrentUserId(request));
// 记录维度统计
uvStatisticsService.recordUV(annotation.value(), identifier);
// 全局统计(根据配置)
if (annotation.global()) {
uvStatisticsService.recordUV("global", identifier);
}
return joinPoint.proceed();
}
// 获取用户ID的逻辑(需根据实际鉴权系统实现)
private Long getCurrentUserId(HttpServletRequest request) {
// 示例:从JWT或Session中获取
return null;
}
}
controller 增强
@RestController
@RequestMapping("/products")
public class ProductController {
// 商品详情页UV统计(同时计入全局)
@UVStatistic("product_detail")
@GetMapping("/{id}")
public Product getProductDetail(@PathVariable Long id) {
// ...业务逻辑
}
}
@RestController
@RequestMapping("/users")
public class UserController {
// 用户中心UV统计(不重复计入全局)
@UVStatistic(value = "user_center", global = false)
@GetMapping("/profile")
public UserProfile getProfile() {
// ...业务逻辑
}
}
布隆过滤器
什么是布隆过滤器,实现原理是什么?布隆过滤器有什么缺点,如何解决?
布隆过滤器用一句话概括就是在海量数据中快速判断某个原始是否存在;他的底层是通过哈希函数+bitmap 实现的; 布隆过滤器在初始化的时候会将 bitmap 中的所有位置为 0,表示目前没有任何元素;然后插入元素,每个元素都会被多个哈希函数映射到多个不同的位置,将这些位置置为 1,这样就完成了一个元素的插入; 在进行查询的时候,会将查询元素通过同样的方式进行多次映射到多个位置,如果每个映射的位置都是 1 那么就认为这个元素存在集合中,否则认为集合中没有元素; 布隆过滤器的优势就可以用极少的空间就完成数据的存在性判断,但是因为底层是通过 hash'实现的那么就不可避免的会出现 hash 冲突,具体来说就是某个元素被映射到了 a、b 和 c 三个下标,但是并且这个三个下标都为 1,这时候只能说这个元素大概率存在集合中,以为可能这些位置的 1 不是因为当前查询的元素插入置为 1 的而可能是其他元素发生了 hash 冲突;所以如果查询结果是存在那么大概率存在,如果不存在那么就绝对不存在;
这种误判其实是发生概率很小的,我们可以通过加大 bitmap 的容量和加多 hash 的次数,来降低冲突;以为 hash 冲突的本质就是 hash 的集合是无限大的但是输出集合是有限的所以必然会冲突,但是如果 hash 次数过多那么布隆过滤器的高效优势就会降低;所以如果对查询的结果可以容忍一定的误判,并且对空间和性能要求高那么可以使用布隆过滤器;如果无法接受误判那么只能通过 B+树红黑树来进行 logN 级别的查询;
布隆过滤器还有个问题就是不能删除元素,本质还是以为 hash 冲突的原因;如果要删除一个元素,我们可以将元素进行多次 hash 找到所有对于的索引然后将这个些索引值置为 0,但是因为冲突的可能,我们无法判断当前位置上的 1 是被多少个元素映射后的结果,以为 bitmap 每个位置只能保存 01 不能保存被映射的次数;
缓存预热、雪崩、击穿、穿透
缓存预热你是怎么做的?
缓存预热一般在服务启动时,容器初始化完成之后通过钩子执行,比如
@PostConstruct afterPropertiesSet init_method
这些方法;如果希望的是在程序运行过程中预热那么可以通过定时任务完成;
什么是缓存穿透,击穿和雪崩?分别如何解决?
缓存穿透
缓存穿透是指,大量的 redis 查询都没有命中,导致很多的查询都打到了 mysql 上,redis 的命中率很低全部穿透到了 mysql;出现这种问题我认为有一下三种可能;第一种就是对于热点数据我们没有进行预热;第二种可能是对于某些热点查询数据库中没有对应数据,redis 中也没有记录导致一直被重复查询;还有可能是有攻击者恶意的进行类似撞库的扫描查询,比如他会一直查询 id 为 1 的商品,id 为 2 的商品...;
- 对于没有预热的缓存,我们可以通过启动容器后通过 Bean 生命周期的钩子方法进行数据预热;
- 对于没有数据库中没有数据的热点查询可以将查询结果置为 null,这样一来同一个请求就不会重复打到 mysq;
- 对于疑似攻击者的恶意请求我们可以通过限流等方式进行限制;
- 还有一个通用性很高的解决方案就是通过布隆过滤器;他可以高效的查询一个数据是否在集合中;但是他有小概率会误判,如果返回数据存在那么有小概率不存在被误判了,如果不存在那么就一定不存在;其实就算不存在的数据误判了,那么在 mysql 查询的时候也会返回空,所以影响不大;布隆过滤器可以大大降低 mysql 的压力
缓存击穿
缓存击穿是指某个十分热点的 key 过期了,同时有很多的查询针对这个 key,那么统一时间就会有很多对这个 key 的相同查询打到 mysql;通过单点 key 大量请求打向 mysql 就是缓存击穿;
解决缓存击穿主要就是通过过期时间和查询两个环节做预防;针对过期时间,我们可以将十分热点的 key 设为永不过期,或则在即将过期之前通过异步定时更新有效期的方式;如果一个热点 key 过期了为了避免同时大量的请求同时请求数据库,我们可以通过加互斥锁的双锁检查机制实现,如果 redis 中没有命中数据,那么就会竞争锁,只有持有锁的线程才有权限在 mysql 中查询 key,在查询 mysql 之前会再次检查 redis 中是否有 key 对应的数据,如果没有再进行查询, 其他竞争失败的线程进行一段时间的休眠等待然后再次尝试查询和竞争锁直到获取到数据;
缓存雪崩
指的时同一时间大量不同 key 的请求打到了 mysql 上,导致 mysql 压力剧增;出现这种问题一般有两个层面的可能,一方面就是物理机构层面,在一个时刻一个或多个 redis 节点崩溃了,导致缓存失效只能直接查询 mysql;另一方面可能是在设计层面,比如大量的 key 的过期时间很集中在某个点,就会导致一个时刻大面积的 key 同时过期失效,如果这个时候针对这些 key 的查询并发量也很大那么就会导致雪崩; 根据两种不同的原因有一下的预防方法,对于架构我们应该做健康健壮的 redis 集群,区别某个节点挂掉之后能够通过快速故障转移实现节点的恢复,一般采用分片集群加上 rdb 和 aof 混合备份实现数据的备份和节点的集群;在代码层面应该为过期时间添加随机因子,避免同一个时间大面积的 key 同时过期;
总结
缓存穿透是大量的 key 没有命中,导致 redis 形同虚设很多查询直接达到 mysql;
- 缓存预热
- 查询结果置为 null
- 恶意查询进行限流
- 布隆过滤器
缓存击穿是大量的请求通过同一个 key 打向数据库;
- 针对过期策略进行调整,设为永不过期或则在即将过期之前异步更新过期时间
- 互斥锁加锁双检机制
缓存雪崩是同一时间大量的 key 不可用大量请求打向数据库;
- 缓存设计层面要在过期时间加上随机时间
- 做高可用的集群实现高效的故障转移
分布式锁
通过 Redis 实现分布式锁
redis 实现分布式锁的方式和分布式任务和分布式配置注册和配置中心一样,都是通过中性化的思想;用一个 redis 节点或一个 redis 集群来管理锁,这个节点是所有服务器都可见的;如果某个服务器节点想要获取分布式锁就通过在 redis 上执行 setnx 命令,如果设置成功那么就认为获取锁成功,如果获取失败那么就认为获取失败,那么进行等待或则重试;
从分布式锁的实现层面来说就是通过简单的 setnx 实现加锁;但是只是这样简单的加解锁操作完全不能投入生产,因为还需要考虑很多问题;如果一个线程在操作并发资源过程中锁过期了怎么解决;如何实现高可用,如果 redis 节点挂了如何实现迅速的故障转移;如何实现公平锁和可重入锁;
设置过期时间:避免持有锁的线程突然宕机导致锁无法释放; 在每次释放锁的时候通过 lua 删除 key:通过 lua 脚本可以让判断和删除具有原子性,避免了误删不属于自己的锁; 通过 hset 和 HINCRABY 代替 setnx 实现可重入锁:可重入和思想和 synchronized 和 reentrantLock 一样 通过看门狗机制实现自动续期:可以通过获取锁后定时任务来为锁重新设置过期时间,确保线程在执行完成之前都一直持有锁;
redis 是如何实现可重入锁的?
实现思路和 java 中的可重入锁思想相同,都是通过计数方式,如果重入了 n 次那么计数器就是 n,重入时候让这个值+1,释放锁的时候让这个值-1 ;如果计数器的值为 0 那么就释放锁;可以在 setnx 的 value 前后加入一个用于计数的部分;也可以通过 hset 和 hincrby 实现
具体的实现是通过 redis 中的 hset 和 hincrby 实现
-- 参数:KEYS[1]=锁Key, ARGV[1]=过期时间, ARGV[2]=唯一标识(uuid:threadId)
local key = KEYS[1]
local expireTime = ARGV[1]
local identifier = ARGV[2]
-- 1. 判断锁是否存在,且持有者是否是自己
local lockData = redis.call('HGETALL', key)
if lockData and #lockData > 0 then
-- 锁存在,检查持有者
if lockData[1] == 'uuid' and lockData[2] == identifier then
-- 持有者是当前客户端,重入次数 +1
redis.call('HINCRBY', key, 'count', 1)
redis.call('EXPIRE', key, expireTime) -- 续期
return 1 -- 加锁成功
else
return 0 -- 锁被其他客户端持有,获取失败
end
else
-- 锁不存在,初始化锁
redis.call('HMSET', key, 'uuid', identifier, 'count', 1)
redis.call('EXPIRE', key, expireTime)
return 1 -- 加锁成功
end
redis 分布式锁是如何实现自动续期?什么是 watch dog 机制?
redis 的分布式锁在加锁的时候都会设置过期时间,这个过期时间很多情况下无法准确预计持有锁线程的运行时长,如果一个持有锁的线程还没执行完成但是这个时候锁却过期了那么就会有其他线程获取到锁;不但会有并发问题而且还会出现一个线程删除不属于自己锁的情况;所以出现了自动续期的需求;也就是看门狗机制,看门狗机制通过定时器或则定时任务,开启一个新线程不断的为当前线程的锁续期;
public class SimpleWatchDog {
private ScheduledExecutorService scheduler;
private String lockKey;
private String clientId;
private Jedis jedis;
public SimpleWatchDog(String lockKey, String clientId) {
this.lockKey = lockKey;
this.clientId = clientId;
this.scheduler = Executors.newSingleThreadScheduledExecutor();
this.jedis = new Jedis("localhost", 6379);
}
// 启动续期任务
public void start(int expireSeconds) {
scheduler.scheduleAtFixedRate(() -> {
String luaScript =
"if redis.call('GET', KEYS[1]) == ARGV[1] then\n" +
" return redis.call('EXPIRE', KEYS[1], ARGV[2])\n" +
"else\n" +
" return 0\n" +
"end";
Object result = jedis.eval(luaScript, Collections.singletonList(lockKey),
Arrays.asList(clientId, String.valueOf(expireSeconds)));
System.out.println("续期结果: " + result);
}, 0, expireSeconds / 3, TimeUnit.SECONDS); // 每 1/3 过期时间续期一次
}
// 停止续期
public void stop() {
scheduler.shutdown();
jedis.close();
}
}
Lua 脚本
Lua 脚本是将多个 redis 指令通过脚本封装执行的方式确保指令的原子操作,在指令执行的过程中不会被其他线程打断和抢占;
set goods:ipad uuid123
expire goods:ipad 30
get goods:ipad
上面的指令是非原子的,可以通过 lua 将他们封装成一个原子操作
eval "redis.call('set', KEYS[1], ARGS[1]) redis.call('exprie', KEYS[1], '30' return redis.call('get', KEYS[1]))" 1 goods:ipad uuid123
下标从 1 开始
-- 注释以两个减号开头
local key = KEYS[1] -- 获取第1个键
local value = ARGV[1] -- 获取第1个参数
local ttl = ARGV[2] -- 获取第2个参数
if redis.call("EXISTS", key) == 1 then
redis.call("SET", key, value, "EX", ttl)
return 1
else
return 0
end
-- 参数:KEYS[1]=锁的Key, ARGV[1]=锁的唯一标识
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0 -- 非自己的锁,不删除
end
EVAL "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end" 1 order_lock client1_uuid
redisTemplate 调用 Lua
public class RedisTemplateLuaExample {
private RedisTemplate<String, String> redisTemplate;
// 初始化 RedisTemplate(需在 Spring 中配置 Bean)
public RedisTemplateLuaExample(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 释放分布式锁
* @param lockKey 锁的Key
* @param requestId 锁的唯一标识(防止误删)
* @return 是否释放成功
*/
public boolean releaseLock(String lockKey, String requestId) {
// 1. 创建 RedisScript 对象(指定返回值类型为 Long)
RedisScript<Long> script = new DefaultRedisScript<>(
"if redis.call('GET', KEYS[1]) == ARGV[1] then\n" +
" return redis.call('DEL', KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end",
Long.class
);
// 2. 执行脚本
Long result = redisTemplate.execute(
script,
Collections.singletonList(lockKey), // KEYS 列表
requestId // ARGV 参数(可多个)
);
// 3. 返回是否成功(1=成功,0=失败)
return result != null && result == 1;
}
}
为什么 Lua 脚本可以保证原子性?
lua 脚本会将一组指令封装成一个原子,可以简单理解成一个事务,当 redis 执行这个原子事务的时候其他到执行的指令都只能进行串行等待,直到 lua 脚本执行完成; 比如在删除一个 key 有两个操作:1 判断 key 中的 value 是否是属于当前线程,2 如果属于那么就删除 key;这两个操作需要具备原子性,所以我们应该通过 lua 脚本对他们进行封装;
Lua 和 redis 的事务有什么区别
Lua 和事务都提供了将一组指令原子化的能力;但是两者是有很多不同;
- 原子强度上来说;Lua 会强于事务,因为事务在提交过程中的指令没有执行而是缓存在 redis 中等到 exec 指令,过程中 redis 可以执行其他请求的指令;但是 Lua 利用的是 Redis 单线程的特性,将其他请求指令进行串行等待;
- 从错误处理上来说:Lua 如果中间的指令报错了那么后面的指令都不会执行,还可以通过手动处理错误的方式进行逻辑处理,逻辑处理能力灵活;事务中间报错的指令不会影响后续指令,在指令前后依赖关系强的场景下不适用;
- 从网络传续效率来说:Lua 只继续一次网络 IO,而事务会进行多次。效率上 Lua 更强
Redisson
什么是 RedLock,他解决了什么问题?
RedLock 是解决单点分布式锁架构中单点故障导致锁信息丢失的问题; 如果在普通架构,不论才用是的复制、哨兵还是 cluster 集群架构,如果分布式锁信息都保存在一个 master 节点然后通过主从同步的方式进行锁信息备份,那么就一定会出现锁丢失的风险,因为主从的数据同步是异步执行的,也就是说可能锁的信息还没有同步到从节点但是主节点挂了,这时候同故障转移将从节点选成新的主节点那么也没有丢失锁的信息; RedLock 就是为了解决多点分布式锁的算法,目的是让分布式锁信息保存在多个 master 节点上保证 CP,同时要将加解锁的习惯做到最好;
ReLock 的做法是对多个 Redis 节点做解锁和节点操作一般这个节点的数量是计数,如果半数以上的都加解锁成功并且在超市时间内那么就认为操作是成功的;
RedLock 虽然解决了在故障转移情况下锁信息丢失的问题,但是同时也受到了很多的质疑,主要是针对 RedLock 的脱离了 Redis 性能有限的主旨,RedLock 让系统变得复杂性能也降低了,所以针对不同的场景可以选择用 zookeeper 进行代替;
Redisson的lock和tryLock有什么区别?
Redisson 中的 lock 和 unlock 都是由 JUC 中的 Lock 接口定义的;
- tryLock 是指尝试获取锁,如果获取到那么返回 true,如果获取失败那么就返回失败,不会进行阻塞,但是一定能获取到锁;
- lock 方法则是一定会获取到锁但是会进行阻塞; 以 ReentrantLock 的加锁流程来分析,一开始会通过 tryLock 尝试获取锁获取成功则返回,如果获取失败那么就调用 lock,lock 能保证获取到锁,然后将线程放入队列中进行等待唤醒; Redisson 中也是一样,我们要根据不同的场景选择用 lock 和 tryLock;
Redisson的watchdog机制是怎么样的?
一般为了避免持有锁的线程宕机导致锁无法释放,我们都会为锁设置一个过期时间,但是大部分场景我们都无法预估线程持有锁的具体时长,如果一个锁提前释放那么不会有什么问题,如果一个锁过期了但是业务依然没有执行完成那么就会出现被其他线程抢占锁的情况,这会导致并发问题也会出现删除不属于自己线程锁的问题; 看门狗机制就是为了把控锁的过期时间锁诞生的;Redisson 中默认每个 key 的过期时间是 30s,然后开启看门狗,他每 10s 查看当前持有锁的线程是否在活跃,如果依然在活跃那么就重新将过期时间置为 30s;这样就能确保锁的过期时间和线程的执行时长基本一致;
Redisson 的 watchdog 什么情况下可能会失效?
- 持有锁的线程主动释放锁
- 线程所在进行被 kill 或则电脑宕机
- 线程被中断
- 线程持有锁时间超过上限 leaseTime;
缓存淘汰策略
惰性释放 lazyfree 了解过吗?
惰性释放没有开启之前,我们主线程删除一个 key 的时候其他指令只能进行串行排队等待,如果系统中有很多 bigkey 需要删除,那么就会让阻塞时间变长,造成系统的吞吐量和 IO 处理下降; 开启惰性释放后,在主线程进行删除 key 的时候,会将 key 标记为已删除,然后直接返回,释放操作又后台线程进行,实现了非阻塞异步化的删除降低阻塞等待时间;
Redis 的过期策略是怎么样的?
Redis 采用的是定时删除和惰性删除相结合的方式实现的过期策略; Redis 会没 100 ms 随机抽取部分 key 看他们是否过期;过期的话就标记为已删除然后通过惰性删除的方式释放 key
Redis 的内存淘汰策略是怎么样的?
内存淘汰策略是指当内存满了的情况下应该释放哪些 key 的策略; 可以根据淘汰方式不同分为一下几类:
- 不淘汰,如果内存满了就报错
- 根据 TTL 进行淘汰;会选择过期时间最近的一个 key 进行淘汰;
- LRU:会淘汰最久没有访问的 key;
- LFU:会淘汰访问频率最低的 key;
- 随机淘汰; 他们还可以组合,比如在过期时间最近的 key 中淘汰最久未访问的 key; LRU 的缺点是可能会淘汰某些热 key,LFU 的缺点是可能某些曾经的热 key 会始终保留;
其他问题
除了做缓存,Redis 还能用来干什么?
Redis 提供了很多的数据类型,我们可以利用不同类型的特性实现很多缓存以外的功能;
- 利用 zset 的有序性,可以实现排名功能
- 利用 List 和 Set 具备的随机抽取可以实现抽奖随机能功能
- 利用 stream 流可以实现轻量级的消息订阅和发布
- 利用 GEO,实现附近的人,附近司机最近站点等功能
- 利用 hyperloglog 可以实现 UV,DAU 等信息;
- 利用 bitmap 可以实现签到打卡,布隆过滤器等功能;
- 利用 set 的运算做猜你喜欢和功能好友等功能; 此外还可以利用 Redis 的全局可见性,实现全局的分布式 id 或则全局计数;
Redis 是 AP 的还是 CP 的?
我认为 Redis 是 AP,因为不论是从底层设计还是不断的迭代升级,redis 的思想失踪都是效率和可用性优先;比如单线程的执行设计,主从复制、集群等架构都体现了高可用优先的思想;但是整体系统的角度来看,我们也会通过向 MQ,canel 等手段来实现数据的一致性,还有 ReLock 做分布式锁多点记录也是一致性的体现; 所以 Redis 是 AP,但是在生产中会通过其他机制或则中间件来保证一部分的数据一致性,但是过程中 redis 有一部分的数据不一致;
如何通过 Redis 实现 delayQueue
- 通过 Zset+轮询
将 key 的过期时间作为 score,这样 zset 就会根据过期时间将元素从近到远的方式进行排列; 然后开启一个轮询线程,每秒通过 zrangebyscore 的命令查询 score 小于当前时间戳的元素将他们弹出然后进行处理;
- 通过 Redisson
Redisson 的底层也是通过 Zset 来实现的;redisson 做了很优秀的封装; Redisson 实现消息队列是通过定义生产者和消费者,然后消费中通过 take 阻塞的方式等待消费任务;
Redis 的虚拟内存机制是什么?
#java##面试#是指将内存中一部分不常用的数据暂存到磁盘中,如果需要使用再从磁盘中读取到内存;这个技术已经被移除了,主要原因就是违背了 Redis 的设计初衷,磁盘的读写速度会变成 Redis 的性能瓶颈;