BAT大厂生产级Redis高并发分布式锁。面试题及答案


分布式锁相关

考虑分布式商品秒杀系统,库存量较少的商品,如何保证商品数量不超卖?
其实需要保证这种一致性:某个人点击秒杀后系统中查出来的库存量和实际扣减库存时库存量的一致性就可以。Java中有synchronize关键字,可以保证这种一致性,但这种一致性是单JVM实例中的一致性。
对于如何保证分布式系统中数据的一致性,一般是使用对分布式系统可见的锁来实现。

MySQL乐观锁和悲观锁

乐观锁和悲观锁的实现

假设,MySQL数据库中商品库存表product_stock结构如下:


通过给商品库存加乐观锁,判断秒杀是否成功的伪代码:

/**
* 该方法返回是否秒杀成功
* @param productId 产品ID
* @return 是否成功
*/ private boolean isSuccess(int productId) { int affectedCount = 0;
      while (affectedCount == 0) {
      ProductStock product = excute("select * from product_stock where product_id=#{productId}"); if (product.getNumber>0) {
          affectedCount = excute("update product_stock set number=number-1 where product_id=#{productId} and number=#{product.getNumber}");
      } else { return false;
      }
    }
      return true; /*
  更简洁的写法
  int affectedCount = 
      excute("update product_stock set number=number-1 where product_id=#{productId} and number>0");
    return affectedCount!=0;*/

通过给商品库存加悲观锁,判断秒杀是否成功的伪代码:

/**
* 该方法返回是否秒杀成功
* @param productId 产品ID、
* @return 是否秒杀成功
*/ private boolean isSuccess(int productId) {
    ProductStock product = excute("select * from product_stock where product_id=#{productId} for update");
      
      if (product.getNumber>0) {
      excute("update product_stock set number=number-1 where product_id=#{productId}"); return true;
      } else { return false;
      }
}

乐观锁与悲观锁的区别

乐观锁的思路一般是表中增加版本字段,更新时where语句中增加版本的判断,算是一种CAS(Compare And Swep)操作,商品库存场景中number起到了版本控制的作用( and number=#{product.getNumber})。悲观锁之所以是悲观,在于他认为外面的世界太复杂,所以一开始就对商品加上锁(select ... for update),后面可以安心的做判断和更新,因为这时候不会有别人更新这条商品库存。

商品库存表中数量作为版本控制的特殊性,所以给商品库存加乐观锁可以简化为注释中更简洁的写法。在excute("update product_stock set number=number-1 where product_id=#{productId} and number=#{product.getNumber}")执行之后,即使商品数量在更新前发生了变化,更新数量为0,MySQL也会给product_stock的product_id索引加锁;这里的while循环执行不会超过3次。乐观锁是需要加上id+version的联合索引的,否则做不到在不加锁(不阻塞)的情况下实现更新。

分布式锁

网上有很多分布式锁的实现,这里讲一下基于redis的分布式锁的实现:

redis中有个命令setNX,是一种CAS操作,定义是

/**
 * Set {@code value} for {@code key}, only if {@code key} does not exist.
 *
 * @param key must not be {@literal null}.
 * @param value must not be {@literal null}.
 * @return * @see <a href="http://redis.io/commands/setnx">Redis Documentation: SETNX</a>
 */ Boolean setNX(byte[] key, byte[] value);

不同于一般的set命令直接覆盖原值,setNx在更新的时候会判断当前key是否存在,如果存在返回false,如果不存在设置value并返回true。下面的代码利用这个CAS操作写的乐观锁:

/**
 * 获取锁
 *
 * @param key 锁id
 * @return 锁结果
 */ public boolean lock(String key) { try { while (true) { if (redisTemplate.opsForValue().setIfAbsent(key, "")) {
                    redisTemplate.expire(key, 5000, TimeUnit.MILLISECONDS); return true;
                }
            }
        } catch (Exception e) {
            LOGGER.error("get lock {} error", key, e);
        } return false;
    }

释放锁的方法很简单,删掉就可以

/**
 * 释放锁
 *
 * @param key 锁id
 */ public void unlock(String key) {
        redisTemplate.delete(key);
}

存在的问题

在低并发下,上面的两种解决方案都没有什么问题,但在应对类似秒杀应用的高并发场景下,上面的解决方案存在的问题:

  1. MySQL
  2. MySQL就不是为高并发而生的,强一致性才是它的追求。高并发下使用得考虑分库分表,甚至在应用层就得做分离。
  3. 分布式缓存
  4. redis作为高性能的缓存系统,数据层并不存在太大问题。而对于应用层的每个节点来说,高并发下大量线程在运行获取锁的操作,每次其实只会有一个线程处于得到锁的状态;也就是说某一有很多线程运行着无意义的循环,浪费CPU时间。

简介

对于单个系统实例而言,不要做过多无谓的循环,每次派一个线程代表和其它系统实例竞争这把锁,其它线程等待。简单的画个图和普通的分布式锁比较下:


图片左边,每个实例中的线程同时争抢锁;图片右边每个实例中只有一个线程在争抢锁,其它线程在等待队列中,我把实现线程排队的锁叫本地锁;每个单系统派一个线程和其它系统竞争锁,各个系统竞争的叫分布式锁。

wllock的优势

  1. 可以限制获取本地锁的队列长度,超过阈值服务降级。
  2. 可以设置获取锁的最长时间,超过时间任未获得所返回获取锁失败。
  3. AOP方式,只需关注业务处理即可。

引用jar包

下载wllock包,修改build.gradleuploadArchivesrepository(url: "file:**.m2\\repository")到本地maven库,执行gradle jargradle uploadArchives打jar包到本地maven库。

在你新建的gradle项目中的build.gradle中加入依赖compile ("
org.wl:wllock:0.0.1-SNAPSHOT")

配置锁信息

application.yml中添加配置信息:

wl.systemName为当前系统名,将作为redis缓存key值的起始部分。

wllock.lockTimeMaxMillis是最长锁定时长,当系统中某个线程获取锁之后如果超过最长锁定时长,系统将自动释放当前线程分布式锁。

wl.singleWaitThreshold为单实例最长排队长度,当单实例中等待某个锁的队列长度超过最长排队长度,系统将对后面的进程返回获取锁失败,服务降级。

wllock: systemName: order lockTimeMaxMillis: 10000 singleWaitThreshold: 500 spring: redis: host: localhost port: 6379 maxTotal: 400 minIdle: 0 maxIdle: 20

获取锁代码

/**
 * 下单
 * @param accountId
 * @param productId
 */ public void placeOrder(int accountId) {
    LockValue lockValue = new LockValue(String.valueOf(productId));
    lockValue.setTryLockMilliSeconds(1000); boolean placeSuccess = placeOrder(lockValue, accountId, 1000); if (placeSuccess) { //返回支付页面 } else { //返回失败页面 }
} @LockGuard(name = "product") private boolean placeOrder(LockValue lockValue, int accountId, int productId) throws Exception { if (lockValue.isLockSuccess()) { //扣减库存 //下单 return true;
    } else { return false;
    }
}

上面简单演示了下单场景的代码。

需要锁保护的代码需要满足下面的规范:

  1. 方法需要添加 @LockGuard注解,注解中的name表示锁对象的名称。
  2. 方法的第一个入参是LockValue类型参数,LockValue的构造函数参数是key,可以选择性的设置TryLockMilliSeconds

以上两步后

  1. 方法将以wl.systemName+LockGuard.name+LockValue.key为完整的锁id,例如实例中redis中的key值为order:product:1000
  2. 方法中直接通过lockValue.isLockSuccess()判断是否获取锁成功,执行完成也无需手动释放锁。
  3. 锁定方法中处理的事情越少越好,即锁时间越小越好,例如实例中返回支付或者返回失败的页面就不用放到锁保护代码中处理。

本地锁实现

首先,有个静态变量LOCK_MAPkey值就是锁id,value是JUC包下的ReentrantLock,线程间的排队就是通过ReentrantLock实现的。

private static final ConcurrentHashMap<String, ReentrantLock> LOCK_MAP = new ConcurrentHashMap<>();

贴上获取锁的代码

/**
    * 获取本地锁
    * @param key 锁id
    * @param tryMilliSeconds 最大等待时长
    * @return 锁结果
    */ public LocalLockResult tryLock(String key, int tryMilliSeconds) { //1. 根据key值获取ReentrantLock LocalLockResult localLockResult = new LocalLockResult();
       ReentrantLock reentrantLock = LOCK_MAP.get(key); if (reentrantLock == null) {
           reentrantLock = new ReentrantLock(true); //没有key对应的ReentrantLock,写入新的ReentrantLock,并发场景下用了putIfAbsent ReentrantLock existed = LOCK_MAP.putIfAbsent(key, reentrantLock); if (existed != null) {
               reentrantLock = existed;
           }
       }
       localLockResult.setLock(reentrantLock); //如果当前排队数量超过阈值,服务降级返回获取锁失败 if (reentrantLock.getQueueLength() <= lockSettings.getSingleWaitThreshold()) { //获取本地锁 try { if (tryMilliSeconds > 0) { if (reentrantLock.tryLock(tryMilliSeconds, TimeUnit.MILLISECONDS)) {
                       localLockResult.setLockSuccess(true);
                   }
               } else {
                   reentrantLock.lock();
                   localLockResult.setLockSuccess(true);
               }
           } catch (InterruptedException e) {
               LOGGER.error("get local lock {} in {} miliseconds interupted", key, tryMilliSeconds);
           }
       } return localLockResult;
   }

