分布式锁(第二部分)

分布式锁学习图



5、分布式锁如何解决,解决方案是什么?

分布式锁的解决方案有三大方式:

  • 使用数据库分布式锁实现

  • 使用Redis分布式锁实现

  • 使用Zookeeper分布式锁实现

6、具体解决分布式锁问题

  • 使用数据库的悲观锁实现

核心时使用 FOR UPDATE 实现。

该锁是一个行级锁,会锁定操作的这一行,并发效率最高,该锁的含义是指:当前行被当前事务锁定,其他的事务不得对该行进行任何的读和写的操作,只有当当前事务结束之后,才可以对该行数据进行操作。

并且,FOR UPDATE只能在select语句上使用,对于update、delete、insert上是不能是不能使用的。

测试代码:

/**
     * 创建一个订单
     * @param id
     * @param count
     * @return
     *
     * 在这里值得说的是,for update这个只能添加到select语句的后面,像insert、update、denele语句后面是不能
     * 添加的,这个代表的是一个行锁是一个独占锁,针对于当前的事务,其他的事务不得对该读取的数据进行读或者是写。
     * 所以,在1000个请求中,才能保证请求正确。
     * 有人想问:i你不是说Synchronized同步锁与@Trabsactional注解一起配置的时候需要注意坑吗?
     * 在这里是没有坑的?所谓的坑,就是该方法已经执行完毕了,但是当前的事务还没有提交事务,就会让第二个请求钻了空子
     * 但是对于for update来说,他锁定的就是当前事务,当前事务没有释放锁的话,其他的事务也只能等待,等做完之后,才能继续执行。
     */
    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/createOrder")
    public synchronized String createOrder(Integer id,Integer count) {
        // 1、根据商品ID查询出商品信息
        Product product = productService.selectProduct(id);
        if (product == null) {
            return "该商品不存在";
        }
        // 2、根据购买的数量进行判断是否合理
        if (product.getCount() < count) {
            return "商品的库存不足,无法购买";
        }
        // 3、创建订单,并且将其信息插入数据库中
        Order insertOrder = new Order();
        insertOrder.setId(12);
        insertOrder.setProductid(id);
        insertOrder.setCount(count);
        Integer insertResult = orderService.addOrder(insertOrder);
        // 4、修改商品表的库存数量信息
        Product updateProduct = new Product();
        updateProduct.setCount(product.getCount() - count);
        Integer updateResult = productService.consumerProduct(product.getId(),count);
        return "事务提交成功---订单创建成功";
    }
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hui.mapper.ProductMapper">

<!--    1、selectById 据商品id查询商品信息  -->
    <select id="selectById" resultType="com.hui.pojo.Product">
        select * from t_product where id = #{id} for update;
    </select>
        
</mapper>
  • 使用数据库的乐观锁实现

数据库乐观锁只要是使用一个version版本号进行控制的。

当取数据的时候,取出数据的version

当修改数据的时候,拿到取出的version与数据库中的version作比较,如果是一样的话,就执行更新操作,然后version自加1,反之,则不执行,或者重试。

代码如下:

数据库添加一个version字段

添加一个version字段在商品表中
CREATE TABLE `t_product` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `count` int(11) DEFAULT NULL,
  `price` int(10) DEFAULT NULL,
  `name` varchar(22) DEFAULT NULL,
  `version` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10002 DEFAULT CHARSET=utf8;
