1、Redission概述

1.1、Redission是什么?

如果你之前是在用 Redis 的话,那使用 Redisson 的话将会事半功倍,Redisson 提供了使用 Redis的最简单和最便捷的方法。
Redisson的宗旨是促进使用者对 Redis 的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。image.png

  • Netty 框架:Redisson采用了基于NIO的Netty框架,不仅能作为Redis底层驱动客户端,具备提供对Redis各种组态形式的连接功能,对Redis命令能以同步发送、异步形式发送、异步流形式发送或管道形式发送的功能,LUA脚本执行处理,以及处理返回结果的功能
  • 基础数据结构:将原生的Redis Hash,List,Set,String,Geo,HyperLogLog等数据结构封装为Java里大家最熟悉的映射(Map),列表(List),集(Set),通用对象桶(Object Bucket),地理空间对象桶(Geospatial Bucket),基数估计算法(HyperLogLog)等结构,
  • 分布式数据结构:这基础上还提供了分布式的多值映射(Multimap),本地缓存映射(LocalCachedMap),有序集(SortedSet),计分排序集(ScoredSortedSet),字典排序集(LexSortedSet),列队(Queue),阻塞队列(Blocking Queue),有界阻塞列队(Bounded Blocking Queue),双端队列(Deque),阻塞双端列队(Blocking Deque),阻塞公平列队(Blocking Fair Queue),延迟列队(Delayed Queue),布隆过滤器(Bloom Filter),原子整长形(AtomicLong),原子双精度浮点数(AtomicDouble),BitSet等Redis原本没有的分布式数据结构。
  • 分布式锁:Redisson还实现了Redis文档中提到像分布式锁Lock这样的更高阶应用场景。事实上Redisson并没有不止步于此,在分布式锁的基础上还提供了联锁(MultiLock),读写锁(ReadWriteLock),公平锁(Fair Lock),红锁(RedLock),信号量(Semaphore),可过期性信号量(PermitExpirableSemaphore)和闭锁(CountDownLatch)这些实际当中对多线程高并发应用至关重要的基本部件。正是通过实现基于Redis的高阶应用方案,使Redisson成为构建分布式系统的重要工具。

2、整合Redission

Spring Boot 整合 Redisson 有两种方案:

  • 程序化配置。
  • 文件方式配置。

本篇介绍如何用程序化的方式整合 Redisson。

2.1、程序化配置

1、引入Maven依赖

  1. <!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
  2. <dependency>
  3. <groupId>org.redisson</groupId>
  4. <artifactId>redisson</artifactId>
  5. <version>3.15.5</version>
  6. </dependency>

2、自定义配置类

  1. @Configuration
  2. public class MyRedissonConfig {
  3. /**
  4. * 对 Redisson 的使用都是通过 RedissonClient 对象
  5. * @return
  6. * @throws IOException
  7. */
  8. @Bean(destroyMethod="shutdown") // 服务停止后调用 shutdown 方法。
  9. public RedissonClient redisson() throws IOException {
  10. // 1.创建配置
  11. Config config = new Config();
  12. // 集群模式
  13. // config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001");
  14. // 2.根据 Config 创建出 RedissonClient 示例。
  15. config.useSingleServer().setAddress("redis://127.0.0.1:6379");
  16. return Redisson.create(config);
  17. }
  18. }

3、测试类

  1. @Autowired
  2. RedissonClient redissonClient;
  3. @Test
  4. public void TestRedisson() {
  5. System.out.println(redissonClient);
  6. }

3、可重入分布式锁

3.1、可重入锁

基于Redis的Redisson分布式可重入锁RLockJava 对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

  1. RLock lock = redisson.getLock("anyLock");
  2. // 最常见的使用方法
  3. lock.lock();

