从零到一编码实现Redis分布式锁
使用场景和选型
分布式多节点的部署方式,使得共享变量有可能被同时操作,遇到有数据一致性要求的情况,就需要采取全局锁定的措施来保障并发操作下的一致性要求,如,库存扣减操作、同一个商品的上下架和更新操作等等。
生产环境下,性能往往被优先考虑,相比较各自的优缺点,综合考虑,我们一般更倾向于redis。
从0到1 实现分布式锁
step1: 加锁 和 解锁的基础能力构建
Jedis.set(key, value, params) 👏🏻
这个2.6之后新增的加强版set命令是真不错,解决了加锁时设置锁超时时间的原子诉求,防止服务宕机导致的死锁~
(1) 一个具有加锁解锁功能的分布式锁对象,最少要有 jedis客户端 、 对应的redis key 、 锁超时时间 :
//构建分布式锁对象 public class DistributedLock { private Jedis jedis; private String lockName; private long lockExpireSecond; public DistributedLock(Jedis jedis, String lockName, long lockExpireSecond) { this.jedis = jedis; this.lockName = lockName; this.lockExpireSecond = lockExpireSecond; } }(2) 利用jedis提供的SetParams ,对NX , PX 在jedis.set操作中一次性的原子的完成设置:
public void lock() throws BizException { String lockResult = null; try { //设置 NX PX 参数 SetParams params = new SetParams(); params.nx(); params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond)); //执行加锁 , value 暂定 为固定字符串 lockResult = this.jedis.set(this.lockName, "lockValue", params); } catch (Exception e) { LOG.error("lock error",e); } if ("OK".equals(lockResult)) { LOG.debug("locked success,lockName:{}",lockName); } else { throw new BizException("Get lock failed."); } }(3) 用jedis.del命令完成解锁:
public boolean unlock() { boolean unlockResult=false; try { this.jedis.del(this.lockName); unlockResult=true; }catch (Exception e){ LOG.error("unLock error",e); } return unlockResult; }
step2: 加锁失败直接结束? 希望多试几次
从上面的构造函数和lock()实现,发现当前实现属于一锤子买卖,不成功便成仁。这其实不太满足我们的生产需求,很多场景下,业务执行速度是很快的,只要稍微等一等,就可以。那怎么办?
自定义重试次数和等待间隔,有限重试等待
//新增重试间隔属性 private long retryIntervalTime; //通过构造方法初始化重试间隔 public DistributedLock(Jedis jedis, String lockName, long lockExpireSecond, long retryIntervalTime) { ...略 this.retryIntervalTime = retryIntervalTime; } //新增入参,加锁超时时间 public void lock(long timeout,TimeUnit unit) throws TimeoutException { String lockResult = null; try { //设置 NX PX 参数 SetParams params = new SetParams(); params.nx(); params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond)); //加锁开始时间 long startTime=System.nanoTime(); //循环有限等待 while (!"OK".equals(lockResult=this.jedis.set(this.lockName, "lockValue", params))&&!isTimeout(startTime,unit.toNanos(timeout))){ Thread.sleep(retryIntervalTime); } } catch (Exception e) { LOG.error("lock error",e); } //修改抛出异常类型为超时异常 if ("OK".equals(lockResult)) { LOG.debug("locked success,lockName:{}",lockName); } else { throw new TimeoutException("Get lock failed because of timeout."); } }
step3: 只能解自己加的锁,别人的锁不能乱动
考虑一个问题:我们为了防止加锁后机器宕机的情况,给锁设置了过期时间,以此来保障锁可以在服务节点宕机不能解锁时,也可以给后续业务提供锁操作。
参考《How to do distributed locking》
上图中,因为业务执行时间的不可控(或者遇到GC等不可预期的停顿),给分布式锁带来了使用问题。
我们先看问题一:用户线程1 把 线程2的锁释放了!怎么办呢?
加锁保存线程标识,解锁校验,非自己的锁不释放
//其他属性略,新增lockOwner标识 private String lockOwner; //通过构造函数初始化lockOwner标识 public DistributedLock(Jedis jedis, String lockName, String lockOwner, long lockExpireSecond, long retryIntervalTime) { ...略 this.lockOwner = lockOwner; } public void lock(long timeout,TimeUnit unit) throws TimeoutException { String lockResult = null; try { //设置 NX PX 参数 SetParams params = new SetParams(); params.nx(); params.px(TimeUnit.SECONDS.toMillis(lockExpireSecond)); //加锁开始时间 long startTime=System.nanoTime(); // set时的value 改为 lockOwner while (!"OK".equals(lockResult=this.jedis.set(this.lockName, this.lockOwner, params))&&!isTimeout(startTime,unit.toNanos(timeout))){ Thread.sleep(retryIntervalTime); } } catch (Exception e) { LOG.error("lock error",e); } ...略 } public boolean unlock() { boolean unlockResult=false; try { // 先getValue ,并和当前lockOwner匹配,匹配上才去解锁 if (this.lockOwner.equals(this.jedis.get(this.lockName))) { this.jedis.del(this.lockName); unlockResult = true; } }catch (Exception e){ LOG.error("unLock error",e); } return unlockResult; }
有的同学说,这个解锁的地方,需要用lua包成原子操作。单从功能上来讲,上面的实现也是OK的,因为只有get到的结果和本身匹配,才会进行下述操作。包成lua脚本的目的,应该主要是为了减少一次传输,提高执行效率。
step4: expire时间不够产生并发冲突
也就是之前的图中的问题二:线程1 还在执行中,锁就过期释放了,导致线程2也加锁成功,这直接导致了线程间的业务冲突。怎么办呢?
锁持有期内,根据需要,动态延长锁的过期时间
触发锁延期的方案选型,也是个大事,jdk原生timer、调度线程池、netty的Timer都可以实现,选哪个好?
综合对比精度、资源消耗等方面,Netty中采用时间轮算法的Timer应该是首选,都能管理成千上万的连接、调度心跳检测,拿来搞个锁延期还不是手拿把掐?
public class LockContext { private HashedWheelTimer timer; private LockContext(){ //时间轮参数可以从业务自己的配置获取 // long tickDuration=(Long) config.get("tickDuration"); // int tickPerWheel=(int) config.get("tickPerWheel"); //默认1024 // boolean leakDetection=(Boolean)config.get("leakDetection"); timer = new HashedWheelTimer(new DefaultThreadFactory("distributedLock-timer",true), 10, TimeUnit.MILLISECONDS, 1024, false); }通过构造函数,将上下文及调度器传入分布式锁对象:
public class DistributedLock { //上下文 private LockContext context; //当前持有的 Timer 调度对象 private volatile Timeout lockTimeout; public DistributedLock(Jedis jedis, String lockName, String lockOwner, long lockExpireSecond, long retryIntervalTime, LockContext context) { ...其他属性略 this.context = context; }加锁成功之后,执行调度器注册操作:
public void lock(long timeout, TimeUnit unit) throws TimeoutException { //...加锁 略 if ("OK".equals(lockResult)) { LOGGER.info("locked success,lockName:{}",lockName); try { //注册循环延期事件 registerLoopReExpire(); }finally { if (Thread.currentThread().isInterrupted()&&this.lockTimeout!=null){ LOGGER.warn("线程中断,定时任务取消"); this.lockTimeout.cancel(); } } } else { throw new TimeoutException("Get lock failed because of timeout."); } }方法registerLoopReExpire()中是实际的任务注册和延期操作:
private void registerLoopReExpire() { LOGGER.info("分布式锁延期任务注册"); //每次注册,都把timeout赋给当前锁对象,用于后续解锁时取消 this.lockTimeout = context.getTimer().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { //校验是否还在持有锁,并延长过期时间 boolean isReExpired=reExpireLock(lockName,lockOwner); if (isReExpired) { //自己调自己,循环注册 registerLoopReExpire(); }else { lockTimeout.cancel(); } } }, TimeUnit.SECONDS.toMillis( lockExpireSecond)/2, TimeUnit.MILLISECONDS); LOGGER.info("分布式锁延期任务注册完成"); }这里有几个点需要重点关注:•newTimeout()操作会返回一个Timeout实体,我们需要依赖该实体来对当前任务进行管理,所以需要赋值给锁内部对象。•锁延期,需要根据lockOwner 和 lockName来判断,持有锁才加锁,需要使用lua方式来保证判断和执行的原子性。•执行完延期操作之后,需要根据结果进行后续处理,成功则继续注册,失败则取消当前任务。•定时任务的执行时间,应该要小于锁的过期时间,取过期时间的1/2或1/3或自定义传入。来验证一下,我们设置 锁的过期时间为3秒,业务执行时间为10秒 ,执行:
可以看到,定时任务一共延期了6次,最后一次注册成功了,但是业务执行完随着解锁任务取消了。