编写Controller执行逻辑
/**
     * 创建一个订单(乐观锁)
     * @param id
     * @param count
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/createOrder")
    public synchronized String createOrder(Integer id,Integer count,Integer version) {
        // 1、根据商品ID查询出商品信息
        Product product = productService.selectProduct(id);
        if (product == null) {
            return "该商品不存在";
        }
        // 2、根据购买的数量进行判断是否合理
        if (product.getCount() < count) {
            return "商品的库存不足,无法购买";
        }
        // 3、修改商品表的库存数量信息
        Integer updateResult = productService.consumerProduct(product.getId(),count,version);
        return "事务提交成功---订单创建成功";
    }

}
编写数据库映射配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hui.mapper.ProductMapper">

<!--    1、selectById 据商品id查询商品信息  -->
    <select id="selectById" resultType="com.hui.pojo.Product">
        select * from t_product where id = #{id};
    </select>

<!--    2、根据商品的id修改商品信息 -->
    <update id="updateById" parameterType="int">
        update t_product set count = count - #{count}, version = version + 1 where id = #{id} and version = #{version}
    </update>

</mapper>
  • 使用Redis实现

使用Redis实现的分布式锁的效率是很高的,因为它本身就是基于内存的非关系型数据库NoSql,可以使用setnx方式实现,但是Redis方式实现的话,是有一些问题的,如下:

存在的问题:Redis 虽然性能很高,但是会存在潜在的问题就是:

1、默认设置锁的时间是持久,但是这样的话,就可能会造成死锁问题,为了解决这个死锁问题,可以设置 key 的存活时间,问题就是存活时间到底长还是短,
时间长了,效率低下;时间短了,可能会造成任务还没有执行完毕,锁就过期了,第二个线程请求就会打进来进而执行。

2、删除锁的时候,要判断是不是自己的锁,如果是的话,在进行删除,反之,不做任何事。这个产生的原因是什么?按照正常的逻辑来看,应该是对于每一个请求
自己处理自己的锁,但是之所以会出现,是因为如果当前的业务执行时间缓慢导致超过了设置锁的存活时间,锁已经过期了,但是业务逻辑还没有执行完成,这时第二个请求进来
开始执行,执行了一半,该业务走到最后,操作了释放锁的操作,这个时候操作的却不是自己的锁,这样就逻辑错误了,进行了误删。
如何解决呢?
可以设置锁的标识,在删除的时候,判断是不是自己的标识,如果是,就删除,反之,就不做事情。

3、Redis 锁的不可重入性问题。

代码执行逻辑实现:

1、首先,解决的是使用Redis进行加锁的情况。

    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/createOrder")
    public String createOrder(Integer id,Integer count) {
        // 1、向 Redis 中设置Lock,如果可以设置的话,就返回true,反之,返回false
        Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent("key:" + id, "lock:" + id);
        if (!requireLock) {
            log.info("我进入了锁机制");
            return "当前锁已被其他人占用,不能重复下单";
        }
        try {
            // 2、根据商品ID查询出商品信息
            Product product = productService.selectProduct(id);
            if (product == null) {
                throw new Exception("商品不存在");
            }
            // 3、根据购买的数量进行判断是否合理,当判断库存不足的时候,就需要把设置的锁释放掉,不然后续的请求无法处理
            if (product.getCount() < count) {
                throw new Exception("商品的库存不足,无法购买");
            }
            // 4、修改商品表的库存数量信息
            Integer updateResult = productService.consumerProduct(product.getId(),count);
            return "事务提交成功---订单创建成功";
        } catch (Exception e) {
            e.printStackTrace();
        } finally { // 这里最好使用try cache finally 使用finally将锁进行释放掉。
            // 5、操作完成之后,将 Redis中的Lock锁进行释放,也就是删除,就需要把设置的锁释放掉,不然后续的请求无法处理
            stringRedisTemplate.delete("key:" + id);
        }
        return "订单创建失败";
    }
2、再者,就是进行对锁设置时间,防止服务器宕机产生死锁(我设置的是30秒)
    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/createOrder")
    public String createOrder(Integer id,Integer count) {
        // 1、向 Redis 中设置Lock,如果可以设置的话,就返回true,反之,返回false
        // 1.1、对该锁设置超时时间,设置的是30秒。
        Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent("key:" + id, "lock:" + id,30,TimeUnit.SECONDS);
        if (!requireLock) {
            log.info("我进入了锁机制");
            return "当前锁已被其他人占用,不能重复下单";
        }
        try {
            // 2、根据商品ID查询出商品信息
            Product product = productService.selectProduct(id);
            if (product == null) {
                throw new Exception("商品不存在");
            }
            // 3、根据购买的数量进行判断是否合理,当判断库存不足的时候,就需要把设置的锁释放掉,不然后续的请求无法处理
            if (product.getCount() < count) {
                throw new Exception("商品的库存不足,无法购买");
            }
            // 4、修改商品表的库存数量信息
            Integer updateResult = productService.consumerProduct(product.getId(),count);
            return "事务提交成功---订单创建成功";
        } catch (Exception e) {
            e.printStackTrace();
        } finally { // 这里最好使用try cache finally 使用finally将锁进行释放掉。
            // 5、操作完成之后,将 Redis中的Lock锁进行释放,也就是删除,就需要把设置的锁释放掉,不然后续的请求无法处理
            stringRedisTemplate.delete("key:" + id);
        }
        return "订单创建失败";
    }
3、锁的时间考虑,一般都是根据业务执行进行考虑的,如果设置的锁的时间长了,虽然在数据一致性上是正确的,但是对于系统的整体性能是由一定的影响的,但是对于锁时间果断的话,就会出现,我的任务还没有执行完毕,那么锁就被释放掉了,这个是有问题的。比如,一样的代码逻辑,但是现在我让时间小一点,看会不会出现问题。
    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/createOrder")
    public String createOrder(Integer id,Integer count) {
        // 1、向 Redis 中设置Lock,如果可以设置的话,就返回true,反之,返回false
//        Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent("key:" + id, "lock:" + id);
        Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent("key:" + id, "lock:" + id,1, TimeUnit.SECONDS);
        if (!requireLock) {
            log.info("我进入了锁机制");
            return "当前锁已被其他人占用,不能重复下单";
        }
        try {
            // 2、根据商品ID查询出商品信息
            Product product = productService.selectProduct(id);
            if (product == null) {
                throw new Exception("商品不存在");
            }
            // 3、根据购买的数量进行判断是否合理,当判断库存不足的时候,就需要把设置的锁释放掉,不然后续的请求无法处理
            if (product.getCount() < count) {
                throw new Exception("商品的库存不足,无法购买");
            }
            // 4、修改商品表的库存数量信息
            Integer updateResult = productService.consumerProduct(product.getId(),count);
            return "事务提交成功---订单创建成功";
        } catch (Exception e) {
            e.printStackTrace();
        } finally { // 这里最好使用try cache finally 使用finally将锁进行释放掉。
            // 5、操作完成之后,将 Redis中的Lock锁进行释放,也就是删除,就需要把设置的锁释放掉,不然后续的请求无法处理
            stringRedisTemplate.delete("key:" + id);
        }
        return "订单创建失败";
    }
从代码中可以看到,我设置的时间是1秒,现在发送1000个请求对2个资源做处理,看看会不会出现问题。

4、使用锁的标识
@Slf4j
@RestController
@RequestMapping("/order")
public class RedisOrderController {

    @Autowired
    private ProductService productService;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    // 1.1、设置锁的标识
    private static final String KEY_PREFIX = "key:";
    private static final String ID_PREFIX = UUID.randomUUID().toString().replace("-",""); // 使用final,标识不能被修改
    
    /**
     * 以下方式是使用锁的标识对释放锁进行操作。
     * @param id
     * @param count
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/createOrder")
    public String createOrder(Integer id,Integer count) {
        // 1、向 Redis 中设置Lock,如果可以设置的话,就返回true,反之,返回false

        Boolean requireLock = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + ID_PREFIX + id, ID_PREFIX + Thread.currentThread().getId());
        if (!requireLock) {
            log.info("我进入了锁机制");
            return "当前锁已被其他人占用,不能重复下单";
        }
        try {
            // 2、根据商品ID查询出商品信息
            Product product = productService.selectProduct(id);
            if (product == null) {
                throw new Exception("商品不存在");
            }
            // 3、根据购买的数量进行判断是否合理,当判断库存不足的时候,就需要把设置的锁释放掉,不然后续的请求无法处理
            if (product.getCount() < count) {
                throw new Exception("商品的库存不足,无法购买");
            }
            // 4、修改商品表的库存数量信息
            Integer updateResult = productService.consumerProduct(product.getId(),count);
            return "事务提交成功---订单创建成功";
        } catch (Exception e) {
            e.printStackTrace();
        } finally { // 这里最好使用try cache finally 使用finally将锁进行释放掉。
            // 5、操作完成之后,将 Redis中的Lock锁进行释放,也就是删除,就需要把设置的锁释放掉,不然后续的请求无法处理

            // 5.1、删除的时候,判断是不是自己的锁,如果是的话,就杀出
            String getLockFlagValue = stringRedisTemplate.opsForValue().get(KEY_PREFIX + ID_PREFIX + id);
            if (getLockFlagValue.equals(ID_PREFIX + Thread.currentThread().getId())) {
                stringRedisTemplate.delete(KEY_PREFIX + ID_PREFIX + id);
            }
        }
        return "订单创建失败";
    }
    
}

5、Redis的锁的不可重入的问题

什么是锁的重入问题,锁的重入就是:在获得锁的基础之上再一次的获得锁,这就是锁的重入问题。

我将演示一下,在获得锁的基础上再一次的获得锁,然后发送1000个请求对2个共享资源访问,看会出现什么问题。

代码如下:

    /**
     * 以下方法主要实现对Redis不可重入锁的尝试,看看是不是确实是有问题。
     * @param id
     * @param count
     * @return
     */
    @RequestMapping("/lockReview")
    public String methodOne(Integer id,Integer count) {
        // 1、在Redis中获得锁
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key: " + id, "value: " + Thread.currentThread().getId());
        // 2、判断锁的实现
        if (!result) {
            log.info("获得锁失败!");
            return "获得锁失败!";
        }
        // 3、获得第二个锁
        try {
            log.info("我已经获得第一把锁");
            methodTwo(id,count);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            stringRedisTemplate.delete("key: " + id);
        }
        return "获得锁失败!";
    }