我们用 passjava 这个开源项目测试下可重入锁的两个点:

  • (1)多个线程抢占锁,后面锁需要等待吗?
  • (2)如果抢占到锁的线程所在的服务停了,锁会不会被释放?

    3.1.1、验证一:可重入锁是阻塞的吗?

    为了验证以上两点,我写了个 demo 程序:代码的流程就是设置WuKong-lock锁,然后加锁,打印线程 ID,等待 10 秒后释放锁,最后返回响应:“test lock ok”。 ```java @ResponseBody @GetMapping(“test-lock”) public String TestLock() { // 1.获取锁,只要锁的名字一样,获取到的锁就是同一把锁。 RLock lock = redisson.getLock(“WuKong-lock”);

    // 2.加锁 lock.lock(); try {

    1. System.out.println("加锁成功,执行后续代码。线程 ID:" + Thread.currentThread().getId());
    2. Thread.sleep(10000);

    } catch (Exception e) {

    1. //TODO

    } finally {

    1. lock.unlock();
    2. // 3.解锁
    3. System.out.println("Finally,释放锁成功。线程 ID:" + Thread.currentThread().getId());

    }

    return “test lock ok”; }

  1. 先验证第一个点,用两个 http 请求来测试抢占锁。<br />请求的 URL
  2. ```http
  3. http://localhost:11000/question/v1/redisson/test/test-lock

image.png
第一个线程对应的线程 ID 为 86,10秒后,释放锁。在这期间,第二个线程需要等待锁释放。
第一个线程释放锁之后,第二个线程获取到了锁,10 秒后,释放锁。
画了一个流程图,帮助大家理解。如下图所示:
image.png

  • 第一步:线程 A 在 0 秒时,抢占到锁,0.1 秒后,开始执行等待 10 s。
  • 第二步:线程 B 在 0.1 秒尝试抢占锁,未能抢到锁(被 A 抢占了)。
  • 第三步:线程 A 在 10.1 秒后,释放锁。
  • 第四步:线程 B 在 10.1 秒后抢占到锁,然后等待 10 秒后释放锁。

由此可以得出结论,Redisson 的可重入锁(lock)是阻塞其他线程的,需要等待其他线程释放的。

3.1.2 验证二:服务停了,锁会释放吗?

如果线程 A 在等待的过程中,服务突然停了,那么锁会释放吗?如果不释放的话,就会成为死锁,阻塞了其他线程获取锁。
我们先来看下线程 A 的获取锁后的,Redis 客户端查询到的结果,如下图所示:
image.png
WuKong-lock 有值,而且大家可以看到 TTL 在不断变小,说明 WuKong-lock 是自带过期时间的。
通过观察,经过 30 秒后,WuKong-lock 过期消失了。说明 Redisson 在停机后,占用的锁会自动释放。
image.png
那这又是什么原理呢?这里就要提一个概念了,看门狗。

3.2、看门狗原理

如果负责储存这个分布式锁的 Redisson 节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。
默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
如果我们未制定 lock 的超时时间,就使用 30 秒作为看门狗的默认时间。只要占锁成功,就会启动一个定时任务:每隔 10 秒重新给锁设置过期的时间,过期时间为 30 秒。
如下图所示:
image.png
当服务器宕机后,因为锁的有效期是 30 秒,所以会在 30 秒内自动解锁。(30秒等于宕机之前的锁占用时间+后续锁占用的时间)。
如下图所示:
image.png

3.2、设置锁过期时间

我们也可以通过给锁设置过期时间,让其自动解锁。
如下所示,设置锁 8 秒后自动过期。

  1. lock.lock(8, TimeUnit.SECONDS);

如果业务执行时间超过 8 秒,手动释放锁将会报错,如下图所示:
image.png
所以我们如果设置了锁的自动过期时间,则执行业务的时间一定要小于锁的自动过期时间,否则就会报错。

  1. // 1.设置分布式锁
  2. RLock lock = redisson.getLock("lock");
  3. // 2.占用锁
  4. lock.lock();
  5. // 3.执行业务
  6. ...
  7. // 4.释放锁
  8. lock.unlock();

4、读写锁分布式锁

基于 Redis 的 Redisson 分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了 RLock接口。
写锁是一个排他锁(互斥锁),读锁是一个共享锁。

  • 读锁 + 读锁:相当于没加锁,可以并发读。
  • 读锁 + 写锁:写锁需要等待读锁释放锁。
  • 写锁 + 写锁:互斥,需要等待对方的锁释放。
  • 写锁 + 读锁:读锁需要等待写锁释放。

