Redis 为单进程单线程模式,采用队列模式将并发访问的请求变成串行访问,并且多客户端对 Redis 的访问不存在竞争关系。

    以下将会讲解如何使用 Redis 实现一个可靠的,自旋分布式锁。以及实现的思路,还有实现时会遇到的常见错误。
    当然,这些实现的都是不可重入的。在最后,还会讲一下,实现可重入锁的思路。

    实现原理

    Redis 操作
    Redis 提供了一些基本指令可以用来实现分布式锁,例如
    SET,SENTX,GETSET,INCR,DEL,GET 等操作,以下是对这些指令的基本用法:

    SET key val [NX|XX] [EX seconds | PX milliseconds]
    // 将字符串值key 关联到 value。成功后,返回值为”OK”。后面有两个可选参数
    // 可选参数 NX|XX:NX表示只在键不存在时,才对键进行操作,缺省方式是NX。XX表示只在键存在时对键进行操作
    // 可选参数 EX|PX:键过期的时间单位,后面跟长整型数字表示过期时间。EX表示秒,PX表示毫秒。缺省不设置过期时间。

    SETNX key val
    // 当且仅当key值不存在,将key对应的值设置为value,并且返回1,否则不做任何操作,返回0

    GETSET key val
    // 获取key的旧值,并且将新的value放入

    INCR key
    // 将key中存储的数字自增1并且返回结果。

    DEL key
    // 将对应Key的值删除

    锁的可靠性
    为了确保分布式锁可用,我们至少要确保锁的可靠性,要满足一下四个条件:

    1)互斥性,在任意时刻,只能有一个客户端(或者说业务请求)获得锁,并且也只能由该客户端请求解锁成功。
    2)避免死锁,即使获取了锁的客户端崩溃没有释放锁,也要保证锁正常过期,后续的客户端能正常加锁。
    3)容错性,只要大部分 Redis 节点可用,客户端就能正常加锁。
    4)自旋重试,获取不到锁时,不要直接返回失败,而是支持一定的周期自旋重试,设置一个总的超时时间,当过了超时时间以后还没有获取到锁则返回失败。(这一点很重要,我发现网上很多方案并没有把这个功能加上,只尝试一次加锁请求失败就返回了,加了自旋重试更好一些)

    参数设置
    这里有三个参数需要考虑,一般来说,设定的值,需要根据实际场景来判断:

    锁的过期时间 (EXPIRE_TIME)
    太短可能过早的释放锁,造成数据安全问题。太长的话,如果客户端挂掉,会长时间无法释放锁,导致其他客户端锁请求阻塞或者失败(这种场景太少见)
    我们一般会预估一下加锁需要进行的操作最长耗时,然后在最长耗时基础上再加一个 buffer 的时间来确定。(buffer 比例多少不确定,这个自行判断吧)需要保证锁在任务执行完之前不会过期。

    自旋间隔时间 (WAIT_INTERVAL)
    适当间隔就好,一般是 50~100ms

    获取锁的超时时间 (ACCQUIRE_TIME_OUT)
    在激烈的竞争环境下,超时时间设置太短会导致失败次数显著增加。建议至少设置成和锁的过期时间一样。

    如何实现

    代码示例
    首先是代码示例,以下是使用了两种方式实现的 Redis 锁:
    第一种方式是利用了 Redis 的 SET key value [NX|XX] [EX seconds | PX milliseconds]
    第二种方式利用了 Redis 的 SETNX key value 和 GETSET key value

    1. /**
    2. * @Author Antony
    3. * @Since 2018/5/25 22:48
    4. */
    5. public class RedisLock {
    6. private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
    7. private static final String LOCK_SUCCESS = "OK";
    8. private static final String SET_IF_NOT_EXIST = "NX";
    9. private static final String SET_WITH_EXPIRE_TIME_SECOND = "EX";
    10. private static final int ACQUIRE_LOCK_TIME_OUT_IN_MS = 5*1000;//获取锁超时时间
    11. private static final int EXPIRE_IN_SECOND = 5; //锁超时时间
    12. private static final int WAIT_INTERVAL_IN_MS = 100; //自旋重试间隔
    13. private static JedisPool jedisPool = JedisPoolFactory.getJedisPool();
    14. /**
    15. * 使用 set key value expireTime 获取锁
    16. * @param lockKey
    17. * @return
    18. */
    19. public static boolean tryLockWithSet(String lockKey){
    20. boolean flag = false;
    21. long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MS; //此次获取锁的超时时间点
    22. try (Jedis jedis = jedisPool.getResource()){
    23. String result;
    24. while (true) {
    25. long now = System.currentTimeMillis();
    26. if(timeoutAt < now){
    27. break;
    28. }
    29. result = jedis.set(lockKey, "", SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME_SECOND, EXPIRE_IN_SECOND);
    30. if(LOCK_SUCCESS.equals(result)){
    31. flag = true;
    32. return flag;
    33. }
    34. TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);
    35. }
    36. } catch (InterruptedException e) {
    37. logger.error("accquire redis lock error...", e);
    38. e.printStackTrace();
    39. }
    40. if(!flag){
    41. logger.error("cannot accquire redis lock...");
    42. }
    43. return flag;
    44. }
    45. /**
    46. * 使用 setnx 和 getset 方式获取锁
    47. * @param lockKey
    48. * @return
    49. */
    50. public static boolean tryLockWithSetnx(String lockKey){
    51. boolean flag = false;
    52. try (Jedis jedis = jedisPool.getResource()) {
    53. long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MS; //此次获取锁的超时时间点
    54. while (true){
    55. long now = System.currentTimeMillis();
    56. if(timeoutAt < now){
    57. break;
    58. }
    59. String expireAt = String.valueOf(now + EXPIRE_IN_SECOND*1000); //过期时间戳作为value
    60. long ret = jedis.setnx(lockKey, expireAt);
    61. if(ret == 1){//已取得锁
    62. flag = true;
    63. return flag;
    64. }else {
    65. // 未获取锁,尝试重新获取
    66. // 此处使用double check 的思想,防止多线程同时竞争到锁
    67. // 1) 先获取上一个锁的过期时间,校验当前是否过期。
    68. // 2) 如果过期了,尝试使用getset方式获取锁。此处可能存在多个线程同时执行到的情况。
    69. // 3) getset更新过期时间,并且获取上一个锁的过期时间。
    70. // 4) 如果getset获取到的oldExpireAt 已过期,说明获取锁成功。
    71. // 如果和当前比未过期,说明已经有另一个线程提前获取到了锁
    72. // 这样也没问题,只是短暂的将上一个锁稍微延后一点时间(只有在A和B线程同时执行到getset时,才会出现,延长的时间很短)
    73. String oldExpireAt = jedis.get(lockKey);
    74. if(oldExpireAt != null && Long.valueOf(oldExpireAt) < now){
    75. oldExpireAt = jedis.getSet(lockKey, expireAt);
    76. if(Long.parseLong(oldExpireAt) < now){
    77. flag = true;
    78. return flag;
    79. }
    80. }
    81. }
    82. TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);
    83. }
    84. } catch (InterruptedException e) {
    85. logger.error("accquire redis lock error...", e);
    86. e.printStackTrace();
    87. }
    88. if(!flag){
    89. logger.error("cannot accquire redis lock...");
    90. }
    91. return flag;
    92. }
    93. /**
    94. * 释放锁
    95. * @param lockKey
    96. */
    97. public static void unLock(String lockKey){
    98. try (Jedis jedis = jedisPool.getResource()) {
    99. jedis.del(lockKey);
    100. }
    101. }
    102. }

    思路详解
    1)第一种方式,tryLockWithSet 是使用了 Redis set 的同时指定过期时间的功能。
    这个方式的特点就是,简单有效,并且只有一个指令操作。一般也推荐这么使用。

    注意,有一种常见的错误方式是使用 setnx 和 expire 组合实现加锁,这是两个操作,并没有保证原子性。如果客户端在 setnx 之后崩溃,那么将导致锁无法释放。
    错误代码如下:

    1. Long result = jedis.setnx(lockKey, requestId);
    2. if (result == 1) {
    3. // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
    4. jedis.expire(lockKey, expireTime);
    5. }

    2)第二种方式,tryLockWithSetnx,是把锁的过期时间,当做 value 存储起来。
    这个方式,解决了刚才提出的 setnx 和 expire 操作无法保证原子性的问题,虽然使用了 setnx 操作,但是没有给 redis 的 key 设置过期时间。而是把该锁的过期时间作为 value 保存,在获取锁的时候判断是否过期期并抢占锁。这就需要保证 各个客户端的系统时间都严格一致,不然锁的持有时间就无法真正保证。

    在这里简单解释一下部分核心逻辑,主要是获取锁失败的重试阶段:

    如果锁获取失败,代表当前已有其他客户端持有锁,那么就根据 key 获取 value,得到该锁的过期时间和当前时间比较,可以知道是锁否过期。如果没过期,则进入下一个自旋。

    如果过期,则使用 getset 操作,尝试抢占锁。该操作将当前锁的过期时间放入,成功后将旧值返回,并进行再一次 check,确认是否拿到锁。
    注意:这里可能会出现竞争,两个线程 get 到旧值后都判断过期,然后都执行了 getset 操作。
    关键在于:getset 拿到的 value 后,进行的再一次 check,和当前时间判断,替换掉的旧值是否是已过期的值。如果小于当前时间,则表示替换掉的是已过期的锁。获取锁成功。如果判断没有小于,则表示替换掉的是另一个线程设置进去的值,进入下一个自旋。
    尽管执行成功了 getset 操作,这也只是将上一个成功拿到的锁过期时间稍微延迟,这个延迟时间很小,可以忽略不计。

    举个栗子:
    线程 A 和 B 尝试 setnx 失败,然后同时拿到了 value,并且都发现过期,然后都尝试进行 getset 操作。A 线程先执行了 getset 操作,获取锁成功。B 线程后执行了 getset 操作,那么 B 执行的就是把 A 的过期时间拿到,然后把自己的过期时间设置过去。这样的操作相当于把 A 的锁过期时间重置。
    由于 A 和 B 同时到达了竞态条件,那么这两个尝试设置的过期时间也不会相差太大,差别可以忽略不计。

    可重入锁
    上面的实现方式,都是不可重入的分布式锁,任何重入锁的尝试都会导致死锁的发生。导致响应超时。

    那么,要实现分布式锁的可重入,那就需要设计的可以存储更多信息。
    目前我知道的有两种方式(只提供思路):

    1)此种方式实现较为简单:value 中多存储一个 全局唯一的 requestId,代表客户端请求标识。具体可以使用 UUID。在重入的情况下使用同一个 UUID,就能判断是否是一个请求的锁重入,从而获取锁。

    2)存储锁的重入次数,以及分布式环境下唯一的线程标识。
    如何在分布式线程中标识唯一线程:
    MAC 地址 + jvm 进程 ID + 线程 ID(或者线程地址都行), 三者结合即可唯一分布式环境中的线程。
    锁的信息采用 json,存储格式如下:

    1. {
    2. "count":1,
    3. "expireAt":147506817232,
    4. "jvmPid":22224,
    5. "mac":"28-D2-44-0E-0D-9A",
    6. "threadId":14
    7. }

    参考资料:
    Redis,Zk 分布式锁的实现与区别
    Redis 实现分布式锁,以及如何处理超时情况
    Redis 分布式锁思考
    Redis 分布式锁处理并发问题
    Redis 分布式锁的正确实现方式 —— 阿里云栖社区
    基于 Redis 的分布式锁到底安全吗 —— 多节点 Redis 锁的讨论
    Java 实现基于 redis 的分布式可重入锁

    作者:_Zy
    链接:https://www.jianshu.com/p/1c5c1a592088
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。