1、为什么需要分布式锁

在多线程并发的情况下,我们用锁来保证一些代码逻辑在同一时间内只能由一个线程访问。比如我们你是用Java中的synchronized关键字或者ReentrantLock类等。
这样面临一个问题,这样加了锁后,可以保证同一个JVM进程内的多个线程同步执行,但是如果是在分布式的环境下,这样子就会有问题,因为这样加锁,只能在加自己的JVM内,如果请求多了分布式集群中的别的节点,那么是没有锁的。
这种情况可以通过分布式锁来解决。
分布式锁可以通过Redis或者Zookeeper来实现,我们来学习Redis分布式锁。

2、redis实现分布式锁及面临的问题

2.1 实现分布式锁的简单逻辑

实现一个分布式锁,首先要有这三个部分:

  • 加锁:可以通过redis的setnx命令来加锁。setnx命令的key是锁的唯一标识,可以根据业务来决定命名,value我们设置成一个随机的UUID或者其他。

此时加锁为:
setnx(key, 1)
当一个线程执行setnx返回了1,说明key原本不存在,该线程得到了锁;如果一个线程执行setnx返回0,说明key已经存在,该线程抢锁失败。

  • 解锁:有加锁自然就要对应着解锁。当获得锁的线程执行完任务,需要释放锁,让其他进程进入。可以通过del指令:

del(key)
释放了锁,其他线程就可以继续通过setnx’命令来获得锁。

  • 设置锁超时:给加的锁设置超时时间。这一步是很有必要的,否则,当一个得到锁的线程在执行任务中挂掉,没有显示地释放锁,那么资源将会被永远锁住,其他线程无法进来。

如果是执行时因为异常崩掉了,可以把解锁方法在finally代码块中,但是如果是运行时,机器断电,程序突然闪断的情况,finally代码块中也无法执行释放锁的逻辑。
因此,setnx的key必须设置一个超时时间,保证就算没有显示地释放锁,锁在一定时间后也会自动释放。setnx不支持超时参数,所以需要额外的指令:
expire(key, 30)

由加锁、解锁、设置锁超时,我们可以得出,初始版本的分布式锁的代码:

  1. ifsetnxkey1 == 1){
  2. expirekey30
  3. try {
  4. // TODO 执行业务逻辑
  5. } finally {
  6. delkey
  7. }
  8. }

2.2 问题1:setnx和expire的非原子性

设想一个极端的场景,如果一个线程A执行setnx成功得到了锁,但是,还没有直行道expire指令设置锁超时时间时,此时线程A所在的服务挂掉了,那么,锁是成功设置了,但是却没有成功的设置过期时间,此时锁基本上就不会过期,别的线程就没法获得锁了。
如何解决呢:
redis在2.6.12以上版本为set指令增加了可选参数,这样在设置锁的时候,就可以设置超时时间:
set(key 1 EX 30 NX)
这里面四个参数分别代表key、value、设置过期时间、过期时间30秒、仅在key不存在时才设置key。

2.3 问题2:删除锁误删了别人的锁

在2.2中,解决了设置锁和设置超时时间的非原子性,这时又面临一个问题:
一开始我们给给key设置的值是一个固定值。如果某个线程成功的得到了锁,锁的超时间是30秒,如果某些原因导致线程A执行很慢,过了三十秒还没执行完,锁自己过期了,然后业务执行完毕,执行删除锁的逻辑,这时有可能把别的线程正在持有的锁给删除调掉了。
如何解决呢:
可以把在占锁时,把值指定为uuid或者线程名,在删除锁时,先判断是不是自己的锁,是自己的锁再删除。
加锁:
String threadId = Thread.currentThread().getId()
set(key,threadId ,30,NX)
解锁:
if(threadId .equals(redisClient.get(key))){
del(key)
}

注:我觉得还是要用UUID,线程名是很有可能重复的

2.4 问题3:验证锁是否是自己的跟删除锁的非原子性

在2.3的基础上,还会面临新的问题:
判断锁是否是自己线程的锁,跟释放锁是否两个独立操作,不是原子性的。
这时会面临一个问题,假设判断锁是否是自己的逻辑,刚从redis判断是自己的,这时把判断为是,从redis返回给应用程序的过程中,锁已经到了超时时间而过期了,然后另一个线程抢占了锁,上了锁。这种情况下,虽然判断了是自己的锁,但是实际上,删除的是别的线程占用的锁。

这时可以通过Lua脚本实现,Lua脚本查看一下具体细节。

2.5 锁的自动续期问题