下面是释放本地锁的代码

/**
 * 释放本地锁
 *
 * @param key             锁id
 * @param localLockResult 锁结果
 */ public void unlock(String key, LocalLockResult localLockResult) { if (localLockResult != null && localLockResult.getLock() != null) { if (localLockResult.isLockSuccess()) { localLockResult.getLock().unlock();
        } //当前key没有线程等待锁,删除 if (!localLockResult.getLock().hasQueuedThreads()) { LOCK_MAP.remove(key, localLockResult.getLock());
        }
    }
}

为防止LOCK_MAP里的key值一直加入,导致内存泄漏,在释放锁的时候会判断当前ReentrantLock是否有排队队列,如果没有说明当前并发量小,可以删掉;需要说明的这个操作并不是线程安全的,也就是说可能存在从MAP中删除ReentrantLock时,ReentrantLock中有新加入的线程在排队,不过这并不影响获取锁。

分布式锁实现

分布式锁就是基于redis的实现,不过增加了等待时长的概念,可以设置超过一定时间自动放弃加锁。

通过执行lua脚本实现setNX和expire同时执行,防止分两步执行(setNx执行成功,expire未执行成功)可能导致的死锁。

/**
 * setNX + expire的lua脚本.
 * <p>
 * setNxWithExpire(key, args[1], args[2]).
 * key
 * args[1] 值
 * args[2] 失效时间,单位:毫秒
 */ private static final String SET_NX_WITH_EXPIRE = "local rst = redis.call('SETNX',KEYS[1],ARGV[1]);" + "if (rst==1) then redis.call('PEXPIRE', KEYS[1], ARGV[2]); end;" + " return rst;"; /**
 * setNX + expire脚本.
 */ private static RedisScript<Long> SET_NX_WITH_EXPIRE_SCRIPT = new DefaultRedisScript<>(SET_NX_WITH_EXPIRE, Long.class); public boolean tryLock(String key, int tryMilliSeconds) { try { long start = System.currentTimeMillis(); while (true) {
                List<String> keys = new ArrayList<>(1);
                keys.add(key);
                String value = "";

                Long result = redisTemplate.execute(SET_NX_WITH_EXPIRE_SCRIPT, redisTemplate.getStringSerializer(), LONG_SERIALIZER, keys, value, String.valueOf(lockSettings.getLockTimeMaxMillis())); if (result == 1) { return true;
                } long end = System.currentTimeMillis(); if (tryMilliSeconds > 0 && tryMilliSeconds < (end - start)) { return false;
                }
            }
        } catch (Exception e) {
            LOGGER.error("get distributionlock {} error", key, e);
        } return false;
    }

