备注:内容来自《redis深度历险》可放心食用。

1.引言

我们在系统中修改已有数据时,需要先读取,然后进行修改保存,此时很容易遇到并发问题。由于修改和保存都不是原子操作(所谓原子操作是指不会被线程调度机制打断的操作这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch 线程切换),在并发场景下,部分对数据的操作可能会丢失。在单服务器系统可以使用本地锁来避免并发带来的问题,然而,当服务采用集群方式部署时,本地锁无法在多个服务器之间生效,这时候保证数据的一致性就需要使用分布式锁。
image.png
很多博客都在介绍分布式锁的原理,但是具体到细节的使用上往往并不完全正确。

2.实现

redis锁主要利用redis的setnx命令。
加锁命令:SETNX key value,当键不存在时,对键进行设置操作并返回成功,否则返回失败。KEY是锁的唯一标识,一般按照业务来决定命名
解锁命令:DEL key,通过删除键值对释放锁,以便其他线程可以通过SETNX命令来获取锁。
锁超时:EXPIRE key timeout,设置key的超时时间,以保证即使没有锁被显示释放,锁也可以在一定时间后自动释放,避免资源被永远锁住。
image.png
但是以上逻辑还有问题,如果在setnx和expire之间服务器进程突然挂掉了,可能是因为机器宕机或者进程被kill,就会导致expire得不到执行,也会造成死锁。
这种问题的根源在于setnx和expire是两条指令,而不是原子指令。Redis 2.8 版本中作者加入了 set 指令的扩展参数,使得 setnx 和 expire 指令可以一起执行,彻底解决了分布式锁的乱象。

锁误解除

Redis的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行的太长,以至于超出了锁的超时限制,就会出现问题,因为这时候锁过期了,第二个线程重新持有了这把锁,但是紧接着第一个线程执行完了业务逻辑就把锁给释放了,第三个线程就会在第二个线程逻辑执行完之间拿到了锁。(如果线程 A 成功获取到了锁,并且设置了过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁;随后 A 执行完成,线程 A 使用 DEL 命令来释放锁,但此时线程 B 加的锁还没有执行完成,线程 A 实际释放的线程 B 加的锁。)
image.png
为了避免这个问题,redis分布式锁不适合用于较长时间的任务,如果真的需要,数据出现的小波错乱可能需要人工介入。

  1. tag = random.nextint() #随机数
  2. if redis.set(key,tag,nx=True,ex=5):
  3. do_something()
  4. redis.del_if_equals(key,tag)

有一个更加安全的方案,通过在value中设置当前线程加锁的标识,在删除之前验证key对应的value判断锁是否是当前线程持有。可生成一个UUID标识当前线程,使用lua脚本做验证标识和解锁操作。

  1. // 加锁
  2. String uuid = UUID.randomUUID().toString.replaceAll("-","")
  3. SET key uuid NX EX 30
  4. //解锁
  5. if(redis.call('get',KEYS[1]==ARGV[1])) then
  6. return redis.call('del',KEYS[1])
  7. else return 0
  8. end

这样增加了双保险,可以保证锁不会被其他线程释放。

超时解锁导致并发

这种问题,跟上个问题很类似,可以说是基本一类问题,解决方案可以互相参考。如果线程 A 成功获取锁并设置过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁,线程 A 和线程 B 并发执行。

A、B 两个线程发生并发显然是不被允许的,一般有两种方式解决该问题:

  • 将过期时间设置足够长,确保代码逻辑在锁释放之前能够执行完成。
  • 为获取锁的线程增加守护线程,为将要过期但未释放的锁增加有效时间。(rediseesion)

image.png

不可重入

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis 可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0 时释放锁。
在本地记录记录重入次数,如 Java 中使用 ThreadLocal 进行重入次数统计,简单示例代码:

  1. private static ThreadLocal<Map<String, Integer>> LOCKERS = ThreadLocal.withInitial(HashMap::new);
  2. // 加锁
  3. public boolean lock(String key) {
  4. Map<String, Integer> lockers = LOCKERS.get();
  5. if (lockers.containsKey(key)) {
  6. lockers.put(key, lockers.get(key) + 1);
  7. return true;
  8. } else {
  9. if (SET key uuid NX EX 30) {
  10. lockers.put(key, 1);
  11. return true;
  12. }
  13. }
  14. return false;
  15. }
  16. // 解锁
  17. public void unlock(String key) {
  18. Map<String, Integer> lockers = LOCKERS.get();
  19. if (lockers.getOrDefault(key, 0) <= 1) {
  20. lockers.remove(key);
  21. DEL key
  22. } else {
  23. lockers.put(key, lockers.get(key) - 1);
  24. }
  25. }

本地记录重入次数虽然高效,但如果考虑到过期时间和本地、Redis 一致性的问题,就会增加代码的复杂性。另一种方式是 Redis Map 数据结构来实现分布式锁,既存锁的标识也对重入次数进行计数。Redission 加锁示例:

// 如果 lock_key 不存在
if (redis.call('exists', KEYS[1]) == 0)
then
    // 设置 lock_key 线程标识 1 进行加锁
    redis.call('hset', KEYS[1], ARGV[2], 1);
    // 设置过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
    end;
// 如果 lock_key 存在且线程标识是当前欲加锁的线程标识
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
    // 自增
    then redis.call('hincrby', KEYS[1], ARGV[2], 1);
    // 重置过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
    end;
// 如果加锁失败,返回锁剩余时间
return redis.call('pttl', KEYS[1]);

无法等待锁释放

上述命令执行都是立即返回的,如果客户端可以等待锁释放就无法使用。

  • 可以通过客户端轮询的方式解决该问题,当未获取到锁时,等待一段时间重新获取锁,直到成功获取锁或等待超时。这种方式比较消耗服务器资源,当并发量比较大时,会影响服务器的效率。
  • 另一种方式是使用 Redis 的发布订阅功能,当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送锁释放消息。如下:

image.png