我们给key设置了自动过期时间,但是如果线程A获得了锁,却超出了过期时间还没有执行完代码逻辑,怎么办? 就需要给锁进行续期。
这里有两种处理方式:

  1. 让获得锁的线程开启一个守护线程,用来给快要过期的锁续航;
  2. 使用Redisson锁。

这个一步一步解决的问题,其实也就是redis实现分布式锁的原理。

3、Redisson

3.1 Redisson是什么

Redission是架设在Redis基础上的一个Java驻内存数据网格。它提供了一系列具有分布式特性的常用工具类。还是先了可重入锁(ReentrantLock)、公平锁(FairLock)、联锁(MultiLock)、红锁(RedLock)、读写锁(ReadWriteLock)等。

3.2 Redisson与SpringBoot整合

导入依赖:

  1. <!-- redisson-->
  2. <dependency>
  3. <groupId>org.redisson</groupId>
  4. <artifactId>redisson-spring-boot-starter</artifactId>
  5. <version>3.15.5</version>
  6. </dependency>

编写配置文件:

spring:
redis:
host: 127.0.0.1
port: 6379
password:
# redisson配置文件路径
redisson:
file: classpath:redisson.yml

在resource下创建redisson.yml


# 单节点配置
singleServerConfig:
  # 连接空闲超时,单位:毫秒
  idleConnectionTimeout: 10000
  # 连接超时,单位:毫秒
  connectTimeout: 10000
  # 命令等待超时,单位:毫秒
  timeout: 3000
  # 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
  # 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
  retryAttempts: 3
  # 命令重试发送时间间隔,单位:毫秒
  retryInterval: 1500
  # 密码
  password:
  # 单个连接最大订阅数量
  subscriptionsPerConnection: 5
  # 客户端名称
  clientName: myredis
  # 节点地址
  address: redis://127.0.0.1:6379
  # 发布和订阅连接的最小空闲连接数
  subscriptionConnectionMinimumIdleSize: 1
  # 发布和订阅连接池大小
  subscriptionConnectionPoolSize: 50
  # 最小空闲连接数
  connectionMinimumIdleSize: 32
  # 连接池大小
  connectionPoolSize: 64
  # 数据库编号
  database: 0
  # DNS监测时间间隔,单位:毫秒
  dnsMonitoringInterval: 5000
# 线程池数量,默认值: 当前处理核数量 * 2
#threads: 0
# Netty线程池数量,默认值: 当前处理核数量 * 2
#nettyThreads: 0
# 编码
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode : "NIO"

添加配置类:

package com.yuanhai.sbdemo.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

/**
 * 本类说明:
 */
@Configuration
public class MyRedissonConfig {

    /**
     * 所有对Redisson的使用都是通过RedissonClient对象
     * @return
     * @throws IOException
     */
    @Bean(destroyMethod="shutdown")
    public RedissonClient redissonClient(@Value("${spring.redis.host}") String url) throws IOException {
        //1、创建配置
        //Redis url should start with redis:// or rediss://
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+url+":6379");
        //2、根据Config创建出RedissonClient示例
        RedissonClient redissonClient = Redisson.create(config);
        return redissonClient;
    }

}

3.3 简单测试使用

package com.yuanhai.sbdemo.controller;

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 本类说明:
*
*/
@RequestMapping("/redislock")
@RestController
public class RedisLockController {

    @Autowired
    private RedissonClient redissonClient;

    @GetMapping("/lock1")
    public String hello() {
        // 通过RedissonClient实例调用getLock()方法获取一把锁,
        // 只要锁的名字一样,就是同一把锁
        RLock myLock = redissonClient.getLock("my-lock");
        // 2.执行加锁方法。 用获取到的锁执行lock()方法即可加锁
        myLock.lock();  // 这个方法是阻塞式等待,加不到锁会一直等待,知道加上锁
        try {
            System.out.println("加锁成功,执行业务..."+Thread.currentThread().getId());
            Thread.sleep(5000);
        }catch (Exception e) {

        }finally {
            // 3.解锁方法
            System.out.println("释放锁..."+Thread.currentThread().getId());
            myLock.unlock();
        }

        return "hello";
    }


}

3.4 3.3中的一些细节

在3.3的例子中,假设解锁方法因为服务器宕机等问题没有解锁,也不会出现死锁问题。
这是因为Redisson的lock有如下特点:

  • 加锁时,默认加的锁都是30s的时间;
  • 锁会自动续期,假设业务执行时间超长,运行期间会自动给锁续上新的30秒,不用担心业务执行时间过长,锁会被自动删掉。
  • 加锁业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁也会在三十秒后自动删除。

3.5 Redisson的看门狗机制