=============================================================================================================================================================

    /**
     * Redis 第二分布式锁实现
     * @param id
     * @param count
     * @return
     */
    private String methodTwo(Integer id,Integer count) {
        // 1、在Redis中获得锁
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key: " + id, "value: " + Thread.currentThread().getId());
        // 2、判断锁的实现
        if (!result) {
            log.info("获得第二把锁失败!");
            return "获得第二把锁失败!";
        }
        // 3、获得第二个锁
        try {
            log.info("我已经获得第二把锁");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            stringRedisTemplate.delete("key: " + id);
        }
        return "获得第二把锁失败!";
    }
结果:

在很多的结果里面会看到第二个方法是不能获得到锁的,这个也是能想清楚,这就表示前面的锁还没有释放掉自己的锁,那么其他的锁就不能获得。这就充分说明,Redis是不支持锁的重入机制的。

那这个问题应该如何解决呢?

可以使用Hash算法实现,在Redis中有五种基本数据类型,其中包括Hash,用以上图片作以说明,key为键,value包含两个数据,第一个数据,可以存放当前线程,第二个线程为线程数,操作的时候会进行判断,如果线程相同的话,就会把线程数加加,删除时,会自动的减减,当为0时候就会删除掉hash。
  • 使用Zookeeper实现

Zookeeper实现分布式锁的核心就是文件系统进行存储节点再加上监听机制,用于实现节点监听。