AOP的方式加锁实现

/***
 * 加锁aspect
 * @param joinPoint 连接点
 * @param lockAnnotation 方法需要带注解 例如LockGuard(name="product")
 * @param lockValue 方法第一个参数是 LockValue
 * @throws Throwable
 */ @Around("@annotation(lockAnnotation) && args(lockValue, ..)") public void aroundAction(JoinPoint joinPoint, LockGuard lockAnnotation, LockValue lockValue) throws Throwable {
    String key = lockSettings.getSystemName() + ":" + lockAnnotation.name() + ":" + lockValue.getKey();
    LockValue.LockDetail lockDetail = null; try {
        lockDetail = tryLock(key, lockValue.getTryLockMilliSeconds());
        lockValue.setLockSuccess(lockDetail.isDistributionLock());
        lockValue.setLockDetail(lockDetail);

        ((ProceedingJoinPoint) joinPoint).proceed();

    } catch (InterruptedException e) {
        LOGGER.error("get {} interupted", key);
    } finally { try { if (lockDetail != null && lockDetail.isDistributionLock()) {
                distributionLock.unlock(key);
            }
        } catch (Exception e) {
            LOGGER.error("unlock distribution {} error", key, e);
        } try { if (lockDetail != null && lockDetail.isLocalLock()) {
                localLock.unlock(key, lockDetail.getLocalLockResult());
            }
        } catch (Exception e) {
            LOGGER.error("unlock local {} error", key, e);
        }
    }
}