Redisson的看门狗WatchDog解决死锁问题。
我们先把例子改为自己指定锁的过期时间:

 @Autowired
    RedissonClient redisson;

    @ResponseBody
    @GetMapping("/hello")
    public String hello() {
        // 通过RedissonClient实例调用getLock()方法获取一把锁,
        // 只要锁的名字一样,就是同一把锁
        RLock myLock = redisson.getLock("my-lock");
        // 2.执行加锁方法。 用获取到的锁执行lock()方法即可加锁
//        myLock.lock();  // 这个方法是阻塞式等待,加不到锁会一直等待,直到加上锁,默认加的锁都是30s时间。
        //1)、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉
        //2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
        myLock.lock(10, TimeUnit.SECONDS); //10秒自动解锁,自动解锁时间一定要大于业务的执行时间。
        try {
            System.out.println("加锁成功,执行业务..."+Thread.currentThread().getId());
            Thread.sleep(30000);
        }catch (Exception e) {

        }finally {
            // 3.解锁方法 假设解锁代码没有运行,redisson会不会出现死锁
            System.out.println("释放锁..."+Thread.currentThread().getId());
            myLock.unlock();
        }

        return "hello";
    }

这里要知道,我们自己设置的自动解锁时间一定要大于业务的执行时间,而且,如果采用自己设置超时时间的方式加锁,那么在锁的时间到了以后,不会自动续期。

对比下自己传入锁超时时间和不指定锁超时时间:

  • 如果我们自己传入锁的超时时间:那么redisson底层就是发送redis一个lua执行脚本,进行占锁,默认超时就是我们指定的时间,超时时间到了之后,如果业务没执行完不会续期。
  • 如果我们不指定锁的超时时间:那么会默认使用看门狗的默认时间LockWatchdogTimeout,也就是30秒。只要占锁成功,就会启动一个定时任务,这个定时任务用来重新给锁设置过期时间,新的过期时间就是看门狗的默认时间。定时任务每隔十秒就会自动续期,续期成三十秒。

实际开发中,如何使用:
实际开发,还是推荐明显的指定超时时间。
lock.lock(30,TimeUnit.SECONDS);
这样省掉了整个续期操作,手动解锁。
只不过这里自己把指定的时间设的大一点,比如就手动设置成三十秒,用以防止业务执行时间过长而超时。业务执行成功后,就手动解锁。
因为如果业务时间过长,说明中间出问题了,完蛋了。
即:加锁时设置超时时间,业务成功后手动解锁。因为如果业务很长时间都没执行完,那么考虑优化业务性能吧。

3.6 Redisson中的锁的种类

Redisson中的锁与JUC中的锁基本一致,用法也基本一致。但是能够分布式上锁解锁。

  • 可重入锁:3.3例子中RedissonClient实例调用getLock方法,获取的是一个可重入锁。

可重入锁:一个线程在执行一个带锁的方法,改方法中又调用了另一个需要相同锁的方法,则该线程可以直接指向调用的方法,无需重新获得锁。

  • 公平锁:通过RedissonClient实例调用getFairLock()方法,获取一个公平锁。

公平锁:它保证了多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待五秒钟后继续下一个线程。也就是说,如果前面有五个线程都处于等待状态,那么后面的线程会等待至少25秒。

  • 联锁:可以将多个锁对象关联到 一个锁对象。通过创建RedissonMultiLock。

这时,所有的锁都上所才算成功。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();

//业务逻辑
lock.unlock();
  • 红锁:当有大部分(一半以上)锁加锁成功后,才算真正获得锁。 ```java RLock lock1 = redissonInstance1.getLock(“lock1”); RLock lock2 = redissonInstance2.getLock(“lock2”); RLock lock3 = redissonInstance3.getLock(“lock3”);

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 同时加锁:lock1 lock2 lock3 // 红锁在大部分节点上加锁成功就算成功。 这里有3个锁,至少2个加锁成功,才算真的加锁成功 lock.lock(); … lock.unlock();


- 读写锁:上读锁的时候可以多个线程获取,上写锁的时候只能由一个线程获取,当写锁未释放的时候,读锁会阻塞获取不了,直到写锁释放。通过RedissonClient实例调用getReadWriteLock()方法获得读写锁,然后读写锁实例调用readLock()方法会上读锁,调用writeLock()方法会上写锁。
```java
/**
 * 保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁)。读锁是一个共享锁
 * 写锁没释放,读就必须等待
 */
@GetMapping("/read")
@ResponseBody
public String readValue() {

    RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
    RLock rLock = readWriteLock.readLock();
    try {
        rLock.lock();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }
    return "";
}