先上一幅图:

这个图就是一个抢夺资源的示例图,共享资源是水,其他人都是在排队等待,当一号结束之后,就会让二号进行,二号结束之后,三号就会进行其他的都是这样。

具体实现过程:

一号线程在Zookeeper的锁路径下创建一个临时顺序节点,该节点扫描完整个锁路径下的临时节点后发现目前是编号最小的,他会占用锁,第二个线程进入Zookeeper之后,也会创建一个临时顺序节点,之后会扫描整个锁路径下看自己的节点序号是不是最小的,目前不是最小的,但是二号不甘心,所以就在一号上面安装一个Watcher用于监听,如果一号结束,二号就会马上收到通知,之后就会执行二号线程,其他的线程也是同样的道理。

Zookeeper是使用临时顺序节点实现分布式锁的。

Zookeeper实现分布式锁的最大特点是:可靠性。

7、最后,对这四种方式进行对比

三种分布式锁的使用师瑶结合具体的应用场景的。

  • 数据库分布式锁

优点:简单、使用方便,不需要引入Redis、Zookeeper等中间件。

    缺点:

            1、不适合高并发场景。

            2、DB操作性能较差。

  • Redis分布式锁

优点:

            1、性能号,适合高并发场景。

            2、较轻量级。

            3、有较好的框架支持。比如:Redisson

缺点:

            1、过期时间不好控制。

            2、需要考虑锁被别的线程误删场景。

  • Zookeeper分布式锁

优点:

            1、可靠性高。

缺点:

            1、性能不如Redis实现的分布式锁。

            2、比较重的分布式锁。

性能汇总:

            1、从性能角度:Redis > Zookeeper >= 数据库

            2、从实现的复杂性角度:Zookeeper > Redis > 数据库

            3、从可靠性角度:Zookeeper > Redis > 数据库

全部评论

相关推荐

无敌虾孝子:喜欢爸爸还是喜欢妈妈
点赞 评论 收藏
分享
评论
1
收藏
分享
牛客网
牛客企业服务