分布式锁—5.Redisson的读写锁
大纲
1.Redisson读写锁RedissonReadWriteLock概述
2.读锁RedissonReadLock的获取读锁逻辑
3.写锁RedissonWriteLock的获取写锁逻辑
4.读锁RedissonReadLock的读读不互斥逻辑
5.RedissonReadLock和RedissonWriteLock的读写互斥逻辑
6.写锁RedissonWriteLock的写写互斥逻辑
7.写锁RedissonWriteLock的可重入逻辑
8.读锁RedissonReadLock的释放读锁逻辑
9.写锁RedissonWriteLock的释放写锁逻辑
1.Redisson读写锁RedissonReadWriteLock概述
(1)RedissonReadWriteLock的简介
(2)RedissonReadWriteLock的使用
(3)RedissonReadWriteLock的初始化
(1)RedissonReadWriteLock的简介
RedissonReadWriteLock提供了两个方法分别获取读锁和写锁。
RedissonReadWriteLock的readLock()方法可以获取读锁RedissonReadLock。
RedissonReadWriteLock的writeLock()方法可以获取写锁RedissonWriteLock。
由于RedissonReadLock和RedissonWriteLock都是RedissonLock的子类,所以只需关注RedissonReadLock和RedissonWriteLock的如下内容即可。
一是获取读锁(写锁)的lua脚本逻辑
二是释放读锁(写锁)的lua脚本逻辑
三是读锁(写锁)的WathDog检查读锁(写锁)和处理锁过期时间的逻辑
(2)RedissonReadWriteLock的使用
//读写锁 RedissonClient redisson = Redisson.create(config); RReadWriteLock rwlock = redisson.getReadWriteLock("myLock"); rwlock.readLock().lock();//获取读锁 rwlock.readLock().unlock();//释放读锁 rwlock.writeLock().lock();//获取写锁 rwlock.writeLock().unlock();//释放写锁 --------------------------------------------------------------- //如果没有主动释放锁的话,10秒后将会自动释放锁 rwlock.readLock().lock(10, TimeUnit.SECONDS); rwlock.writeLock().lock(10, TimeUnit.SECONDS); //加锁等待最多是100秒;加锁成功后如果没有主动释放锁的话,锁会在10秒后自动释放 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
(3)RedissonReadWriteLock的初始化
RedissonReadWriteLock实现了RReadWriteLock接口,RedissonReadLock实现了RLock接口,RedissonWriteLock实现了RLock接口。
public class Redisson implements RedissonClient { //Redis的连接管理器,封装了一个Config实例 protected final ConnectionManager connectionManager; //Redis的命令执行器,封装了一个ConnectionManager实例 protected final CommandAsyncExecutor commandExecutor; ... protected Redisson(Config config) { this.config = config; Config configCopy = new Config(config); //初始化Redis的连接管理器 connectionManager = ConfigSupport.createConnectionManager(configCopy); ... //初始化Redis的命令执行器 commandExecutor = new CommandSyncService(connectionManager, objectBuilder); ... } @Override public RReadWriteLock getReadWriteLock(String name) { return new RedissonReadWriteLock(commandExecutor, name); } ... } public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock { public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } @Override public RLock readLock() { return new RedissonReadLock(commandExecutor, getRawName()); } @Override public RLock writeLock() { return new RedissonWriteLock(commandExecutor, getRawName()); } } public class RedissonReadLock extends RedissonLock implements RLock { public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } ... } public class RedissonWriteLock extends RedissonLock implements RLock { protected RedissonWriteLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } ... }
2.读锁RedissonReadLock的获取读锁逻辑
(1)加读锁的lua脚本逻辑
(2)WathDog处理读锁过期时间的lua脚本逻辑
(1)加读锁的lua脚本逻辑
假设客户端A的线程(UUID1:ThreadID1)作为第一个线程进来加读锁,执行流程如下:
public class RedissonLock extends RedissonBaseLock { ... //不带参数的加锁 public void lock() { ... lock(-1, null, false); ... } //带参数的加锁 public void lock(long leaseTime, TimeUnit unit) { ... lock(leaseTime, unit, false); ... } private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); //加锁成功 if (ttl == null) { return; } //加锁失败 ... } private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法 //公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法 //读写锁中的读锁,接下来调用RedissonReadLock.tryLockInnerAsync()方法 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } //对RFuture<Long>类型的ttlRemainingFuture添加回调监听 CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { //tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑: //加锁成功 if (ttlRemaining == null) { if (leaseTime != -1) { //如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //创建定时调度任务 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); } ... } public class RedissonReadLock extends RedissonLock implements RLock { ... @Override <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, //执行命令"hget myLock mode",尝试获取一个Hash值mode "local mode = redis.call('hget', KEYS[1], 'mode'); " + //mode为false则执行加读锁的逻辑 "if (mode == false) then " + //hset myLock mode read "redis.call('hset', KEYS[1], 'mode', 'read'); " + //hset myLock UUID1:ThreadID1 1 "redis.call('hset', KEYS[1], ARGV[2], 1); " + //set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1 "redis.call('set', KEYS[2] .. ':1', 1); " + //pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000 "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " + //pexpire myLock 30000 "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果已经有线程加了读锁 或者 有线程加了写锁且是自己加的写锁 //所以一个线程如果加了写锁,它是可以重入自己的写锁和自己的读锁的 "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + //hincrby myLock UUID2:ThreadID2 1 //ind表示重入次数,线程可以重入自己的读锁和写锁,线程后加的读锁可以重入线程自己的读锁或写锁 "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //key = {myLock}:UUID2:ThreadID2:rwlock_timeout:1 "local key = KEYS[2] .. ':' .. ind;" + //set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1 "redis.call('set', key, 1); " + //pexpire myLock 30000 "redis.call('pexpire', key, ARGV[1]); " + "local remainTime = redis.call('pttl', KEYS[1]); " + //pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000 "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " + "return nil; " + "end;" + //执行命令"pttl myLock",返回myLock的剩余过期时间 "return redis.call('pttl', KEYS[1]);", //KEYS[1] = myLock //KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout 或 KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)), unit.toMillis(leaseTime),//ARGV[1] = 30000 getLockName(threadId),//ARGV[2] = UUID1:ThreadID1 或 ARGV[2] = UUID2:ThreadID2 getWriteLockName(threadId)//ARGV[3] = UUID1:ThreadID1:write 或 ARGV[3] = UUID2:ThreadID2:write ); } ... }
一.参数说明
KEYS[1] = myLock KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1 ARGV[3] = UUID1:ThreadID1:write
二.执行lua脚本的获取读锁逻辑
首先执行命令"hget myLock mode",尝试获取一个Hash值mode,也就是从key为myLock的Hash值里获取一个field为mode的value值。但是此时一开始都还没有加锁,所以mode肯定是false。于是就执行如下加读锁的逻辑:设置两个Hash值 + 设置一个字符串。
hset myLock mode read //用来记录当前客户端线程重入锁的次数 hset myLock UUID1:ThreadID1 1 //用来记录当前客户端线程第1个重入锁过期时间 set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1 pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000 pexpire myLock 30000
执行完加读锁逻辑后,Redis存在如下结构的数据。其实加读锁的核心在于构造一个递增序列,记录不同线程的读锁和同一个线程不同的重入锁。
field为类似于UUID1:ThreadID1的value值,是用来记录当前客户端线程重入锁次数的。key为类似于{myLock}:UUID1:ThreadID1:rwlock_timeout:1的String,是用来记录当前客户端线程第n个重入锁过期时间的。
假设将key为myLock称为父读锁,key为UUID1:ThreadID1称为子读锁。那么记录每一个子读锁的过期时间,是因为需要根据多个子读锁的过期时间更新父读锁的过期时间。
//1.线程1第一次加读锁 //Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 //2.线程1第二次加读锁 //Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 2 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1 //3.线程1第三次加读锁 //Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 3 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1 //4.线程2第一次加读锁 //Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 3, "UUID2:ThreadID2": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1 //5.线程2第二次加读锁 //Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 3, "UUID2:ThreadID2": 2 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:2 ==> 1
(2)WathDog处理读锁过期时间的lua脚本逻辑
假设客户端A的线程(UUID1:ThreadID1)已经成功获取到一个读锁,此时会创建一个WatchDog定时调度任务,10秒后检查该读锁。执行流程如下:
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock { ... protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); try { //创建一个更新过期时间的定时调度任务 renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { cancelExpirationRenewal(threadId); } } } } //更新过期时间 private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //使用了Netty的定时任务机制:HashedWheelTimer + TimerTask + Timeout //创建一个更新过期时间的定时调度任务,下面会调用MasterSlaveConnectionManager.newTimeout()方法 //即创建一个定时调度任务TimerTask交给HashedWheelTimer,10秒后执行 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } //异步执行lua脚本去更新锁的过期时间 //对于读写锁,接下来会执行RedissonReadLock.renewExpirationAsync()方法 RFuture<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } //res就是执行renewExpirationAsync()里的lua脚本的返回值 if (res) { //重新调度自己 renewExpiration(); } else { //执行清理工作 cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); } protected void cancelExpirationRenewal(Long threadId) { ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null) { return; } if (threadId != null) { task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { timeout.cancel(); } EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } } ... } public class RedissonReadLock extends RedissonLock implements RLock { ... @Override protected RFuture<Boolean> renewExpirationAsync(long threadId) { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //执行命令"hget myLock UUID1:ThreadID1",获取当前这个线程是否还持有这个读锁 "local counter = redis.call('hget', KEYS[1], ARGV[2]); " + "if (counter ~= false) then " + //指定的线程还在持有锁,那么就执行"pexpire myLock 30000"刷新锁的过期时间 "redis.call('pexpire', KEYS[1], ARGV[1]); " + "if (redis.call('hlen', KEYS[1]) > 1) then " + //获取key为myLock的Hash值的所有key "local keys = redis.call('hkeys', KEYS[1]); " + //遍历已被线程获取的所有重入和非重入的读锁 "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + //排除掉key为mode的Hash值 "if type(counter) == 'number' then " + //递减拼接重入锁的key,刷新同一个线程的所有重入锁的过期时间 "for i=counter, 1, -1 do " + "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + "end; " + "end; " + "end; " + "end; " + "return 1; " + "end; " + "return 0;", //KEYS[1] = myLock //KEYS[2] = {myLock} Arrays.<Object>asList(getRawName(), keyPrefix), internalLockLeaseTime,//ARGV[1] = 30000毫秒 getLockName(threadId)//ARGV[2] = UUID1:ThreadID1 ); } ... }
一.参数说明
KEYS[1] = myLock KEYS[2] = {myLock} ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1
二.执行lua脚本的处理逻辑
执行命令"hget myLock UUID1:ThreadID1",尝试获取一个Hash值,也就是获取指定的这个线程是否还持有这个读锁。如果指定的这个线程还在持有这个锁,那么这里返回的是1,于是就会执行"pexpire myLock 30000"刷新锁的过期时间。
接着执行命令"hlen myLock",判断key为锁名的Hash元素个数是否大于1。如果指定的这个线程还在持有这个锁,那么key为myLock的Hash值就至少有两个kv对。其中一个key是mode,一个key是UUID1:ThreadID1。所以这里的判断是成立的,于是遍历处理key为锁名的Hash值。
在遍历处理key为锁名的Hash值时,需要排除掉key为mode的Hash值。然后根据key为UUID + 线程ID的Hash值,通过递减拼接,进行循环遍历,把每一个不同线程的读锁或同一个线程不同的重入锁,都刷新过期时间。
三.总结
WatchDog在处理读锁时,如果指定的线程还持有读锁,那么就会:刷新读锁key的过期时间为30秒,根据重入读锁的次数进行遍历,对重入读锁对应的key的过期时间也刷新为30秒。
//KEYS[1] = myLock //KEYS[2] = {myLock} "if (redis.call('hlen', KEYS[1]) > 1) then " + "local keys = redis.call('hkeys', KEYS[1]); " + //遍历处理key为锁名的Hash值 "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + //排除掉key为mode的Hash值 "if type(counter) == 'number' then " + "for i=counter, 1, -1 do " + //递减拼接,把不同线程的读锁或同一个线程不同的重入锁,都刷新过期时间 "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + "end; " + "end; " + "end; " + "end; " + //Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 3, "UUID2:ThreadID2": 2 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:2 ==> 1
3.写锁RedissonWriteLock的获取写锁逻辑
(1)获取写锁的执行流程
(2)获取写锁的lua脚本逻辑
(1)获取写锁的执行流程
假设客户端A的线程(UUID1:ThreadID1)作为第一个线程进来加写锁,执行流程如下:
public class RedissonLock extends RedissonBaseLock { ... //不带参数的加锁 public void lock() { ... lock(-1, null, false); ... } //带参数的加锁 public void lock(long leaseTime, TimeUnit unit) { ... lock(leaseTime, unit, false); ... } private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); //加锁成功 if (ttl == null) { return; } //加锁失败 ... } private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法 //公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法 //读写锁中的读锁,接下来调用RedissonReadLock.tryLockInnerAsync()方法 //读写锁中的写锁,接下来调用RedissonWriteLock.tryLockInnerAsync()方法 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } //对RFuture<Long>类型的ttlRemainingFuture添加回调监听 CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { //tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑: //加锁成功 if (ttlRemaining == null) { if (leaseTime != -1) { //如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //创建定时调度任务 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); } ... } public class RedissonWriteLock extends RedissonLock implements RLock { ... @Override <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, //执行命令"hget myLock mode",尝试获取一个Hash值mode "local mode = redis.call('hget', KEYS[1], 'mode'); " + //获取不到,说明没有加读锁或者写锁 "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'write'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果加过锁,那么就要看是不是写锁 + 写锁是不是自己加过的(即重入写锁) "if (mode == 'write') then " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //重入写锁 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " + "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " + "return nil; " + "end; " + "end;" + //执行命令"pttl myLock",返回myLock的剩余过期时间 "return redis.call('pttl', KEYS[1]);", Arrays.<Object>asList(getRawName()),//KEYS[1] = myLock unit.toMillis(leaseTime),//ARGV[1] = 30000 getLockName(threadId)//ARGV[2] = UUID1:ThreadID1:write ); } ... }
(2)获取写锁的lua脚本逻辑
一.参数说明
KEYS[1] = myLock ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1:write
二.执行分析
首先执行命令"hget myLock mode",尝试获取一个Hash值mode,也就是从key为myLock的Hash值里获取一个field为mode的value值。但是此时一开始都还没有加锁,所以mode肯定是false。于是就执行如下加读锁的逻辑:设置两个Hash值。
hset myLock mode write hset myLock UUID1:ThreadID1:write 1 pexpire myLock 30000
完成加锁操作后,Redis中存在如下数据:
//Hash结构 myLock: { "mode": "write", "UUID1:ThreadID1:write": 1 }
4.读锁RedissonReadLock的读读不互斥逻辑
(1)不同客户端线程读锁与读锁不互斥说明
(2)客户端A先加读锁的Redis命令执行过程和结果
(3)客户端B后加读锁的Redis命令执行过程和结果
(1)不同客户端线程读锁与读锁不互斥说明
假设客户端A(UUID1:ThreadID1)对myLock这个锁先加了一个读锁,客户端B(UUID2:ThreadID2)也要对myLock这个锁加一个读锁,那么此时这两个读锁是不会互斥的,客户端B可以加锁成功。
public class RedissonReadLock extends RedissonLock implements RLock { ... @Override <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, //执行命令"hget myLock mode",尝试获取一个Hash值mode "local mode = redis.call('hget', KEYS[1], 'mode'); " + //mode为false则执行加读锁的逻辑 "if (mode == false) then " + //hset myLock mode read "redis.call('hset', KEYS[1], 'mode', 'read'); " + //hset myLock UUID1:ThreadID1 1 "redis.call('hset', KEYS[1], ARGV[2], 1); " + //set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1 "redis.call('set', KEYS[2] .. ':1', 1); " + //pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000 "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " + //pexpire myLock 30000 "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果已经有线程加了读锁 或者 有线程加了写锁且是自己加的写锁 //所以一个线程如果加了写锁,它是可以重入自己的写锁和自己的读锁的 "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + //hincrby myLock UUID2:ThreadID2 1 //ind表示重入次数,线程可以重入自己的读锁和写锁,线程后加的读锁可以重入线程自己的读锁或写锁 "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //key = {myLock}:UUID2:ThreadID2:rwlock_timeout:1 "local key = KEYS[2] .. ':' .. ind;" + //set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1 "redis.call('set', key, 1); " + //pexpire myLock 30000 "redis.call('pexpire', key, ARGV[1]); " + "local remainTime = redis.call('pttl', KEYS[1]); " + //pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000 "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " + "return nil; " + "end;" + //执行命令"pttl myLock",返回myLock的剩余过期时间 "return redis.call('pttl', KEYS[1]);", //KEYS[1] = myLock //KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout 或 KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)), unit.toMillis(leaseTime),//ARGV[1] = 30000 getLockName(threadId),//ARGV[2] = UUID1:ThreadID1 或 ARGV[2] = UUID2:ThreadID2 getWriteLockName(threadId)//ARGV[3] = UUID1:ThreadID1:write 或 ARGV[3] = UUID2:ThreadID2:write ); } ... }
(2)客户端A先加读锁的Redis命令执行过程和结果
参数说明:
KEYS[1] = myLock KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1 ARGV[3] = UUID1:ThreadID1:write
Redis命令的执行过程:
hset myLock mode read hset myLock UUID1:ThreadID1 1 set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1 pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000 pexpire myLock 30000
Redis执行结果:
//Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
(3)客户端B后加读锁的Redis命令执行过程和结果
参数说明:
KEYS[1] = myLock KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout ARGV[1] = 30000 ARGV[2] = UUID2:ThreadID2 ARGV[3] = UUID2:ThreadID2:write
Redis命令的执行过程:
hget myLock mode ===> 获取到mode=read,表示此时已经有线程加了读锁 hincrby myLock UUID2:ThreadID2 1 set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1 pexpire myLock 30000 pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000
Redis执行结果:
//Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 1, "UUID2:ThreadID2": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
需要注意的是:多个客户端同时加读锁,读锁与读锁不互斥。会不断在key为锁名的Hash里,自增field为客户端UUID + 线程ID的value值。每个客户端成功加的一次读锁或写锁,都会维持一个WatchDog,不断刷新myLock的生存时间 + 刷新该客户端这次加的锁的过期时间。
加读锁的lua脚本中,ind表示重入次数。线程可重入自己的读锁和写锁。也就是说,线程后加的读锁可以重入线程自己先加的读锁或写锁。
5.RedissonReadLock和RedissonWriteLock的读写互斥逻辑
(1)不同客户端线程先读锁后写锁如何互斥
(2)不同客户端线程先写锁后读锁如何互斥
(1)不同客户端线程先读锁后写锁如何互斥
首先,客户端A(UUID1:ThreadID1)和客户端B(UUID2:ThreadID2)先加读锁,此时Redis中存在如下的数据:
//Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 1, "UUID2:ThreadID2": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
接着,客户端C(UUID3:ThreadID3)来加写锁。
public class RedissonWriteLock extends RedissonLock implements RLock { ... @Override <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, //执行命令"hget myLock mode",尝试获取一个Hash值mode "local mode = redis.call('hget', KEYS[1], 'mode'); " + //此时发现mode=read,说明已有线程加了锁了 "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'write'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果加过锁,那么就要看是不是写锁 + 写锁是不是自己加过的(即重入写锁) "if (mode == 'write') then " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //重入写锁 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " + "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " + "return nil; " + "end; " + "end;" + //执行命令"pttl myLock",返回myLock的剩余过期时间 "return redis.call('pttl', KEYS[1]);", Arrays.<Object>asList(getRawName()),//KEYS[1] = myLock unit.toMillis(leaseTime),//ARGV[1] = 30000 getLockName(threadId)//ARGV[2] = UUID3:ThreadID3:write ); } ... }
客户端C(UUID3:ThreadID3)加写锁时的参数:
KEYS[1] = myLock ARGV[1] = 30000 ARGV[2] = UUID3:ThreadID3:write
客户端C(UUID3:ThreadID3)加写锁时:首先执行命令"hget myLock mode"发现mode = read,说明已有线程加了锁了。由于已加的锁不是当前线程加的写锁,而是其他线程加的读锁。所以此时会执行命令"pttl myLock",返回myLock的剩余过期时间。这会导致客户端C加锁失败,会在while循环中阻塞和重试,从而实现先读锁后写锁的互斥。
(2)不同客户端线程先写锁后读锁如何互斥
假设客户端A(UUID1:ThreadID1)先加了一个写锁,此时Redis中存在如下的数据:
//Hash结构 myLock: { "mode": "write", "UUID1:ThreadID1:write": 1 }
然后客户端B(UUID2:ThreadID2)再来加读锁。
public class RedissonReadLock extends RedissonLock implements RLock { ... @Override <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, //执行命令"hget myLock mode",尝试获取一个Hash值mode "local mode = redis.call('hget', KEYS[1], 'mode'); " + //发现mode=write,说明已有线程加了锁了 "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('set', KEYS[2] .. ':1', 1); " + "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果已经有线程加了读锁 或者 有线程加了写锁且是自己加的写锁 //所以一个线程如果加了写锁,它是可以重入自己的写锁和自己的读锁的 "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + //hincrby myLock UUID2:ThreadID2 1 //ind表示重入次数,线程可以重入自己的读锁和写锁,线程后加的读锁可以重入线程自己的读锁或写锁 "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //key = {myLock}:UUID2:ThreadID2:rwlock_timeout:1 "local key = KEYS[2] .. ':' .. ind;" + //set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1 "redis.call('set', key, 1); " + //pexpire myLock 30000 "redis.call('pexpire', key, ARGV[1]); " + "local remainTime = redis.call('pttl', KEYS[1]); " + //pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000 "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " + "return nil; " + "end;" + //执行命令"pttl myLock",返回myLock的剩余过期时间 "return redis.call('pttl', KEYS[1]);", //KEYS[1] = myLock //KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)), unit.toMillis(leaseTime),//ARGV[1] = 30000 getLockName(threadId),//ARGV[2] = UUID2:ThreadID2 getWriteLockName(threadId)//ARGV[3] = UUID2:ThreadID2:write ); } ... }
客户端B(UUID2:ThreadID2)加读锁时的参数:
KEYS[1] = myLock KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout ARGV[1] = 30000 ARGV[2] = UUID2:ThreadID2 ARGV[3] = UUID2:ThreadID2:write
客户端B(UUID2:ThreadID2)加读锁时:首先执行命令"hget myLock mode"发现mode = write,说明已有线程加了锁了。接下来执行命令"hexists myLock UUID2:ThreadID2:write",发现不存在。也就是说,如果客户端B之前加过写锁,此时B加读锁才能通过判断。但是,之前加写锁的是客户端A,所以这里的判断条件不会通过。于是返回"pttl myLock",导致加读锁失败,会在while循环中阻塞和重试,从而实现先写锁后读锁的互斥。
(3)总结
如果客户端线程A之前先加了写锁,此时该线程再加读锁,可以成功。
如果客户端线程A之前先加了写锁,此时该线程再加写锁,可以成功。
如果客户端线程A之前先加了读锁,此时该线程再加读锁,可以成功。
如果客户端线程A之前先加了读锁,此时该线程再加写锁,不可以成功。
所以写锁可以被自己的写锁重入,也可以被自己的读锁重入。但是读锁可以被任意的读锁重入,不可以被任意的写锁重入。
6.写锁RedissonWriteLock的写写互斥逻辑
(1)不同客户端线程先加写锁的情况
(2)不同客户端线程再加写锁的情况
(1)不同客户端线程先加写锁的情况
假设客户端A(UUID1:ThreadID1)先加写锁:
//传入参数 KEYS[1] = myLock ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1:write //执行结果 myLock: { "mode": "write", "UUID1:ThreadID1:write": 1 }
(2)不同客户端线程再加写锁的情况
假设客户端B(UUID2:ThreadID2)再加写锁:首先执行命令"hget myLock mode"发现mode = write,说明已有线程加了写锁。然后继续执行命令"hexists myLock UUID2:ThreadID2:write",判断已加的写锁是否是当前客户端B(UUID2:ThreadID2)加的。由于已加的写锁是客户端A(UUID1:ThreadID1)加的,所以判断不通过。于是执行"pttl myLock"返回myLock的剩余过期时间。这样会导致客户端B加写锁失败,于是会在while循环阻塞和重试加写锁,从而实现不同客户端线程的写锁和写锁的互斥。
public class RedissonWriteLock extends RedissonLock implements RLock { ... @Override <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, //执行命令"hget myLock mode",尝试获取一个Hash值mode "local mode = redis.call('hget', KEYS[1], 'mode'); " + //获取不到,说明没有加读锁或者写锁 "if (mode == false) then " + "redis.call('hset', KEYS[1], 'mode', 'write'); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果加过锁,那么就要看是不是写锁+写锁是不是自己加过的(即重入写锁) "if (mode == 'write') then " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //重入写锁 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local currentExpire = redis.call('pttl', KEYS[1]); " + "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " + "return nil; " + "end; " + "end;" + //执行命令"pttl myLock",返回myLock的剩余过期时间 "return redis.call('pttl', KEYS[1]);", Arrays.<Object>asList(getRawName()),//KEYS[1] = myLock unit.toMillis(leaseTime),//ARGV[1] = 30000 getLockName(threadId)//ARGV[2] = UUID1:ThreadID1:write 或 ARGV[2] = UUID2:ThreadID2:write ); } ... }
7.写锁RedissonWriteLock的可重入逻辑
(1)同一个客户端线程先加读锁再加读锁
(2)同一个客户端线程先加读锁再加写锁
(3)同一个客户端线程先加写锁再加读锁
(4)同一个客户端线程先加写锁再加写锁
前面分析了不同客户端线程的四种加锁情况:
情况一:先加读锁再加读锁,不互斥
情况二:先加读锁再加写锁,互斥
情况三:先加写锁再加读锁,互斥
情况四:先加写锁再加写锁,互斥
接下来分析同一个客户端线程的四种加锁情况:
情况一:先加读锁再加读锁,不互斥
情况二:先加读锁再加写锁,互斥
情况三:先加写锁再加读锁,不互斥
情况四:先加写锁再加写锁,不互斥
可以这样理解:写锁优先级高,读锁优先级低。同一个线程如果先加了优先级高的写锁,那就可以继续加优先级低的读锁。同一个线程如果先加了优先级低的读锁,那就不可以再加优先级高的写锁。一般锁可以降级,不可以升级。
(1)同一个客户端线程先加读锁再加读锁
客户端A(UUID1:ThreadID1)先加了一次读锁时:
//传入参数 KEYS[1] = myLock KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1 ARGV[3] = UUID1:ThreadID1:write //执行结果 //Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
客户端A(UUID1:ThreadID1)再加一次读锁时,判断通过可以加成功。
//执行命令 hget myLock mode,发现mode=read,表示已经加过读锁 hincrby myLock UUID1:ThreadID1 1 set {myLock}:UUID1:ThreadID1:rwlock_timeout:2 1 pexpire myLock 30000 pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:2 30000 //执行结果 //Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 2 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
(2)同一个客户端线程先加读锁再加写锁
客户端A(UUID1:ThreadID1)先加了一次读锁时:
//传入参数 KEYS[1] = myLock KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1 ARGV[3] = UUID1:ThreadID1:write //执行结果 //Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
客户端A(UUID1:ThreadID1)再加一次写锁时,判断不通过,不可以加成功。
//传入参数 KEYS[1] = myLock ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1:write
执行命令"hget myLock mode",发现mode = read,不符合加写锁条件。所以同一个客户端线程,先加读锁再加写锁,是会互斥的。
(3)同一个客户端线程先加写锁再加读锁
客户端A(UUID1:ThreadID1)先加了一次写锁时:
//传入参数 KEYS[1] = myLock ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1:write //执行结果 myLock: { "mode": "write", "UUID1:ThreadID1:write": 1 }
客户端A(UUID1:ThreadID1)再加一次读锁时,判断通过,可以加成功。
//传入参数 KEYS[1] = myLock KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1 ARGV[3] = UUID1:ThreadID1:write //执行命令 hget myLock mode,发现mode=write,表示已经加过写锁 hexists myLock UUID1:ThreadID1:write,判断写锁是自己加的,条件成立 hincrby myLock UUID1:ThreadID1 1,表示此时加了一个读锁 set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1 pexpire myLock 30000 pexpire {myLock}:UUID1:ThreadID11:rwlock_timeout:1 30000 //执行结果 //Hash结构 myLock: { "mode": "write", "UUID1:ThreadID1:write": 1, "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
可见:如果是同一个客户端线程,先加写锁再加读锁,是可以加成功的。所以默认在线程持有写锁的期间,同样的线程可以多次加读锁。
(4)同一个客户端线程先加写锁再加写锁
客户端A(UUID1:ThreadID1)先加了一次写锁时:
//传入参数 KEYS[1] = myLock ARGV[1] = 30000 ARGV[2] = UUID1:ThreadID1:write //执行结果 myLock: { "mode": "write", "UUID1:ThreadID1:write": 1 }
客户端A(UUID1:ThreadID1)再加一次写锁时,判断通过,可以加成功。
//执行命令 hexists myLock UUID1:ThreadID1:write,判断是否是自己加的写锁 hincrby myLock UUID1:ThreadID1:write 1 pexpire myLock 50000 //执行结果 myLock: { "mode": "write", "UUID1:ThreadID1:write": 2 }
可见:读写锁也是一种可重入锁。同一个客户端线程多次加写锁,是可以重入加锁的。先加的写锁是可以被读锁重入,先加的读锁则不可以被写锁重入。
8.读锁RedissonReadLock的释放读锁逻辑
(1)RedissonReadLock的释放读锁的流程
(2)释放读锁前主要三种情况
(3)RedissonReadLock的释放读锁的lua脚本
(4)对合并的情况一和情况二执行lua脚本
(5)对情况三执行lua脚本
(1)RedissonReadLock的释放读锁的流程
释放读锁调用的是RedissonLock的unlock()方法。
在RedissonLock的unlock()方法中,会执行get(unlockAsync())代码。也就是首先调用RedissonBaseLock的unlockAsync()方法,然后调用RedissonObject的get()方法。
其中unlockAsync()方法是异步化执行的方法,释放锁的操作就是异步执行的。而RedisObject的get()方法会通过RFuture同步等待获取异步执行的结果,可以将get(unlockAsync())理解为异步转同步。
在RedissonBaseLock的unlockAsync()方法中:可重入锁会调用RedissonLock.unlockInnerAsync()方法进行异步释放锁,读锁则会调用RedissonReadLock的unlockInnerAsync()方法进行异步释放锁,然后当完成释放锁的处理后,再通过异步去取消定时调度任务。
public class Application { public static void main(String[] args) throws Exception { Config config = new Config(); config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001"); //读写锁 RedissonClient redisson = Redisson.create(config); RReadWriteLock rwlock = redisson.getReadWriteLock("myLock"); rwlock.readLock().lock();//获取读锁 rwlock.readLock().unlock();//释放读锁 rwlock.writeLock().lock();//获取写锁 rwlock.writeLock().unlock();//释放写锁 ... } } public class RedissonLock extends RedissonBaseLock { ... @Override public void unlock() { ... //异步转同步 //首先调用的是RedissonBaseLock的unlockAsync()方法 //然后调用的是RedissonObject的get()方法 get(unlockAsync(Thread.currentThread().getId())); ... } ... } public abstract class RedissonBaseLock extends RedissonExpirable implements RLock { ... @Override public RFuture<Void> unlockAsync(long threadId) { //异步执行释放锁的lua脚本 RFuture<Boolean> future = unlockInnerAsync(threadId); CompletionStage<Void> f = future.handle((opStatus, e) -> { //取消定时调度任务 cancelExpirationRenewal(threadId); if (e != null) { throw new CompletionException(e); } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); throw new CompletionException(cause); } return null; }); return new CompletableFutureWrapper<>(f); } protected abstract RFuture<Boolean> unlockInnerAsync(long threadId); ... } public class RedissonReadLock extends RedissonLock implements RLock { ... @Override protected RFuture<Boolean> unlockInnerAsync(long threadId) { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "...", Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix), LockPubSub.UNLOCK_MESSAGE, getLockName(threadId) ); } ... }
(2)释放读锁前主要三种情况
情况一:不同客户端线程加了读锁
//Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 1, "UUID2:ThreadID2": 1, } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
情况二:同一个客户端线程多次重入加读锁
//Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 2 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
情况一可以和情况二进行合并:
//Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 2, "UUID2:ThreadID2": 1, } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
情况三:同一个客户端线程先加写锁再加读锁
//Hash结构 myLock: { "mode": "write", "UUID1:ThreadID1:write": 1, "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
(3)RedissonReadLock的释放读锁的lua脚本
public class RedissonReadLock extends RedissonLock implements RLock { ... @Override protected RFuture<Boolean> unlockInnerAsync(long threadId) { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //执行命令"hget myLock mode" "local mode = redis.call('hget', KEYS[1], 'mode'); " + //如果mode为false就发布一个消息 "if (mode == false) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + //执行命令"hexists myLock UUID1:ThreadIdD1",判断当前线程对应的Hash值是否存在 "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " + "if (lockExists == 0) then " + "return nil;" + "end; " + //执行命令"hincrby myLock UUID1:ThreadID1 -1",递减当前线程对应的Hash值 "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + "if (counter == 0) then " + "redis.call('hdel', KEYS[1], ARGV[2]); " + "end;" + //例如执行"del {myLock}:UUID1:ThreadId1:rwlock_timeout:2" //删除当前客户端线程UUID1:ThreadId1的一个重入读锁; "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " + //执行命令"hlen myLock > 1",判断Hash里的元素是否超过1个 "if (redis.call('hlen', KEYS[1]) > 1) then " + "local maxRemainTime = -3; " + //获取key为锁名的Hash值的所有key "local keys = redis.call('hkeys', KEYS[1]); " + //遍历这些key,获取这些重入和非重入的读锁的最大剩余过期时间 "for n, key in ipairs(keys) do " + "counter = tonumber(redis.call('hget', KEYS[1], key)); " + //把key为mode的kv对排除 "if type(counter) == 'number' then " + //通过递减拼接重入锁的key "for i=counter, 1, -1 do " + "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + "maxRemainTime = math.max(remainTime, maxRemainTime);" + "end; " + "end; " + "end; " + //找出所有重入的和非重入的读锁的最大剩余过期时间后,就重置锁的过期时间为该时间 "if maxRemainTime > 0 then " + "redis.call('pexpire', KEYS[1], maxRemainTime); " + "return 0; " + "end;" + "if mode == 'write' then " + "return 0;" + "end; " + "end; " + //删除锁 "redis.call('del', KEYS[1]); " + //发布一个事件 "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; ", //KEYS[1] = myLock,表示锁的名字 //KEYS[2] = redisson_rwlock:{myLock},用于Redis的发布订阅用 //KEYS[3] = {myLock}:UUID1:ThreadID1:rwlock_timeout //KEYS[4] = {myLock} Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix), LockPubSub.UNLOCK_MESSAGE,//ARGV[1] = 0,表示发布事件类型 getLockName(threadId)//ARGV[2] = UUID1:ThreadID1,表示锁里面的该客户端线程代表的key ); } ... }
参数说明:
KEYS[1] = myLock,表示锁的名字 KEYS[2] = redisson_rwlock:{myLock},用于Redis的发布订阅用 KEYS[3] = {myLock}:UUID1:ThreadID1:rwlock_timeout KEYS[4] = {myLock} ARGV[1] = 0,表示发布事件类型 ARGV[2] = UUID1:ThreadID1,表示锁里面的该客户端线程代表的key
(4)对合并的情况一和情况二执行lua脚本
一.客户端A(UUID1:ThreadID1)先释放一次读锁
二.客户端A(UUID1:ThreadID1)再释放一次读锁
三.客户端B(UUID2:ThreadID2)再释放一次读锁
//Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 2, "UUID2:ThreadID2": 1, } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
一.客户端A(UUID1:ThreadID1)先释放一次读锁
首先执行命令"hget myLock mode",发现mode = read。然后执行命令"hexists myLock UUID1:ThreadIdD1",发现肯定是存在的,因为这个客户端线程UUID1:ThreadIdD1加过读锁。
接着执行命令"hincrby myLock UUID1:ThreadID1 -1",将这个客户端线程对应的加读锁次数递减1,counter由2变成1。当counter大于1,说明还有线程持有着这个读锁。于是接着执行"del {myLock}:UUID1:ThreadId1:rwlock_timeout:2",也就是删除用来记录当前客户端线程第2个重入锁过期时间的key。
此时myLock锁的数据变成如下:
//Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 1, "UUID2:ThreadID2": 1, } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
于是接着执行命令"hlen myLock",判断Hash里的元素是否超过1个。如果超过1,那么就遍历已被线程获取的所有重入和非重入的读锁,即遍历所有类似"{myLock}:UUID2:ThreadID2:rwlock_timeout:1"的key。
然后接着执行命令"pttl {myLock}:UUID1:ThreadID1:rwlock_timeout:1"。即获取每一个重入读锁和非重入读锁的剩余过期时间,并找出其中最大的。执行"pexpire myLock"重置读锁的过期时间,为最大的剩余过期时间。
二.客户端A(UUID1:ThreadID1)再释放一次读锁
首先执行命令"hincrby myLock UUID1:ThreadID1 -1",将这个客户端线程对应的加读锁次数递减1,counter由1变成0。当counter=0时,就执行命令"hdel myLock UUID1:ThreadID1",即删除用来记录当前客户端线程重入锁次数的key。
然后接着执行命令"del {myLock}:UUID1:ThreadID1:rwlock_timeout:1",即删除用来记录当前客户端线程第1个重入锁过期时间的key。最后获取每个重入读锁和非重入读锁的剩余过期时间,并找出其中最大的。执行"pexpire myLock"重置读锁的过期时间,为最大的剩余过期时间。
此时myLock锁的数据变成如下:
//Hash结构 myLock: { "mode": "read", "UUID2:ThreadID2": 1, } //String结构 {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
三.客户端B(UUID2:ThreadID2)再释放一次读锁
首先执行命令"hincrby myLock UUID2:ThreadID2 -1",将这个客户端线程对应的加读锁次数递减1,counter由1变成0。然后执行命令"hdel myLock UUID2:ThreadID2",即删除用来记录当前客户端线程重入锁次数的key。接着执行命令"del {myLock}:UUID1:ThreadID1:rwlock_timeout:1",即删除用来记录当前客户端线程第1个重入锁过期时间的key。
此时myLock锁的数据变成如下:
//Hash结构 myLock: { "mode": "read" }
此时继续执行命令"hlen myLock",发现为1,判断不通过,于是执行"del myLock"。也就是当没有线程再持有这个读锁时,就会彻底删除这个读锁,然后发布一个事件出去。
(5)对情况三执行lua脚本
这种情况是:同一个客户端线程先加写锁再加读锁。此时myLock锁的数据如下:
//Hash结构 myLock: { "mode": "write", "UUID1:ThreadID1:write": 1, "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
首先执行命令"hincrby myLock UUID1:ThreadID1 -1",将这个客户端线程对应的加读锁次数递减1,counter由1变成0。然后执行命令"hdel myLock UUID1:ThreadID1",即删除用来记录当前客户端线程重入锁次数的key。接着执行"del {myLock}:UUID1:ThreadID1:rwlock_timeout:1",即删除用来记录当前客户端线程第1个重入锁过期时间的key。
此时myLock锁的数据变成如下:
//Hash结构 myLock: { "mode": "write", "UUID1:ThreadID1:write": 1 }
接着执行命令"hlen myLock > 1",判断Hash里的元素是否超过1个。发现判断通过,但由于没有了读锁,所以最后会判断mode如果是write,就返回0。
9.写锁RedissonWriteLock的释放写锁逻辑
(1)释放写锁前主要有两种情况
(2)RedissonWriteLock的释放写锁的lua脚本
(3)执行释放写锁的lua脚本
(1)释放写锁前主要有两种情况
情况一:同一个客户端线程多次重入加写锁
情况二:同一个客户端线程先加写锁再加读锁
这两种情况的锁数据可以合并为如下:
//Hash结构 myLock: { "mode": "write", "UUID1:ThreadID1:write": 2, "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
接下来以这种锁数据为前提进行lua脚本分析。
(2)RedissonWriteLock的释放写锁的lua脚本
public class RedissonWriteLock extends RedissonLock implements RLock { ... @Override protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, //首先执行命令"hget myLock mode",发现mode=write "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (mode == 'write') then " + //然后执行命令"hexists myLock UUID1:ThreadIdD1:write",发现存在 "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " + "if (lockExists == 0) then " + "return nil;" + "else " + //于是接着执行命令"hincrby myLock UUID1:ThreadID1:write -1" "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + //当counter大于0,说明还有线程持有写锁,那么就重置锁的过期时间 "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + //当counter为0,就执行命令"hdel myLock UUID1:ThreadID1:write" "redis.call('hdel', KEYS[1], ARGV[3]); " + //判断key为锁名的Hash里元素是否超过1个 "if (redis.call('hlen', KEYS[1]) == 1) then " + //如果只有1个,则说明没有线程持有锁了,此时可以删除掉锁对应的key "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "else " + //如果有超过1个,则说明还有线程持有读锁,此时需要将写锁转读锁 "redis.call('hset', KEYS[1], 'mode', 'read'); " + "end; " + "return 1; "+ "end; " + "end; " + "end; " + "return nil;", //KEYS[1] = myLock,KEYS[2] = redisson_rwlock:{myLock} Arrays.<Object>asList(getRawName(), getChannelName()), LockPubSub.READ_UNLOCK_MESSAGE,//ARGV[1] = 0 internalLockLeaseTime,//ARGV[2] = 30000 getLockName(threadId)//ARGV[3] = UUID1:ThreadID1:write ); } ... }
(3)执行释放写锁的lua脚本
一.参数说明
KEYS[1] = myLock KEYS[2] = redisson_rwlock:{myLock} ARGV[1] = 0 ARGV[2] = 30000 ARGV[3] = UUID1:ThreadID1:write
二.lua脚本执行分析
首先执行命令"hget myLock mode",发现mode = write。然后执行命令"hexists myLock UUID1:ThreadIdD1:write",发现存在。于是接着执行命令"hincrby myLock UUID1:ThreadID1:write -1",也就是将这个客户端线程对应的加写锁次数递减1,counter由2变成1。当counter大于0,说明还有线程持有写锁,那么就重置锁的过期时间。当counter为0,就执行命令"hdel myLock UUID1:ThreadID1:write",即删除用来记录当前客户端线程重入写锁次数的key。
删除后,myLock的锁数据如下:
//Hash结构 myLock: { "mode": "write", "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
接着执行命令"hlen myLock",判断key为锁名的Hash里元素是否超过1个。如果只有1个,则说明没有线程持有锁了,此时可以删除掉锁对应的key。如果有超过1个,则说明还有线程持有读锁,此时需要将写锁转读锁。
因此,最后myLock的锁数据如下:
//Hash结构 myLock: { "mode": "read", "UUID1:ThreadID1": 1 } //String结构 {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等