1.Overview

Ref: https://juejin.cn/post/6844903830442737671
在分布式环境下,保证不同节点的线程同步执行,分布式锁,它是控制分布式系统之间互斥访问共享资源
Redis 分布式锁 - 图1
分布式锁一般有如下的特点:

  • 互斥性: 同一时刻只能有一个线程持有锁
  • 可重入性: 同一节点上的同一个线程如果获取了锁之后能够再次获取锁
  • 锁超时:和 J.U.C 中的锁一样支持锁超时,防止死锁
  • 高性能和高可用: 加锁和解锁需要高效,同时也需要保证高可用,防止分布式锁失效
  • 具备阻塞和非阻塞性:能够及时从阻塞状态中被唤醒

一般实现分布式锁有以下几种方式:

  • 基于数据库
  • 基于Redis
  • 基于zookeeper

    2.Redis实现

    2.1 setnx + expire (ERROR)

    setnx和expire是分开的两步操作,不具有原子性,如果执行完第一条指令应用异常或者重启了,锁将无法过期。
    一种改善方案就是使用Lua脚本来保证原子性(包含setnx和expire两条指令):

    2.2 Redis v2.1.6

    1. # 为 SET 命令增加一系列选项
    2. SET key value[EX seconds][PX milliseconds][NX|XX]
  • EX seconds: 设定过期时间,单位为秒

  • PX milliseconds: 设定过期时间,单位为毫秒
  • NX: 仅当 key 不存在时设置值,等同于 setnx 命令
  • XX: 仅当 key 存在时设置值

使用:

2.3 value 具备唯一性

value 必须要具有唯一性,可以用 UUID 设置随机字符串来保证。
假如 value 是一个固定值,那么就可能存在下面的问题:

  1. A 获取锁成功
  2. A 在某个操作上阻塞了太长时间
  3. 设置的 ke y过期了,锁自动释放了
  4. B 获取到了对应同一个资源的锁
  5. A 从阻塞中恢复过来,因为 value 值一样,所以执行释放锁操作时会释放掉 B 持有的锁,引发问题

通常在释放锁时,需要对value进行验证,判断锁是否是自己的,代码如下:
使用 Lua 脚本的方式,尽量保证原子性。

3.RedLock

Ref: Redis分布式锁最牛逼的实现
在 Redis 集群使用 set key value [EX seconds][PX milliseconds][NX|XX] 命令的时候也会出现问题。
这类琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

  1. 在Redis的master节点上拿到了锁;
  2. 但是这个加锁的key还没有同步到slave节点;
  3. master故障,发生故障转移,slave节点升级为master节点;
  4. 导致锁丢失。

Redis 作者 antirez 基于分布式环境下提出了一种更高级的分布式锁的实现 Redlock.

3.1 原理

在 Redis 的分布式环境中,我们假设有 N 个 Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。我们确保将在 N 个实例上使用与在 Redis 单实例下相同方法获取和释放锁。现在我们假设有 5 个 Redis master 节点,同时我们需要在5台服务器上面运行这些 Redis 实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:

  • 获取当前 Unix 时间,以毫秒为单位。
  • 依次尝试从 5 个实例,使用相同的 key 和具有唯一性的 value(例如UUID)获取锁。当向 Redis 请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端持续阻塞等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁。
  • 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功
  • 如果取到了锁,key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
  • 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

如果业务逻辑没执行完,然后redis key超时了,会有监控线程去重新设置这个时间,为锁续命。

3.2 用法

3.3 源码

获取锁

获取锁的代码为 redLock.tryLock() 或者 redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),两者的最终核心源码都是下面这段代码,只不过前者获取锁的默认租约时间 leaseTime 是 LOCK_EXPIRATION_INTERVAL_SECONDS, 即 30s:

  1. <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,
  2. long threadId, RedisStrictCommand<T> command) {
  3. internalLockLeaseTime = unit.toMillis(leaseTime);
  4. // 获取锁时向5个redis实例发送的命令
  5. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
  6. // 首先分布式锁的KEY不能存在,如果确实不存在,那么执行hset命令
  7. // (hset REDLOCK_KEY uuid+threadId 1) 并通过pexpire设置失效时间(也是锁的租约时间)
  8. "if (redis.call('exists', KEYS[1]) == 0) then " +
  9. "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  10. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  11. "return nil; " +
  12. "end; " +
  13. // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,
  14. // 那么重入次数加1,并且设置失效时间
  15. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  16. "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  17. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  18. "return nil; " +
  19. "end; " +
  20. // 获取分布式锁的KEY的失效时间毫秒数
  21. "return redis.call('pttl', KEYS[1]);",
  22. // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
  23. Collections.<Object>singletonList(getName()),
  24. internalLockLeaseTime,
  25. getLockName(threadId)
  26. );
  27. }

获取锁的命令中,

  • KEYS[1] 就是Collections.singletonList(getName()),表示分布式锁的key,即REDLOCK_KEY;
  • ARGV[1] 就是internalLockLeaseTime,即锁的租约时间,默认30s;
  • ARGV[2] 就是getLockName(threadId),是获取锁时set的唯一值,即UUID+threadId:

    释放锁

    ```java protected RFuture unlockInnerAsync(long threadId) { // 向5个redis实例都执行如下命令 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE,
    1. RedisCommands.EVAL_BOOLEAN,
    2. // 如果分布式锁KEY不存在,那么向channel发布一条消息
    3. "if (redis.call('exists', KEYS[1]) == 0) then " +
    4. "redis.call('publish', KEYS[2], ARGV[1]); " +
    5. "return 1; " +
    6. "end;" +
    7. // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
    8. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
    9. "return nil;" +
    10. "end; " +
    11. // 如果就是当前线程占有分布式锁,那么将重入次数减1
    12. "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
    13. // 重入次数减1后大于0,表示分布式锁有重入过,设置失效时间,不删除
    14. "if (counter > 0) then " +
    15. "redis.call('pexpire', KEYS[1], ARGV[2]); " +
    16. "return 0; " +
    17. "else " +
    18. // 重入次数减1后为0,表示分布式锁只获取过1次,
    19. // 那么删除这个KEY,并发布解锁消息
    20. "redis.call('del', KEYS[1]); " +
    21. "redis.call('publish', KEYS[2], ARGV[1]); " +
    22. "return 1; "+
    23. "end; " +
    24. "return nil;",
    25. // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
    26. Arrays.<Object>asList(getName(), getChannelName()),
    27. LockPubSub.unlockMessage,
    28. internalLockLeaseTime,
    29. getLockName(threadId));

} ```