只要存在写锁就一定需要进行等待锁的释放。
image.png
示例代码如下:

  1. RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
  2. // 最常见的使用方法
  3. rwlock.readLock().lock();
  4. // 或
  5. rwlock.writeLock().lock();

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

  1. // 10秒钟以后自动解锁
  2. // 无需调用unlock方法手动解锁
  3. rwlock.readLock().lock(10, TimeUnit.SECONDS);
  4. // 或
  5. rwlock.writeLock().lock(10, TimeUnit.SECONDS);
  6. // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
  7. boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
  8. // 或
  9. boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
  10. ...
  11. lock.unlock();

5、分布式信号量

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)反射式(Reactive)RxJava2标准的接口。
关于信号量的使用大家可以想象一下这个场景,有三个停车位,当三个停车位满了后,其他车就不停了。可以把车位比作信号,现在有三个信号,停一次车,用掉一个信号,车离开就是释放一个信号。
我们用 Redisson 来演示上述停车位的场景。
先定义一个占用停车位的方法:

  1. /**
  2. * 停车,占用停车位
  3. * 总共 3 个车位
  4. */
  5. @ResponseBody
  6. @RequestMapping("park")
  7. public String park() throws InterruptedException {
  8. // 获取信号量(停车场)
  9. RSemaphore park = redisson.getSemaphore("park");
  10. // 获取一个信号(停车位)
  11. park.acquire();
  12. return "OK";
  13. }

再定义一个离开车位的方法:

  1. /**
  2. * 释放车位
  3. * 总共 3 个车位
  4. */
  5. @ResponseBody
  6. @RequestMapping("leave")
  7. public String leave() throws InterruptedException {
  8. // 获取信号量(停车场)
  9. RSemaphore park = redisson.getSemaphore("park");
  10. // 释放一个信号(停车位)
  11. park.release();
  12. return "OK";
  13. }

为了简便,我用 Redis 客户端添加了一个 key:“park”,值等于 3,代表信号量为 park,总共有三个值。

6、公平锁与非公平锁

什么叫公平锁:就是比较公平,根据请求锁的顺序排列。也就是先进来得先获取锁,后进来得后获取锁,采用队列进行存放,遵循得原则是先进先出的原则。类似于生活中的排队做某事,比如排队吃饭。

说完了公平锁,我们来说说非公平锁,非公平锁不是根据请求的顺序序排列,谁能够抢到锁就归谁。

new ReentrantLock(true) — 公平锁

new ReentrantLock(fasle)—-非公平锁

我们来用代码演示一遍:

  1. public class LockDemo implements Runnable{
  2. private static int count = 0;
  3. private static Lock lock = new ReentrantLock(false);
  4. @Override
  5. public void run() {
  6. while(count < 1000){
  7. try{
  8. lock.lock();
  9. createCount();
  10. }finally {
  11. lock.unlock();
  12. }
  13. }
  14. }
  15. private void createCount() {
  16. System.out.println(Thread.currentThread().getName() +",coiunt:" + count);
  17. count++;
  18. }
  19. public static void main(String[] args) {
  20. //开始时间
  21. long stratTime = System.currentTimeMillis();
  22. List<Thread> threads = new ArrayList<>();
  23. for (int i = 0; i < 5; i++) {
  24. Thread thread = new Thread(new LockDemo());
  25. thread.start();
  26. threads.add(thread);
  27. }
  28. threads.forEach((t)->{
  29. try {
  30. t.join();
  31. }catch (Exception e){
  32. e.printStackTrace();
  33. }
  34. });
  35. long endTime = System.currentTimeMillis();
  36. System.out.println(endTime - stratTime);
  37. }
  38. }

上面的代码是非公平锁:5个线程看谁获取到锁就谁去操作:
image.png
我们来看下公平锁执行:
image.png