@GetMapping("/write")
@ResponseBody
public String writeValue() {

    RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
    RLock rLock = readWriteLock.writeLock();
    try {
        //1、改数据加写锁,读数据加读锁
        rLock.lock();
        TimeUnit.SECONDS.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }
    return "";
}
  • 闭锁:通过RedissonClient实例调用getCountDownLatch()获取闭锁。

闭锁:确保某个逻辑在它需要的所有资源都具备之后才继续执行。

  • 信号量:通过RedissonClient实例调用getSemaphore()来获取信号量。

信号量:一个计数信号量。用来控制同时访问特定资源的线程所里,通过协调各个线程,以保证合理的使用公共资源。一般用于流量控制,特别是公共资源有限的应用场景。例如数据的连接,假设数据库连接上线为10个,并线程并发操作数据库可以使用Semaphore累控制并发操作数据库的线程数量最多为10个。

3.7 Redisson中读写锁ReadWriteLock示例

使用读写锁的好处:
保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁、独享锁)。读锁是一个共享锁。
//写锁没释放读就必须等待
// 读 + 读: 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
// 写 + 读: 等待写锁释放
// 写 + 写: 阻塞方式,下一个 写锁必须等待上一个写锁完全释放,才能进行;
// 读 + 写: 有读锁。写也需要等待。
// 只要有写的存在,都必须等待


    @Autowired
    RedissonClient redisson;

    @Autowired
    StringRedisTemplate redisTemplate;

//保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁、独享锁)。读锁是一个共享锁
    //写锁没释放读就必须等待
    // 读 + 读: 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
    // 写 + 读: 等待写锁释放
    // 写 + 写: 阻塞方式
    // 读 + 写: 有读锁。写也需要等待。
    // 只要有写的存在,都必须等待
    @GetMapping("/write")
    @ResponseBody
    public String writeValue() {

        RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");

        String uuid = "";
        RLock writeLock = readWriteLock.writeLock();
        try {
            // 1.改数据,加写锁;读数据,加读锁
            writeLock.lock();

            uuid = UUID.randomUUID().toString();
            // 休眠30s,模拟写入数据要花费时间
            Thread.sleep(30000);
            redisTemplate.opsForValue().set("writeValue",uuid);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
        }

        return uuid;
    }

    @GetMapping("/read")
    @ResponseBody
    public String readValue() {

        RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");

        String uuid = "";
        // 加读锁
        RLock readLock = readWriteLock.readLock();
        readLock.lock();
        try {
            uuid = redisTemplate.opsForValue().get("writeValue");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
        }

        return uuid;
    }

3.8 Redisson中信号量Semaphore示例


  @Autowired
    RedissonClient redisson;


/**
     * Redisson信号量使用场景模拟
     * 车库停车,
     * 假设有3车位,来一辆车占用一个车位,走一辆车释放一个车位,想要停车,要看车位够不够
     *
     * 信号量也可以用作分布式限流,来限制每一个应用的流量;
     */
    @GetMapping("/park")
    @ResponseBody
    public String park() throws InterruptedException {
        RSemaphore park = redisson.getSemaphore("park");
        //获取一个信号(可以看作获取一个值),此场景看作占一个车位
        // acquire()是阻塞式获取,即一定要获取一个值,获取不到会一直等待获取
//        park.acquire();
        // tryAcquire()是尝试获取,获取得到就获取,获取不到,就返回false
        boolean b = park.tryAcquire();
        if(b){
            //执行业务
        }else {
            return "error";
        }

        return "ok=>"+b;
    }

    @GetMapping("/go")
    @ResponseBody
    public String go() {
        RSemaphore park = redisson.getSemaphore("park");
        park.release(); //释放一个信号,此场景看作释放一个车位
        return "ok";
    }

3.9 Redisson中闭锁CountDownLatch示例


 @Autowired
    RedissonClient redisson;


/**
     * 闭锁 案例
     *
     * 放假,锁门
     * 1班没人了,2班没人了......
     * 5个班全部走完,我们可以锁大门
     */
    @GetMapping("/lockDoor")
    @ResponseBody
    public String lockDoor() throws InterruptedException {
        RCountDownLatch door = redisson.getCountDownLatch("door");
        door.trySetCount(5);
        door.await(); //等待闭锁都完成

        return "放假了...";
    }

    @GetMapping("/gogogo/{id}")
@ResponseBody
    public String gogogo(@PathVariable("id") Long id){
        RCountDownLatch door = redisson.getCountDownLatch("door");
        door.countDown();//计数减一;

//        CountDownLatch

        return id+"班的人都走了...";
    }