原子性问题
Redis 分布式锁就是在 Redis 里面占一个坑,当别的进程来占坑时,就会放弃或重试。
占坑一般使用 setnx(set if not exists)命令,只允许一个客户端占坑,用完了调用 del。
异常会导致 del 指令没有被调用,这样就会死锁,所以要加上 过期时间,这样即使出现异常,过期时间后也会释放锁。
但是 setnx 与 expire 是两条指令,也不能保证原子性,所以要使用 set 的扩展参数保证原子性(redis 2.8后支持)
防止超时后释放其他线程的锁
value 设置一个随机数,释放锁时匹配随机数是否一致,一致再删除 key。
不一致说明锁已经被其他线程抢占,不能删除。
redis 内部集成了 lua 解释器
释放锁逻辑需要用 lua 脚本完成:
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
java:
public class RedisLock implements AutoCloseable {
private StringRedisTemplate redisTemplate;
private String key;
private String value;
private int expireSeconds;
public RedisLock(StringRedisTemplate redisTemplate, String key, int expireSeconds) {
this.redisTemplate = redisTemplate;
this.key = key;
this.value = UUID.randomUUID().toString();
this.expireSeconds = expireSeconds;
}
public Boolean getLock() {
return redisTemplate.opsForValue().setIfAbsent(key, value, expireSeconds, TimeUnit.SECONDS);
}
public Boolean unLock() {
String script = "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\", KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
RedisScript<Boolean> redisScript = RedisScript.of(script, Boolean.class);
List<String> keys = Collections.singletonList(key);
return redisTemplate.execute(redisScript, keys, value);
}
@Override
public void close() throws Exception {
unLock();
}
}
使用
String key = "redisKey";
try (RedisLock redisLock = new RedisLock(redisTemplate, key, 30)) {
if (redisLock.getLock()) {
log.info("进入了锁!");
Thread.sleep(15000);
}
} catch (Exception e) {
e.printStackTrace();
}
log.info("方法执行完成");
超时问题
如果加锁与释放锁之间的执行超过了时间限制,就会出问题,可能会被其他线程重新锁住。
所以 Redis 分布式锁不能用于较长时间的任务,如果真出了错,最终就需要人工介入。
可重入性
Redis 分布式锁如果要支持可重入,需要对客户端的 set 方法进行包装,使用线程的 Threadlocal 变量存储当前持有锁的计数。
可重入锁增加了逻辑复杂度,
在编写业务方法时在逻辑上进行调整,完全可以不使用可重入锁。