一、分布式锁分类

image.png

二、用redission实现分布式锁

看门狗大致原理图

image.png

可重入锁机制

1、分布式锁实现步骤

1.添加依赖:

  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson</artifactId>
  4. <version>3.8.2</version>
  5. </dependency>

2.添加配置文件:
由于使用的springoot作为开发环境,所有自定义bean进行注入:

  1. @Configuration
  2. public class RedissonConfig {
  3. @Bean
  4. public RedissonClient getClient() {
  5. Config config = new Config();
  6. config.useSingleServer().setAddress("redis://127.0.0.1:6379");
  7. RedissonClient redisson = Redisson.create(config);
  8. return redisson;
  9. }
  10. }

本次使用单机的redis作为测试用例,只有一个几点,Redisson支持集群模式的redis.
3.实现分布式锁:

  1. RLock lock = redisson.getLock("lockName");
  2. try{
  3. // 1. 最常见的使用方法
  4. //lock.lock();
  5. // 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁
  6. //lock.lock(10, TimeUnit.SECONDS);
  7. // 3. 尝试加锁,最多等待2秒,上锁以后8秒自动解锁
  8. boolean res = lock.tryLock(2, 8, TimeUnit.SECONDS);
  9. if(res){ //成功
  10. //处理业务
  11. }
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. } finally {
  15. //释放锁
  16. lock.unlock();
  17. }

4.小案例

  1. long lockTimeOut = 5000L;
  2. String lockName = "lock";
  3. Long setnx = RedisSharedPoolUtil.setnx(lockName, String.valueOf(System.currentTimeMillis() + lockTimeOut));
  4. if (setnx != null && setnx.intValue() == 1) {
  5. //返回值是1,表示成功,获取锁
  6. //do something
  7. } else {
  8. //为获取到锁,继续判断,看能否重置,并且获取到锁.
  9. String s = RedisSharedPoolUtil.get(lockName);
  10. if (s != null && System.currentTimeMillis() > Long.parseLong(s)) {
  11. //不为空,而且已经超时了 //重置锁
  12. //上面的s 和这个set有可能不一样,因为是集群tomcat.返回给定的key,->旧值判断,是否可以获取锁
  13. //当key没有旧值时,即key不存在,返回nil->获取锁
  14. //这里我们set了一个新value,获取旧值
  15. String set = RedisSharedPoolUtil.getSet(lockName, String.valueOf(System.currentTimeMillis() + lockTimeOut));
  16. if (set == null || (set != null && StringUtils.equals(s, set))) {
  17. //真正获取到锁
  18. //do something
  19. } else {
  20. log.info("没有获取到分布式锁:{}", lockName);
  21. }
  22. } else {
  23. log.info("没有获取到分布式锁:{}", lockName);
  24. }
  25. log.info("没有获取到分布式锁:{}", lockName);
  26. }
  27. log.info("完成定时关闭订单的任务");

三、分布式锁相关面试题

1、分布式锁的主要用途

保证一个方法在高并发情况下的同一时间只能被同一个线程执行。

2、分布式锁避免哪些问题

(1)允许多个客户端操作共享资源
这种情况下,对共享资源的操作一定是幂等性操作,无论你操作多少次都不会出现不同结果。在这里使用锁,无外乎就是为了避免重复操作共享资源从而提高效率。
(2)只允许一个客户端操作共享资源
这种情况下,对共享资源的操作一般是非幂等性操作。在这种情况下,如果出现多个客户端操作共享资源,就可能意味着数据不一致,数据丢失。

3.redis的分布式锁如何实现?

正常情况下redis使用setnx lock加锁,使用del lock释放锁就可以了。但是如果还没释放锁,服务中途就挂了,这样锁得不到释放造成死锁。可以加上超时时间,等服务启了在释放锁。
因为setnx和expire指令不是原子性,redis在2.8以后加入set扩展命令,使得这两个命令能一起执行。

问题一:分布式锁不能解决超时问题
如果在加锁或者释放锁的期间,业务逻辑太长了导致超时时间过了业务还没执行完,第二个线程重新持有锁 就造成业务的得不到严格执行。这时候需要lua脚本执行。确保原子性。

问题二:可重入锁
redis使用redission插件,首先客户端会发lua脚本个redis查找是否有同名锁,没有就加锁。有就等待该线程释放锁。锁一旦加上会启动一个watch dog,这是一个后台线程,每过十秒检查是否持有锁,如果还持有就会延长key的生存时间。
如果reids主节点宕机了,那么系统会自动切换到slave节点上。但是主从在切换过程中,主从复制是异步的操作,就导致锁丧失了安全性,可能导致第二个线程占有原先的锁。

问题三:主节点宕机,锁安全得不到保障
也是使用redission,其中有一个redlock红锁。Redlock的算法大致是这样。假设有N个主节点。在此我们假设N为5。这些节点相互独立,互不影响。在不同服务器上运行实例。保证不会同时宕机。
客户端应该执行以下操作:

获取当前时间,精准到毫秒
使用相同的key和value依次请求5个redis获取锁。这时候客户端应该建立请求响应时间和超时时间。超时时间应该小于锁的失效时间。这样可以避免如果是redis挂了,客户端就没必要等待,继续去下一个redis获取锁。
只有获取超过半数redis的锁并且获取锁的时间小于锁的失效时间才算获取成功。
如果获取锁失败 应该释放全部的锁

问题四:加锁失败怎么处理
抛出异常,通知用户稍后重试
Sleep 一会重试
将请求转移到延时队列。稍后重试
[

](https://blog.csdn.net/My_daidai/article/details/107232107)