基于SpringBoot与Redis实现分布式锁
Docker运行Redis
拉取最近版本的Redis镜像:
docker pull redis
启动容器:
docker run -d --name redis -p 6379:6379 redis:latest
进入容器内部,测试存储:
# docker exec -it redis redis-cli 127.0.0.1:6379> set name qcy OK 127.0.0.1:6379> get name "qcy"
到这里,说明我们启动成功了。
实现要求
实现的分布式锁必须具有以下特点:
- 互斥:无论在什么时刻,最多只有一个客户端拥有锁
- 锁拥有超时时间。否则加锁的客户端突然宕机,来不及释放锁,会导致所有客户端获取锁失败。
- 在不超时的情况下,客户端只能释放掉自己申请的锁。有这样的一个例子:客户端a申请了锁,设置超时为5秒,可是其运行同步代码超过了5秒。此时客户端b请求锁时,由于该锁超时被redis自动删除,于是客户端b申请锁成功。接着客户端a的同步代码运行结束,这样就会把客户端b申请的锁给释放掉,这样可能会引起数据错乱。
实现原理
如何满足互斥与具有超时时间?
redis是单线程模型,指令是一条一条执行的,因此不存在并发问题。
我们使用以下的指令,来往reids中存一个过期时间为5秒的键值:
set name qcy nx ex 5
其中的name为key,qcy为value,nx代表不存在此key则存储,并且返回OK,若存在则会返回null,ex 5代表过期时间为5秒。
127.0.0.1:6379> set name qcy nx ex 5 OK 127.0.0.1:6379> set name qcy nx ex 5 (nil)
当我们在过期时间内重复存储时,redis会提示操作失败。间隔5秒以后,redis会提示插入成功。
释放锁时,只要删除对应的key即可:
127.0.0.1:6379> set name qcy nx ex 5 OK 127.0.0.1:6379> del name (integer) 1
那么如何释放掉自己申请的锁?
key为锁名称,value可以为申请锁的客户端的唯一标识。那么在释放锁时,不仅仅比较key是否相同,还要比较value是否为申请锁的客户端的唯一标识,且这两次比较必须是原子操作。
为什么必须是原子操作?
假设有这样的一种场景:客户端a申请了名称为name,值为a的锁,接着客户端a想要释放锁,查询出key=name,value=a的键值对,紧接着锁过期,客户端b申请到了锁,将值变为b。可是客户端a内存中的值仍然为a,于是释放掉了客户端b申请到的锁。
利用以上的原理,来实现我们的分布式锁。
代码实现
代码使用最简单的配置
工程目录结构:
pom依赖:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.6</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
其中,SpringBoot与Redis集成需要使用到spring-boot-starter-data-redis,输出日志使用到lombok,测试时用到spring-boot-starter-test
配置如下:
spring: redis: host: localhost port: 6379 logging: level: root: info
最核心的RedisLock类
package com.yang.redislock1; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Arrays; import java.util.concurrent.TimeUnit; /** * @author qcy * @create 2020/08/28 14:21:35 */ @Slf4j @Component public class RedisLock { @Resource StringRedisTemplate template; public boolean tryLock(String key, String value, int expireTime, TimeUnit timeUnit) { Boolean flag = template.opsForValue().setIfAbsent(key, value, expireTime, timeUnit); if (flag == null || !flag) { log.info("申请锁(" + key + "," + value + ")失败"); return false; } log.error("申请锁(" + key + "," + value + ")成功"); return true; } public void unLock(String key, String value) { String script = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end"; RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class); Long result = template.execute(redisScript, Arrays.asList(key, value)); if (result == null || result == 0) { log.info("释放锁(" + key + "," + value + ")失败,该锁不存在或锁已经过期"); } else { log.info("释放锁(" + key + "," + value + ")成功"); } } }
其中unLock方法中的lua脚本是一个原子操作,此时这里的KEYS[1]是key,KEYS[2]是value。
if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end
该段脚本的意思是,首先获取锁的value,判断是否等于期待的value,满足的话,则删除该锁,并返回1;否则直接返回0。
为什么不直接删除该key?
因为需要满足实现要求中的第三点:在锁不过期的情况,只能释放掉自己申请的锁。
锁的名称都一样,因此将客户端的唯一标识存进了其value中,那么删除前,需要判断是否是自己创建的锁。
测试类:
第一种情况,两个客户端都正常的创建、释放锁。
package com.yang.redislock1; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @Slf4j @SpringBootTest class Redislock1ApplicationTests { @Resource RedisLock redisLock; public static final String LOCK_NAME = "name"; public static final String CLIENT_A = "a"; public static final String CLIENT_B = "b"; public static final int EXPIRE_TIME = 5; @Test public void test1() { //客户端a boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS); if (lockResultA) { try { //模拟客户端a的操作耗时 Thread.sleep(2 * 1000); } catch (Exception e) { e.printStackTrace(); } finally { redisLock.unLock(LOCK_NAME, CLIENT_A); } } //客户端b boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS); if (lockResultB) { try { //模拟客户端b的操作耗时 Thread.sleep(2 * 1000); } catch (Exception e) { e.printStackTrace(); } finally { redisLock.unLock(LOCK_NAME, CLIENT_B); } } } }
运行结果如下:
第二种情况,客户端a申请的锁超时
只需要把上述代码中,客户端a的操作耗时改为6秒即可。
输出如下:
第三种情况,假设客户端a不释放锁
修改测试类代码:
//客户端a boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS); if (lockResultA) { try { //模拟客户端a的操作耗时 Thread.sleep(2 * 1000); } catch (Exception e) { e.printStackTrace(); } finally { //redisLock.unLock(LOCK_NAME, CLIENT_A); log.info("客户端a不释放锁"); } } //客户端b boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS); if (lockResultB) { try { //模拟客户端b的操作耗时 Thread.sleep(2 * 1000); } catch (Exception e) { e.printStackTrace(); } finally { redisLock.unLock(LOCK_NAME, CLIENT_B); } } }
输出如下:
在集群模式下的问题
以上的场景,在单机版的redis中问题不大,可是在redis集群中,可能存在问题。
客户端a在redis集群中节点1上申请到了锁,然后执行业务逻辑,可是这个时候节点1的锁还没同步到节点2上,节点1突然挂了。这个时候客户端b在节点2上申请锁,立即就成功了。这个时候就出现了一把锁同时被多个客户端持有的情况。
解决此类问题,可以借助RedLock。其大致原理就是,客户端a请求节点1,节点1加锁前向集群中其他超过一半的节点发送加锁请求,只要过半的节点反馈加锁成功,节点1就认为加锁成功。
接着客户端a释放锁时,请求节点1,节点1等待数据同步成功后,向所有其他节点发起释放锁的请求,接着节点1认为释放锁成功。
当然,RedLock算法还会考虑更多的细节问题,这里就不细讲了。