我们来看下运行效率,公平锁用了1067毫秒,非公平锁用了632毫秒:
‘所以非公平锁的运行效率比较高,因为公平锁释放完要对锁进行排列,多以效率比较低。
没有特殊要求情况下,推荐使用非公平锁。
我们看下synchronized锁是公平锁还是非公平锁:

  1. public class LockDemo implements Runnable{
  2. private static int count = 0;
  3. private static Lock lock = new ReentrantLock(false);
  4. @Override
  5. public void run() {
  6. while(count < 100000){
  7. try{
  8. // lock.lock();
  9. createCount();
  10. }finally {
  11. // lock.unlock();
  12. }
  13. }
  14. }
  15. private synchronized void createCount() {
  16. System.out.println(Thread.currentThread().getName() +",coiunt:" + count);
  17. count++;
  18. }
  19. public static void main(String[] args) {
  20. //开始时间
  21. long stratTime = System.currentTimeMillis();
  22. List<Thread> threads = new ArrayList<>();
  23. for (int i = 0; i < 5; i++) {
  24. Thread thread = new Thread(new LockDemo());
  25. thread.start();
  26. threads.add(thread);
  27. }
  28. threads.forEach((t)->{
  29. try {
  30. t.join();
  31. }catch (Exception e){
  32. e.printStackTrace();
  33. }
  34. });
  35. long endTime = System.currentTimeMillis();
  36. System.out.println(endTime - stratTime);
  37. }
  38. }

执行结果是非公平锁:
image.png

7、公平锁(Fair Lock)

基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)反射式(Reactive)RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

  1. RLock fairLock = redisson.getFairLock("anyLock");
  2. // 最常见的使用方法
  3. fairLock.lock();

大家都知道,如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

  1. // 10秒钟以后自动解锁
  2. // 无需调用unlock方法手动解锁
  3. fairLock.lock(10, TimeUnit.SECONDS);
  4. // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
  5. boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
  6. ...
  7. fairLock.unlock();

Redisson同时还为分布式可重入公平锁提供了异步执行的相关方法:

  1. RLock fairLock = redisson.getFairLock("anyLock");
  2. fairLock.lockAsync();
  3. fairLock.lockAsync(10, TimeUnit.SECONDS);
  4. Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

8、联锁(MultiLock)

基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。
联锁的思想很简单:N个实例都必须获取到锁,有一个失败,即为失败。

  1. RLock lock1 = redissonInstance1.getLock("lock1");
  2. RLock lock2 = redissonInstance2.getLock("lock2");
  3. RLock lock3 = redissonInstance3.getLock("lock3");
  4. RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
  5. // 同时加锁:lock1 lock2 lock3
  6. // 所有的锁都上锁成功才算成功。
  7. lock.lock();
  8. ...
  9. lock.unlock();

大家都知道,如果负责储存某些分布式锁的某些Redis节点宕机以后,而且这些锁正好处于锁住的状态时,这些锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

  1. RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
  2. // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
  3. lock.lock(10, TimeUnit.SECONDS);
  4. // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
  5. boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
  6. ...
  7. lock.unlock();

9、红锁(RedLock)

基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
Redis获取分布式锁,其实就是向N个Redis实例中使用SETNX来对该resource设置键值。

  1. RLock lock1 = redissonInstance1.getLock("lock1");
  2. RLock lock2 = redissonInstance2.getLock("lock2");
  3. RLock lock3 = redissonInstance3.getLock("lock3");
  4. RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
  5. // 同时加锁:lock1 lock2 lock3
  6. // 红锁在大部分节点上加锁成功就算成功。
  7. lock.lock();
  8. ...
  9. lock.unlock();

大家都知道,如果负责储存某些分布式锁的某些Redis节点宕机以后,而且这些锁正好处于锁住的状态时,这些锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

  1. RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
  2. // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
  3. lock.lock(10, TimeUnit.SECONDS);
  4. // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
  5. boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
  6. ...
  7. lock.unlock();

10、可过期性信号量(PermitExpirableSemaphore)

基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。它提供了异步(Async)反射式(Reactive)RxJava2标准的接口。

  1. RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
  2. String permitId = semaphore.acquire();
  3. // 获取一个信号,有效期只有2秒钟。
  4. String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
  5. // ...
  6. semaphore.release(permitId);

11、闭锁(CountDownLatch)

基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

  1. RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
  2. latch.trySetCount(1);
  3. latch.await();
  4. // 在其他线程或其他JVM里
  5. RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
  6. latch.countDown();
  • 公平锁(Fair Lock)
  • 联锁(MultiLock)
  • 红锁(RedLock)
  • 读写锁(ReadWriteLock)
  • 可过期性信号量(PermitExpirableSemaphore)
  • 闭锁(CountDownLatch)