其中tryLock是本地方法:

/**
 * 获取本地锁和分布式锁
 *
 * @param key          分布式锁key
 * @param milliSeconds 限时 ms
 * @return * @throws InterruptedException
 */ private LockValue.LockDetail tryLock(String key, long milliSeconds) throws InterruptedException { long start = System.currentTimeMillis();
    LockValue.LockDetail lockDetail = new LockValue.LockDetail(); //先获取本地锁 LocalLockResult localLockResult = localLock.tryLock(key, (int) milliSeconds); long local = System.currentTimeMillis(); boolean distributionLockSuccess = false; if (localLockResult.isLockSuccess()) { long leftMiliSeconds = milliSeconds - (local - start); if (milliSeconds == 0 || leftMiliSeconds > 0) { //再获取分布式锁 distributionLockSuccess = distributionLock.tryLock(key, milliSeconds == 0 ? 0 : (int) leftMiliSeconds / 2);
        }
    } long distribute = System.currentTimeMillis(); //记录锁详细信息 lockDetail.setLocalLockResult(localLockResult);
    lockDetail.setLocalLock(localLockResult.isLockSuccess());
    lockDetail.setDistributionLock(distributionLockSuccess);
    lockDetail.setLocalLockTime(local - start);
    lockDetail.setDistributionLockTime(distribute - local);
    LOGGER.debug("tryLock {} {} , use time : {} ms ", key, lockDetail.isDistributionLock(), System.currentTimeMillis() - start); return lockDetail;
}

Redis面试题及答案

1、什么是Redis?简述它的优缺点?

Redis的全称是:RemoteDictionary.Server,本质上是一个Key-Value类型的内存数据库,很像memcached,整个数据库统统加载在内存当中进行操作,定期通过异步操作把数据库数据flush到硬盘上进行保存。

因为是纯内存操作,Redis的性能非常出色,每秒可以处理超过10万次读写操作,是已知性能最快的Key-ValueDB。

Redis的出色之处不仅仅是性能,Redis最大的魅力是支持保存多种数据结构,此外单个value的最大限制是1GB,不像memcached只能保存1MB的数据,因此Redis可以用来实现很多有用的功能。比方说用他的List来做FIFO双向链表,实现一个轻量级的高性能消息队列服务,用他的Set可以做高性能的tag系统等等。

另外Redis也可以对存入的Key-Value设置expire时间,因此也可以被当作一个功能加强版的memcached来用。Redis的主要缺点是数据库容量受到物理内存的限制,不能用作海量数据的高性能读写,因此Redis适合的场景主要局限在较小数据量的高性能操作和运算上。

2、Redis与memcached相比有哪些优势?

1.memcached所有的值均是简单的字符串,redis作为其替代者,支持更为丰富的数据类型

2.redis的速度比memcached快很多redis的速度比memcached快很多

3.redis可以持久化其数据redis可以持久化其数据

3、Redis支持哪几种数据类型?

String、List、Set、SortedSet、hashes

4、Redis主要消耗什么物理资源?

内存。

5、Redis有哪几种数据淘汰策略?

1.noeviction:返回错误当内存限制达到,并且客户端尝试执行会让更多内存被使用的命令。

2.allkeys-lru:尝试回收最少使用的键(LRU),使得新添加的数据有空间存放。

3.volatile-lru:尝试回收最少使用的键(LRU),但仅限于在过期集合的键,使得新添加的数据有空间存放。

4.allkeys-random:回收随机的键使得新添加的数据有空间存放。

5.volatile-random:回收随机的键使得新添加的数据有空间存放,但仅限于在过期集合的键。

