Redis 为单进程单线程模式,采用队列模式将并发访问的请求变成串行访问,并且多客户端对 Redis 的访问不存在竞争关系。
以下将会讲解如何使用 Redis 实现一个可靠的,自旋分布式锁。以及实现的思路,还有实现时会遇到的常见错误。
当然,这些实现的都是不可重入的。在最后,还会讲一下,实现可重入锁的思路。
实现原理
Redis 操作
Redis 提供了一些基本指令可以用来实现分布式锁,例如
SET,SENTX,GETSET,INCR,DEL,GET 等操作,以下是对这些指令的基本用法:
SET key val [NX|XX] [EX seconds | PX milliseconds]
// 将字符串值key 关联到 value。成功后,返回值为”OK”。后面有两个可选参数
// 可选参数 NX|XX:NX表示只在键不存在时,才对键进行操作,缺省方式是NX。XX表示只在键存在时对键进行操作
// 可选参数 EX|PX:键过期的时间单位,后面跟长整型数字表示过期时间。EX表示秒,PX表示毫秒。缺省不设置过期时间。SETNX key val
// 当且仅当key值不存在,将key对应的值设置为value,并且返回1,否则不做任何操作,返回0GETSET key val
// 获取key的旧值,并且将新的value放入INCR key
// 将key中存储的数字自增1并且返回结果。DEL key
// 将对应Key的值删除
锁的可靠性
为了确保分布式锁可用,我们至少要确保锁的可靠性,要满足一下四个条件:
1)互斥性,在任意时刻,只能有一个客户端(或者说业务请求)获得锁,并且也只能由该客户端请求解锁成功。
2)避免死锁,即使获取了锁的客户端崩溃没有释放锁,也要保证锁正常过期,后续的客户端能正常加锁。
3)容错性,只要大部分 Redis 节点可用,客户端就能正常加锁。
4)自旋重试,获取不到锁时,不要直接返回失败,而是支持一定的周期自旋重试,设置一个总的超时时间,当过了超时时间以后还没有获取到锁则返回失败。(这一点很重要,我发现网上很多方案并没有把这个功能加上,只尝试一次加锁请求失败就返回了,加了自旋重试更好一些)
参数设置
这里有三个参数需要考虑,一般来说,设定的值,需要根据实际场景来判断:
锁的过期时间 (EXPIRE_TIME)
太短可能过早的释放锁,造成数据安全问题。太长的话,如果客户端挂掉,会长时间无法释放锁,导致其他客户端锁请求阻塞或者失败(这种场景太少见)
我们一般会预估一下加锁需要进行的操作最长耗时,然后在最长耗时基础上再加一个 buffer 的时间来确定。(buffer 比例多少不确定,这个自行判断吧)需要保证锁在任务执行完之前不会过期。
自旋间隔时间 (WAIT_INTERVAL)
适当间隔就好,一般是 50~100ms
获取锁的超时时间 (ACCQUIRE_TIME_OUT)
在激烈的竞争环境下,超时时间设置太短会导致失败次数显著增加。建议至少设置成和锁的过期时间一样。
如何实现
代码示例
首先是代码示例,以下是使用了两种方式实现的 Redis 锁:
第一种方式是利用了 Redis 的 SET key value [NX|XX] [EX seconds | PX milliseconds]
第二种方式利用了 Redis 的 SETNX key value 和 GETSET key value
/**
* @Author Antony
* @Since 2018/5/25 22:48
*/
public class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME_SECOND = "EX";
private static final int ACQUIRE_LOCK_TIME_OUT_IN_MS = 5*1000;//获取锁超时时间
private static final int EXPIRE_IN_SECOND = 5; //锁超时时间
private static final int WAIT_INTERVAL_IN_MS = 100; //自旋重试间隔
private static JedisPool jedisPool = JedisPoolFactory.getJedisPool();
/**
* 使用 set key value expireTime 获取锁
* @param lockKey
* @return
*/
public static boolean tryLockWithSet(String lockKey){
boolean flag = false;
long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MS; //此次获取锁的超时时间点
try (Jedis jedis = jedisPool.getResource()){
String result;
while (true) {
long now = System.currentTimeMillis();
if(timeoutAt < now){
break;
}
result = jedis.set(lockKey, "", SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME_SECOND, EXPIRE_IN_SECOND);
if(LOCK_SUCCESS.equals(result)){
flag = true;
return flag;
}
TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);
}
} catch (InterruptedException e) {
logger.error("accquire redis lock error...", e);
e.printStackTrace();
}
if(!flag){
logger.error("cannot accquire redis lock...");
}
return flag;
}
/**
* 使用 setnx 和 getset 方式获取锁
* @param lockKey
* @return
*/
public static boolean tryLockWithSetnx(String lockKey){
boolean flag = false;
try (Jedis jedis = jedisPool.getResource()) {
long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MS; //此次获取锁的超时时间点
while (true){
long now = System.currentTimeMillis();
if(timeoutAt < now){
break;
}
String expireAt = String.valueOf(now + EXPIRE_IN_SECOND*1000); //过期时间戳作为value
long ret = jedis.setnx(lockKey, expireAt);
if(ret == 1){//已取得锁
flag = true;
return flag;
}else {
// 未获取锁,尝试重新获取
// 此处使用double check 的思想,防止多线程同时竞争到锁
// 1) 先获取上一个锁的过期时间,校验当前是否过期。
// 2) 如果过期了,尝试使用getset方式获取锁。此处可能存在多个线程同时执行到的情况。
// 3) getset更新过期时间,并且获取上一个锁的过期时间。
// 4) 如果getset获取到的oldExpireAt 已过期,说明获取锁成功。
// 如果和当前比未过期,说明已经有另一个线程提前获取到了锁
// 这样也没问题,只是短暂的将上一个锁稍微延后一点时间(只有在A和B线程同时执行到getset时,才会出现,延长的时间很短)
String oldExpireAt = jedis.get(lockKey);
if(oldExpireAt != null && Long.valueOf(oldExpireAt) < now){
oldExpireAt = jedis.getSet(lockKey, expireAt);
if(Long.parseLong(oldExpireAt) < now){
flag = true;
return flag;
}
}
}
TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);
}
} catch (InterruptedException e) {
logger.error("accquire redis lock error...", e);
e.printStackTrace();
}
if(!flag){
logger.error("cannot accquire redis lock...");
}
return flag;
}
/**
* 释放锁
* @param lockKey
*/
public static void unLock(String lockKey){
try (Jedis jedis = jedisPool.getResource()) {
jedis.del(lockKey);
}
}
}
思路详解
1)第一种方式,tryLockWithSet 是使用了 Redis set 的同时指定过期时间的功能。
这个方式的特点就是,简单有效,并且只有一个指令操作。一般也推荐这么使用。
注意,有一种常见的错误方式是使用 setnx 和 expire 组合实现加锁,这是两个操作,并没有保证原子性。如果客户端在 setnx 之后崩溃,那么将导致锁无法释放。
错误代码如下:
Long result = jedis.setnx(lockKey, requestId);
if (result == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
jedis.expire(lockKey, expireTime);
}
2)第二种方式,tryLockWithSetnx,是把锁的过期时间,当做 value 存储起来。
这个方式,解决了刚才提出的 setnx 和 expire 操作无法保证原子性的问题,虽然使用了 setnx 操作,但是没有给 redis 的 key 设置过期时间。而是把该锁的过期时间作为 value 保存,在获取锁的时候判断是否过期期并抢占锁。这就需要保证 各个客户端的系统时间都严格一致,不然锁的持有时间就无法真正保证。
在这里简单解释一下部分核心逻辑,主要是获取锁失败的重试阶段:
如果锁获取失败,代表当前已有其他客户端持有锁,那么就根据 key 获取 value,得到该锁的过期时间和当前时间比较,可以知道是锁否过期。如果没过期,则进入下一个自旋。
如果过期,则使用 getset 操作,尝试抢占锁。该操作将当前锁的过期时间放入,成功后将旧值返回,并进行再一次 check,确认是否拿到锁。
注意:这里可能会出现竞争,两个线程 get 到旧值后都判断过期,然后都执行了 getset 操作。
关键在于:getset 拿到的 value 后,进行的再一次 check,和当前时间判断,替换掉的旧值是否是已过期的值。如果小于当前时间,则表示替换掉的是已过期的锁。获取锁成功。如果判断没有小于,则表示替换掉的是另一个线程设置进去的值,进入下一个自旋。
尽管执行成功了 getset 操作,这也只是将上一个成功拿到的锁过期时间稍微延迟,这个延迟时间很小,可以忽略不计。
举个栗子:
线程 A 和 B 尝试 setnx 失败,然后同时拿到了 value,并且都发现过期,然后都尝试进行 getset 操作。A 线程先执行了 getset 操作,获取锁成功。B 线程后执行了 getset 操作,那么 B 执行的就是把 A 的过期时间拿到,然后把自己的过期时间设置过去。这样的操作相当于把 A 的锁过期时间重置。
由于 A 和 B 同时到达了竞态条件,那么这两个尝试设置的过期时间也不会相差太大,差别可以忽略不计。
可重入锁
上面的实现方式,都是不可重入的分布式锁,任何重入锁的尝试都会导致死锁的发生。导致响应超时。
那么,要实现分布式锁的可重入,那就需要设计的可以存储更多信息。
目前我知道的有两种方式(只提供思路):
1)此种方式实现较为简单:value 中多存储一个 全局唯一的 requestId,代表客户端请求标识。具体可以使用 UUID。在重入的情况下使用同一个 UUID,就能判断是否是一个请求的锁重入,从而获取锁。
2)存储锁的重入次数,以及分布式环境下唯一的线程标识。
如何在分布式线程中标识唯一线程:
MAC 地址 + jvm 进程 ID + 线程 ID(或者线程地址都行), 三者结合即可唯一分布式环境中的线程。
锁的信息采用 json,存储格式如下:
{
"count":1,
"expireAt":147506817232,
"jvmPid":22224,
"mac":"28-D2-44-0E-0D-9A",
"threadId":14
}
参考资料:
Redis,Zk 分布式锁的实现与区别
Redis 实现分布式锁,以及如何处理超时情况
Redis 分布式锁思考
Redis 分布式锁处理并发问题
Redis 分布式锁的正确实现方式 —— 阿里云栖社区
基于 Redis 的分布式锁到底安全吗 —— 多节点 Redis 锁的讨论
Java 实现基于 redis 的分布式可重入锁
作者:_Zy
链接:https://www.jianshu.com/p/1c5c1a592088
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。