原理与演进
参考: https://www.cnblogs.com/MrLiuZF/p/15110559.html (原文非常好, 强烈推荐细读原文)
分布式锁定义
分布式锁定义: 保证同一时间只有一个客户端对共享资源进行操作
要求:
- 不会发生死锁, 即使客户端在持有锁的期间崩溃而没有主动解锁, 也能保证后续其他客户端能加锁
- 具有容错性, 只要大部分Redis节点正常运行, 客户端就可以加锁解锁
- 解铃还须系铃人, 加锁和解锁必须是同一客户端
锁实现演进
(建议看上面的原文, 此处为个人总结)
- setnx
为保证同一时刻只有一个线程操作共享资源, 由于redis是单线程的, 因此可在操作之前往redis设置一个标志位, 操作结束后释放, 使用的是 setIfAbsent(key, val) -> setnx(key, val)
- setnx+finally+expire
若程序拿到标志位后执行异常, 锁未释放, 则会发生死锁, 因此释放锁的操作需在finally代码块中;
但若程序拿到标志位后服务直接宕机, 锁依然不会被释放, 因此除finally代码块外, 还需要设置标志位的过期时间, 超时则自动释放锁
- set px nx
设置标志位以及过期时间需为一个原子操作, 因此, 需使用set px nx 来保证原子性
SET key value [expiration EX seconds|PX milliseconds] [NX|XX]
可选参数如下:
EX: 设置超时时间,单位是秒
PX: 设置超时时间,单位是毫秒
NX: IF NOT EXIST 的缩写,只有 KEY不存在的前提下 才会设置值
XX: IF EXIST 的缩写,只有在 KEY存在的前提下 才会设置值
- set px nx+客户端id
当程序执行时间超过标志位的过期时间时, 锁过期被自动释放, 线程二拿到锁, 随后线程一程序执行完成在finally代码块中把线程二的锁释放掉了, 为解决这一问题, 加锁解锁需为同一客户端, 简单来说, 锁的key需要附带客户端标识, 在解锁时, 需匹配标识一致
- lua
解锁时, 需匹配标识一致, 先判断后解锁存在竞态条件, 因此需将解锁删除标志位的操作写在lua脚本中保证原子性
KEYS[1]: lockKey
ARGV[1]: lockValue
# 获取 KEYS[1] 对应的 Val
local cliVal = redis.call('get', KEYS[1])
# 判断 KEYS[1] 与 ARGV[1] 是否保持一致
if(cliVal == ARGV[1]) then
# 删除 KEYS[1]
redis.call('del', KEYS[1])
return 'OK'
else
return nil
end
- watch dog
上述分布式锁无法续期, 万一锁被超时释放, 可能会导致不可预料的问题, 因此reddisson在加锁成功后会启动一个watch dog后台线程, 每隔10秒检查一下, 若客户端还持有锁, 那么就会不断延长锁的过期时间
- redlock
参考: https://www.jianshu.com/p/2c7855e648ca
当redis集群为主从结构时, 主节点加锁但还未同步从节点, 此刻主节点宕机, 主备切换, 线程二能在新的主节点再次获得同一把锁, 官方推荐使用redlock来解决这一问题, redlock使用算法要求超过半数的节点加锁成功才算最终加锁成功
获取锁的执行步骤 1、获取当前时间
2、依次N个节点获取锁,并设置响应超时时间,防止单节点获取锁时间过长
3、锁有效时间=锁过期时间-获取锁耗费时间,如果第2步骤中获取成功的节点数大于
N/2+1,且锁有效时间大于0,则获得锁成功
4、若获得锁失败,则向所有节点释放锁
框架实现
对比
参考: https://gitee.com/zhaokuner/redission
spring-integration-redis
参考: https://zhuanlan.zhihu.com/p/76532718
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>5.5.11</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
封装示例
DistributedLock 分布式锁接口
/**
* 分布式锁操作的接口
*
* @author xinzhang
* @version 2022/4/29
*/
public interface DistributedLock {
/**
* 加锁
*/
void lock(String key);
/**
* 尝试加锁
*/
boolean tryLock(String key, long timeout, TimeUnit timeUnit);
/**
* 解锁
*/
void unlock(String key);
}
SpringRedisLock 锁实现
/**
* SpringRedisLock
*
* @author xinzhang
* @version 2022/4/29
*/
@Slf4j
public class SpringRedisLock implements DistributedLock {
private static final String SPRING_REDIS_LOCK_PREFIX = "SRL:";
private RedisLockRegistry redisLockRegistry;
public SpringRedisLock(RedisLockRegistry redisLockRegistry) {
this.redisLockRegistry = redisLockRegistry;
}
/**
* 构建分布式锁的key
*/
public static String buildKey(String original) {
return SPRING_REDIS_LOCK_PREFIX + original;
}
@Override
public void lock(String key) {
this.redisLockRegistry.obtain(buildKey(key)).lock();
if (log.isDebugEnabled()) {
log.debug(String.format("线程%s获取锁成功", Thread.currentThread().getName()));
}
}
@Override
public boolean tryLock(String key, long waitTimeout, TimeUnit timeUnit) {
Lock lock = this.redisLockRegistry.obtain(buildKey(key));
try {
boolean isLockSuccess = lock.tryLock(waitTimeout, timeUnit);
if (log.isDebugEnabled()) {
log.debug(String.format("线程%s获取锁%s", Thread.currentThread().getName(), isLockSuccess ? "成功" : "失败"));
}
return isLockSuccess;
} catch (InterruptedException e) {
return false;
}
}
@Override
public void unlock(String key) {
this.redisLockRegistry.obtain(buildKey(key)).unlock();
if (log.isDebugEnabled()) {
log.debug(String.format("线程%s释放锁", Thread.currentThread().getName()));
}
}
}
SpringRedisLockAutoConfig 自动装配
/**
* SpringRedisLockAutoConfig
* RedisOperations在spring-data-redis, redis的自动装配RedisAutoConfiguration在spring-boot-autoconfigure
* RedisAutoConfiguration的条件装配也是基于RedisOperations
* @author xinzhang
* @version 2022/4/29
*/
@Configuration
public class SpringRedisLockAutoConfig {
private static final String DEFAULT_SPRING_REDIS_LOCK = "DSRL";
/**
* 初始化redis分布式锁配置.
*
* 注意,这里的分布式锁的解锁时间默认为60秒,这可能会导致以下安全性问题(出现概率依次递减):
* 1. 分布式锁用在网络IO的场景,必须设置超时时间,否则可能会因为对方超时导致锁自动释放
* 2. 服务器时钟跳跃,可能会出现不可预料的锁到期情况,可能出现的场景:业务因其他原因执行时间过长 + 服务器时钟跳跃 可能会导致分布式锁自动释放
* 3. JVM GC的STW过长导致
*
* @param connectionFactory redis连接工厂
* @return redis分布式锁配置
*/
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {
return new RedisLockRegistry(connectionFactory, DEFAULT_SPRING_REDIS_LOCK);
}
@Bean
public DistributedLock distributedLock(RedisLockRegistry redisLockRegistry) {
return new SpringRedisLock(redisLockRegistry);
}
}
使用示例
/**
* ConcurrencyController
*
* @author xinzhang
* @version 2022/4/29
*/
@RestController
@RequestMapping("/service_a/concurrency/snack")
public class SnackController {
@Autowired
private DistributedLock distributedLock;
private Random random = new Random();
private Integer snack = 3;
@PostMapping("/unsafe_acquire")
public Boolean unsafeAcquireSnack() {
return acquire();
}
@PostMapping("/safe_acquire")
public Boolean safeAcquireSnack() {
boolean res;
try {
distributedLock.lock("snack");
res = acquire();
} finally {
distributedLock.unlock("snack");
}
return res;
}
/**
* 获取小吃, 线程不安全
*/
private boolean acquire() {
if (snack <= 0) {
return false;
}
// mock logical
try {
Thread.sleep(random.nextInt(200));
snack--;
System.out.println("get snack, o yeah~~!");
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
}
jmeter20个线程请求即可对比出结果
Redission
参考: https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter#spring-boot-starter
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.1</version>
</dependency>
封装示例
总体DistributedLock接口设计同上
RedissonLock
/**
* ReddsionLock
*
* @author xinzhang
* @version 2022/5/5
*/
@Slf4j
public class RedissonLock implements DistributedLock {
private RedissonClient redissonClient;
public RedissonLock(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
@Override
public void lock(String key) {
redissonClient.getLock(key).lock();
if (log.isDebugEnabled()) {
log.debug(String.format("线程%s获取锁成功", Thread.currentThread().getName()));
}
}
@Override
public boolean tryLock(String key, long timeout, TimeUnit timeUnit) {
RLock lock = redissonClient.getLock(key);
try {
boolean isLockSuccess = lock.tryLock(timeout, timeUnit);
if (log.isDebugEnabled()) {
log.debug(String.format("线程%s获取锁%s", Thread.currentThread().getName(), isLockSuccess ? "成功" : "失败"));
}
return isLockSuccess;
} catch (InterruptedException e) {
return false;
}
}
@Override
public void unlock(String key) {
redissonClient.getLock(key).unlock();
if (log.isDebugEnabled()) {
log.debug(String.format("线程%s释放锁", Thread.currentThread().getName()));
}
}
}
RedissonLockAutoConfig
/**
* RedissonLockAutoConfig
*
* @author xinzhang
* @version 2022/5/5
*/
@Configuration
public class RedissonLockAutoConfig {
@Bean
public DistributedLock distributedLock(RedissonClient redisson) {
return new RedissonLock(redisson);
}
}
Redis客户端补充
RedisTemplate
Spring-data-redis是spring大家族的一部分,提供了在srping应用中通过简单的配置访问redis服务,对reids底层开发包(Jedis, JRedis, and RJC)进行了高度封装,RedisTemplate提供了redis各种操作、异常处理及序列化,支持发布订阅,并对spring 3.1 cache进行了实现。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>