简介
分布式锁本质上要实现的目标就是在 Redis 里面占一个“茅坑”,当别的进程也要来占时,发现已经有人蹲在那里了,就只好放弃或者稍后再试。
占坑一般是使用 setnx(set if not exists) 指令,只允许被一个客户端占坑。先来先占, 用完了,再调用 del 指令释放茅坑。
但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样就会陷入死锁,锁永远得不到释放。于是我们在拿到锁之后,再给锁加上一个过期时间,比如 5s,这样即使中间出现异常也
可以保证 5 秒之后锁会自动释放。
超时问题
Redis 的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行的太长,以至于超出了锁的超时限制,就会出现问题。因为这时候锁过期了,第二个线程重新持有了这把锁,但是紧接着第一个线程执行完了业务逻辑,就把锁给释放了,第三个线程就会在第二个线程逻辑执行完之间拿到了锁。
有一个更加安全的方案是为 set 指令的 value 参数设置为一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key。但是匹配 value 和删除 key 不是一个原子操作,Redis 也没有提供类似于 delifequals 这样的指令,这就需要使用 Lua 脚本来处理了,因为 Lua 脚本可以保证连续多个指令的原子性执行。
# delifequals
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
可重入性
可重入性是指线程在持有锁的情况下再次请求加锁,如果一个锁支持同一个线程的多次加锁,那么这个锁就是可重入的。比如 Java 语言里有个 ReentrantLock 就是可重入锁。Redis 分布式锁如果要支持可重入,需要对客户端的 set 方法进行包装,使用线程的 Threadlocal 变量存储当前持有锁的计数。
public class RedisWithReentrantLock {
private ThreadLocal<Map> lockers = new ThreadLocal<>();
private Jedis jedis;
public RedisWithReentrantLock(Jedis jedis) {
this.jedis = jedis;
}
private boolean _lock(String key) {
return jedis.set(key, "", "nx", "ex", 5L) != null;
}
private void _unlock(String key) {
jedis.del(key);
}
private Map <String, Integer> currentLockers() {
Map <String, Integer> refs = lockers.get();
if (refs != null) {
return refs;
}
lockers.set(new HashMap<>());
return lockers.get();
}
public boolean lock(String key) {
Map refs = currentLockers();
Integer refCnt = refs.get(key);
if (refCnt != null) {
refs.put(key, refCnt + 1);
return true;
}
boolean ok = this._lock(key);
if (!ok) {
return false;
}
refs.put(key, 1);
return true;
}
public boolean unlock(String key) {
Map refs = currentLockers();
Integer refCnt = refs.get(key);
if (refCnt == null) {
return false;
}
refCnt -= 1;
if (refCnt > 0) {
refs.put(key, refCnt);
} else {
refs.remove(key);
this ._unlock(key);
}
return true;
}
}
锁冲突处理
客户端在处理请求时加锁没加成功怎么办。
一般有 3 种策略来处理加锁失败:
1、直接抛出异常,通知用户稍后重试;
2、sleep 一会再重试;
3、将请求转移至延时队列,过一会再试;
直接抛出特定类型的异常
这种方式比较适合由用户直接发起的请求,用户看到错误对话框后,会先阅读对话框的内
容,再点击重试,这样就可以起到人工延时的效果。如果考虑到用户体验,可以由前端的代码
替代用户自己来进行延时重试控制。它本质上是对当前请求的放弃,由用户决定是否重新发起
新的请求。
sleep
sleep 会阻塞当前的消息处理线程,会导致队列的后续消息处理出现延迟。如果碰撞的比
较频繁或者队列里消息比较多,sleep 可能并不合适。如果因为个别死锁的 key 导致加锁不成
功,线程会彻底堵死,导致后续消息永远得不到及时处理。
延时队列
这种方式比较适合异步消息处理,将当前冲突的请求扔到另一个队列延后处理以避开冲突。
延时队列