6.volatile-ttl:回收在过期集合的键,并且优先回收存活时间(TTL)较短的键,使得新添加的数据有空间存放。

6、Redis官方为什么不提供Windows版本?

因为目前Linux版本已经相当稳定,而且用户量很大,无需开发windows版本,反而会带来兼容性等问题。

7、一个字符串类型的值能存储最大容量是多少?

512M

8、为什么Redis需要把所有数据放到内存中?

Redis为了达到最快的读写速度将数据都读到内存中,并通过异步的方式将数据写入磁盘。所以redis具有快速和数据持久化的特征,如果不将数据放在内存中,磁盘I/O速度为严重影响redis的性能。在内存越来越便宜的今天,redis将会越来越受欢迎,如果设置了最大使用的内存,则数据已有记录数达到内存限值后不能继续插入新值。

9、Redis集群方案应该怎么做?都有哪些方案?

1.codis

2.目前用的最多的集群方案,基本和twemproxy一致的效果,但它支持在节点数量改变情况下,旧节点数据可恢复到新hash节点。rediscluster3.0自带的集群,特点在于他的分布式算法不是一致性hash,而是hash槽的概念,以及自身支持节点设置从节点。具体看官方文档介绍。

3.在业务代码层实现,起几个毫无关联的redis实例,在代码层,对key进行hash计算,然后去对应的redis实例操作数据。这种方式对hash层代码要求比较高,考虑部分包括,节点失效后的替代算法方案,数据震荡后的自动脚本恢复,实例的监控,等等。

10、Redis集群方案什么情况下会导致整个集群不可用?

有A,B,C三个节点的集群,在没有复制模型的情况下,如果节点B失败了,那么整个集群就会以为缺少5501-11000这个范围的槽而不可用。

11、MySQL里有2000w数据,redis中只存20w的数据,如何保证redis中的数据都是热点数据?

12、Redis有哪些适合的场景?

13、Redis支持的Java客户端都有哪些?官方推荐用哪个?

14、Redis和Redisson有什么关系?

15、Jedis与Redisson对比有什么优缺点?

16、说说Redis哈希槽的概念?

17、Redis集群的主从复制模型是怎样的?

18、Redis集群会有写操作丢失吗?为什么?

19、Redis集群之间是如何复制的?

20、Redis集群最大节点个数是多少?

21、Redis集群如何选择数据库?

22、Redis中的管道有什么用?

23、怎么理解Redis事务?

24、Redis事务相关的命令有哪几个?

25、Rediskey的过期时间和永久有效分别怎么设置?

26、Redis如何做内存优化?

27、Redis回收进程如何工作的?

28.加锁机制

29.锁互斥机制

30.watchdog自动延期机制

31.可重入加锁机制

32.释放锁机制

33.上述Redis分布式锁的缺点

34.使用过Redis分布式锁么,它是怎么实现的?

35.使用过Redis做异步队列么,你是怎么用的?有什么缺点?

36.什么是缓存穿透?如何避免?什么是缓存雪崩?何如避免?

37.redis和memcached什么区别?为什么高并发下有时单线程的redis比多线程的memcached效率要高?

38.redis主从复制如何实现的?redis的集群模式如何实现?redis的key是如何寻址的?

39.使用redis如何设计分布式锁?说一下实现思路?使用zk可以吗?如何实现?这两种有什么区别?

40.知道redis的持久化吗?底层如何实现的?有什么优点缺点?

41.redis过期策略都有哪些?LRU算法知道吗?写一下java代码实现?

42.缓存穿透、缓存击穿、缓存雪崩解决方案?

43.在选择缓存时,什么时候选择redis,什么时候选择memcached

44.缓存与数据库不一致怎么办

45.主从数据库不一致如何解决
46.Redis常见的性能问题和解决方案

47.Redis的数据淘汰策略有哪些

48.Redis当中有哪些数据结构

49.假如Redis里面有1亿个key,其中有10w个key是以某个固定的已知的前缀开头的,如果将它们全部找出来?

50.使用Redis做过异步队列吗,是如何实现的
51.Redis如何实现延时队列

总结

************************************************************************


#Java求职##Java##面试流程##笔经##面经##笔试题目##求面经##MySQL#
全部评论

相关推荐

点赞 评论 收藏
分享
评论
点赞
12
分享

创作者周榜

更多
牛客网
牛客企业服务