分布式锁(第二部分)
分布式锁学习图
5、分布式锁如何解决,解决方案是什么?
分布式锁的解决方案有三大方式:
-
使用数据库分布式锁实现
-
使用Redis分布式锁实现
-
使用Zookeeper分布式锁实现
6、具体解决分布式锁问题
-
使用数据库的悲观锁实现
核心时使用 FOR UPDATE 实现。
该锁是一个行级锁,会锁定操作的这一行,并发效率最高,该锁的含义是指:当前行被当前事务锁定,其他的事务不得对该行进行任何的读和写的操作,只有当当前事务结束之后,才可以对该行数据进行操作。
并且,FOR UPDATE只能在select语句上使用,对于update、delete、insert上是不能是不能使用的。
测试代码:
/** * 创建一个订单 * @param id * @param count * @return * * 在这里值得说的是,for update这个只能添加到select语句的后面,像insert、update、denele语句后面是不能 * 添加的,这个代表的是一个行锁是一个独占锁,针对于当前的事务,其他的事务不得对该读取的数据进行读或者是写。 * 所以,在1000个请求中,才能保证请求正确。 * 有人想问:i你不是说Synchronized同步锁与@Trabsactional注解一起配置的时候需要注意坑吗? * 在这里是没有坑的?所谓的坑,就是该方法已经执行完毕了,但是当前的事务还没有提交事务,就会让第二个请求钻了空子 * 但是对于for update来说,他锁定的就是当前事务,当前事务没有释放锁的话,其他的事务也只能等待,等做完之后,才能继续执行。 */ @Transactional(rollbackFor = Exception.class) @GetMapping("/createOrder") public synchronized String createOrder(Integer id,Integer count) { // 1、根据商品ID查询出商品信息 Product product = productService.selectProduct(id); if (product == null) { return "该商品不存在"; } // 2、根据购买的数量进行判断是否合理 if (product.getCount() < count) { return "商品的库存不足,无法购买"; } // 3、创建订单,并且将其信息插入数据库中 Order insertOrder = new Order(); insertOrder.setId(12); insertOrder.setProductid(id); insertOrder.setCount(count); Integer insertResult = orderService.addOrder(insertOrder); // 4、修改商品表的库存数量信息 Product updateProduct = new Product(); updateProduct.setCount(product.getCount() - count); Integer updateResult = productService.consumerProduct(product.getId(),count); return "事务提交成功---订单创建成功"; }
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.hui.mapper.ProductMapper"> <!-- 1、selectById 据商品id查询商品信息 --> <select id="selectById" resultType="com.hui.pojo.Product"> select * from t_product where id = #{id} for update; </select> </mapper>
-
使用数据库的乐观锁实现
数据库乐观锁只要是使用一个version版本号进行控制的。
当取数据的时候,取出数据的version
当修改数据的时候,拿到取出的version与数据库中的version作比较,如果是一样的话,就执行更新操作,然后version自加1,反之,则不执行,或者重试。
代码如下:
数据库添加一个version字段
添加一个version字段在商品表中 CREATE TABLE `t_product` ( `id` int(11) NOT NULL AUTO_INCREMENT, `count` int(11) DEFAULT NULL, `price` int(10) DEFAULT NULL, `name` varchar(22) DEFAULT NULL, `version` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=10002 DEFAULT CHARSET=utf8;编写Controller执行逻辑
/** * 创建一个订单(乐观锁) * @param id * @param count * @return */ @Transactional(rollbackFor = Exception.class) @GetMapping("/createOrder") public synchronized String createOrder(Integer id,Integer count,Integer version) { // 1、根据商品ID查询出商品信息 Product product = productService.selectProduct(id); if (product == null) { return "该商品不存在"; } // 2、根据购买的数量进行判断是否合理 if (product.getCount() < count) { return "商品的库存不足,无法购买"; } // 3、修改商品表的库存数量信息 Integer updateResult = productService.consumerProduct(product.getId(),count,version); return "事务提交成功---订单创建成功"; } }编写数据库映射配置文件
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.hui.mapper.ProductMapper"> <!-- 1、selectById 据商品id查询商品信息 --> <select id="selectById" resultType="com.hui.pojo.Product"> select * from t_product where id = #{id}; </select> <!-- 2、根据商品的id修改商品信息 --> <update id="updateById" parameterType="int"> update t_product set count = count - #{count}, version = version + 1 where id = #{id} and version = #{version} </update> </mapper>
-
使用Redis实现
使用Redis实现的分布式锁的效率是很高的,因为它本身就是基于内存的非关系型数据库NoSql,可以使用setnx方式实现,但是Redis方式实现的话,是有一些问题的,如下:
存在的问题:Redis 虽然性能很高,但是会存在潜在的问题就是: 1、默认设置锁的时间是持久,但是这样的话,就可能会造成死锁问题,为了解决这个死锁问题,可以设置 key 的存活时间,问题就是存活时间到底长还是短, 时间长了,效率低下;时间短了,可能会造成任务还没有执行完毕,锁就过期了,第二个线程请求就会打进来进而执行。 2、删除锁的时候,要判断是不是自己的锁,如果是的话,在进行删除,反之,不做任何事。这个产生的原因是什么?按照正常的逻辑来看,应该是对于每一个请求 自己处理自己的锁,但是之所以会出现,是因为如果当前的业务执行时间缓慢导致超过了设置锁的存活时间,锁已经过期了,但是业务逻辑还没有执行完成,这时第二个请求进来 开始执行,执行了一半,该业务走到最后,操作了释放锁的操作,这个时候操作的却不是自己的锁,这样就逻辑错误了,进行了误删。 如何解决呢? 可以设置锁的标识,在删除的时候,判断是不是自己的标识,如果是,就删除,反之,就不做事情。 3、Redis 锁的不可重入性问题。
代码执行逻辑实现:
1、首先,解决的是使用Redis进行加锁的情况。
@Transactional(rollbackFor = Exception.class) @GetMapping("/createOrder") public String createOrder(Integer id,Integer count) { // 1、向 Redis 中设置Lock,如果可以设置的话,就返回true,反之,返回false Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent("key:" + id, "lock:" + id); if (!requireLock) { log.info("我进入了锁机制"); return "当前锁已被其他人占用,不能重复下单"; } try { // 2、根据商品ID查询出商品信息 Product product = productService.selectProduct(id); if (product == null) { throw new Exception("商品不存在"); } // 3、根据购买的数量进行判断是否合理,当判断库存不足的时候,就需要把设置的锁释放掉,不然后续的请求无法处理 if (product.getCount() < count) { throw new Exception("商品的库存不足,无法购买"); } // 4、修改商品表的库存数量信息 Integer updateResult = productService.consumerProduct(product.getId(),count); return "事务提交成功---订单创建成功"; } catch (Exception e) { e.printStackTrace(); } finally { // 这里最好使用try cache finally 使用finally将锁进行释放掉。 // 5、操作完成之后,将 Redis中的Lock锁进行释放,也就是删除,就需要把设置的锁释放掉,不然后续的请求无法处理 stringRedisTemplate.delete("key:" + id); } return "订单创建失败"; }2、再者,就是进行对锁设置时间,防止服务器宕机产生死锁(我设置的是30秒)
@Transactional(rollbackFor = Exception.class) @GetMapping("/createOrder") public String createOrder(Integer id,Integer count) { // 1、向 Redis 中设置Lock,如果可以设置的话,就返回true,反之,返回false // 1.1、对该锁设置超时时间,设置的是30秒。 Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent("key:" + id, "lock:" + id,30,TimeUnit.SECONDS); if (!requireLock) { log.info("我进入了锁机制"); return "当前锁已被其他人占用,不能重复下单"; } try { // 2、根据商品ID查询出商品信息 Product product = productService.selectProduct(id); if (product == null) { throw new Exception("商品不存在"); } // 3、根据购买的数量进行判断是否合理,当判断库存不足的时候,就需要把设置的锁释放掉,不然后续的请求无法处理 if (product.getCount() < count) { throw new Exception("商品的库存不足,无法购买"); } // 4、修改商品表的库存数量信息 Integer updateResult = productService.consumerProduct(product.getId(),count); return "事务提交成功---订单创建成功"; } catch (Exception e) { e.printStackTrace(); } finally { // 这里最好使用try cache finally 使用finally将锁进行释放掉。 // 5、操作完成之后,将 Redis中的Lock锁进行释放,也就是删除,就需要把设置的锁释放掉,不然后续的请求无法处理 stringRedisTemplate.delete("key:" + id); } return "订单创建失败"; }3、锁的时间考虑,一般都是根据业务执行进行考虑的,如果设置的锁的时间长了,虽然在数据一致性上是正确的,但是对于系统的整体性能是由一定的影响的,但是对于锁时间果断的话,就会出现,我的任务还没有执行完毕,那么锁就被释放掉了,这个是有问题的。比如,一样的代码逻辑,但是现在我让时间小一点,看会不会出现问题。
@Transactional(rollbackFor = Exception.class) @GetMapping("/createOrder") public String createOrder(Integer id,Integer count) { // 1、向 Redis 中设置Lock,如果可以设置的话,就返回true,反之,返回false // Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent("key:" + id, "lock:" + id); Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent("key:" + id, "lock:" + id,1, TimeUnit.SECONDS); if (!requireLock) { log.info("我进入了锁机制"); return "当前锁已被其他人占用,不能重复下单"; } try { // 2、根据商品ID查询出商品信息 Product product = productService.selectProduct(id); if (product == null) { throw new Exception("商品不存在"); } // 3、根据购买的数量进行判断是否合理,当判断库存不足的时候,就需要把设置的锁释放掉,不然后续的请求无法处理 if (product.getCount() < count) { throw new Exception("商品的库存不足,无法购买"); } // 4、修改商品表的库存数量信息 Integer updateResult = productService.consumerProduct(product.getId(),count); return "事务提交成功---订单创建成功"; } catch (Exception e) { e.printStackTrace(); } finally { // 这里最好使用try cache finally 使用finally将锁进行释放掉。 // 5、操作完成之后,将 Redis中的Lock锁进行释放,也就是删除,就需要把设置的锁释放掉,不然后续的请求无法处理 stringRedisTemplate.delete("key:" + id); } return "订单创建失败"; }从代码中可以看到,我设置的时间是1秒,现在发送1000个请求对2个资源做处理,看看会不会出现问题。
4、使用锁的标识
@Slf4j @RestController @RequestMapping("/order") public class RedisOrderController { @Autowired private ProductService productService; @Autowired private StringRedisTemplate stringRedisTemplate; // 1.1、设置锁的标识 private static final String KEY_PREFIX = "key:"; private static final String ID_PREFIX = UUID.randomUUID().toString().replace("-",""); // 使用final,标识不能被修改 /** * 以下方式是使用锁的标识对释放锁进行操作。 * @param id * @param count * @return */ @Transactional(rollbackFor = Exception.class) @GetMapping("/createOrder") public String createOrder(Integer id,Integer count) { // 1、向 Redis 中设置Lock,如果可以设置的话,就返回true,反之,返回false Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + ID_PREFIX + id, ID_PREFIX + Thread.currentThread().getId()); if (!requireLock) { log.info("我进入了锁机制"); return "当前锁已被其他人占用,不能重复下单"; } try { // 2、根据商品ID查询出商品信息 Product product = productService.selectProduct(id); if (product == null) { throw new Exception("商品不存在"); } // 3、根据购买的数量进行判断是否合理,当判断库存不足的时候,就需要把设置的锁释放掉,不然后续的请求无法处理 if (product.getCount() < count) { throw new Exception("商品的库存不足,无法购买"); } // 4、修改商品表的库存数量信息 Integer updateResult = productService.consumerProduct(product.getId(),count); return "事务提交成功---订单创建成功"; } catch (Exception e) { e.printStackTrace(); } finally { // 这里最好使用try cache finally 使用finally将锁进行释放掉。 // 5、操作完成之后,将 Redis中的Lock锁进行释放,也就是删除,就需要把设置的锁释放掉,不然后续的请求无法处理 // 5.1、删除的时候,判断是不是自己的锁,如果是的话,就杀出 String getLockFlagValue = stringRedisTemplate.opsForValue().get(KEY_PREFIX + ID_PREFIX + id); if (getLockFlagValue.equals(ID_PREFIX + Thread.currentThread().getId())) { stringRedisTemplate.delete(KEY_PREFIX + ID_PREFIX + id); } } return "订单创建失败"; } }
5、Redis的锁的不可重入的问题
什么是锁的重入问题,锁的重入就是:在获得锁的基础之上再一次的获得锁,这就是锁的重入问题。
我将演示一下,在获得锁的基础上再一次的获得锁,然后发送1000个请求对2个共享资源访问,看会出现什么问题。
代码如下:
/** * 以下方法主要实现对Redis不可重入锁的尝试,看看是不是确实是有问题。 * @param id * @param count * @return */ @RequestMapping("/lockReview") public String methodOne(Integer id,Integer count) { // 1、在Redis中获得锁 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key: " + id, "value: " + Thread.currentThread().getId()); // 2、判断锁的实现 if (!result) { log.info("获得锁失败!"); return "获得锁失败!"; } // 3、获得第二个锁 try { log.info("我已经获得第一把锁"); methodTwo(id,count); } catch (Exception e) { e.printStackTrace(); } finally { stringRedisTemplate.delete("key: " + id); } return "获得锁失败!"; } ============================================================================================================================================================= /** * Redis 第二分布式锁实现 * @param id * @param count * @return */ private String methodTwo(Integer id,Integer count) { // 1、在Redis中获得锁 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key: " + id, "value: " + Thread.currentThread().getId()); // 2、判断锁的实现 if (!result) { log.info("获得第二把锁失败!"); return "获得第二把锁失败!"; } // 3、获得第二个锁 try { log.info("我已经获得第二把锁"); } catch (Exception e) { e.printStackTrace(); } finally { stringRedisTemplate.delete("key: " + id); } return "获得第二把锁失败!"; }结果:
在很多的结果里面会看到第二个方法是不能获得到锁的,这个也是能想清楚,这就表示前面的锁还没有释放掉自己的锁,那么其他的锁就不能获得。这就充分说明,Redis是不支持锁的重入机制的。
-
使用Zookeeper实现
Zookeeper实现分布式锁的核心就是文件系统进行存储节点再加上监听机制,用于实现节点监听。
这个图就是一个抢夺资源的示例图,共享资源是水,其他人都是在排队等待,当一号结束之后,就会让二号进行,二号结束之后,三号就会进行其他的都是这样。
具体实现过程:
一号线程在Zookeeper的锁路径下创建一个临时顺序节点,该节点扫描完整个锁路径下的临时节点后发现目前是编号最小的,他会占用锁,第二个线程进入Zookeeper之后,也会创建一个临时顺序节点,之后会扫描整个锁路径下看自己的节点序号是不是最小的,目前不是最小的,但是二号不甘心,所以就在一号上面安装一个Watcher用于监听,如果一号结束,二号就会马上收到通知,之后就会执行二号线程,其他的线程也是同样的道理。
Zookeeper是使用临时顺序节点实现分布式锁的。
Zookeeper实现分布式锁的最大特点是:可靠性。
7、最后,对这四种方式进行对比
三种分布式锁的使用师瑶结合具体的应用场景的。
-
数据库分布式锁
优点:简单、使用方便,不需要引入Redis、Zookeeper等中间件。
缺点:
1、不适合高并发场景。
2、DB操作性能较差。
-
Redis分布式锁
优点:
1、性能号,适合高并发场景。
2、较轻量级。
3、有较好的框架支持。比如:Redisson
缺点:
1、过期时间不好控制。
2、需要考虑锁被别的线程误删场景。
-
Zookeeper分布式锁
优点:
1、可靠性高。
缺点:
1、性能不如Redis实现的分布式锁。
2、比较重的分布式锁。
性能汇总:
1、从性能角度:Redis > Zookeeper >= 数据库
2、从实现的复杂性角度:Zookeeper > Redis > 数据库
3、从可靠性角度:Zookeeper > Redis > 数据库