基本原理和不同实现方式对比
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
基于Redis的分布式锁
实现分布式锁时需要实现的两个基本方法:
- 获取锁:
- 互斥:确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回true,失败返回false
- 释放锁:
- 手动释放
- 超时释放:获取锁时添加一个超时时间
实现Redis分布式锁版本1
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功; false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = String.valueOf(Thread.currentThread().getId());
// 获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
@Resource
private StringRedisTemplate stringRedisTemplate;
@Transactional
public Result createVoucherOrder(Long voucherId) {
//一人一单
Long userId = UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//尝试获取锁
boolean isLock = redisLock.tryLock(1200);
if(!isLock){
//获取锁失败
return Result.fail("不允许重复下单!");
}
try {
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count > 0){
//用户已经购买过
return Result.fail("用户已经购买过!");
}
//5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
if(!success) {
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2用户id
voucherOrder.setUserId(userId);
//6.3代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
//7.返回订单id
return Result.ok(orderId);
} finally {
redisLock.unlock();
}
}
Redis分布式锁误删问题
由于线程1的锁已经超时释放掉了,所以线程1执行释放锁的操作释放的是线程2的锁,这时线程3判断没有锁,所以线程3也可以执行
在锁释放前通过锁标识判断是否一致
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁中标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(id)){
//释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
Redis分布式锁原子性问题
当线程1执行完判断准备释放锁时挂起了,线程2获取锁,线程1又运行了,释放了线程2的锁(所有的key都是一样的),导致线程3又可以执行了。解决方法:保证这两个动作的原子性。
Redis的Lua脚本
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
这里重点介绍Redis提供的调用函数,语法如下:
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
写好脚本以后,需要用Redis命令来调用脚本,例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
释放锁的业务流程是这样的:
1.获取锁中的线程标示
2.判断是否与指定的标示(当前线程标示)一致
3.如果一致则释放锁(删除)
4.如果不一致则什么都不做
如果用Lua脚本来表示则是这样的:
Java调用Lua脚本改造分布式锁
RedisTemplate调用Lua脚本的API如下:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock(){
//调用lua脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
Redisson功能介绍
基于setnx实现的分布式锁存在下面的问题:
Redission
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redissson快速入门
引入依赖
<!--redisson--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>
配置Redission客户端
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient(){ // 配置 Config config = new Config(); // 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址 config.useSingleServer().setAddress("redis://localhost:6379"); // 创建RedissonClient对象 return Redisson.create(config); } }
使用Redisson分布式锁
@Resource private RedissonClient redissonClient; @Transactional public Result createVoucherOrder(Long voucherId) { //一人一单 Long userId = UserHolder.getUser().getId(); //创建锁对象 RLock redisLock = redissonClient.getLock("lock:order:" + userId); //尝试获取锁 boolean isLock = redisLock.tryLock(); if(!isLock){ //获取锁失败 return Result.fail("不允许重复下单!"); } try { ... ... //7.返回订单id return Result.ok(orderId); } finally { redisLock.unlock(); } }
Redisson可重入锁原理
// 创建锁对象
RLock lock = redissonClient.getLock("lock");
@Test void method1() {
boolean isLock = lock.tryLock();
if(!isLock){
log.error("获取锁失败,1");
return;
}
try {
log.info("获取锁成功,1");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}
void method2(){
boolean isLock = lock.tryLock();
if(!isLock){
log.error("获取锁失败, 2");
return;
}
try {
log.info("获取锁成功,2");
} finally {
log.info("释放锁,2");
lock.unlock();
}
}
Redission分布式锁原理
- 可重入:利用hash结构记录线程ID和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间
Redisson分布式锁主从一致性问题
原理:配置多个Redis节点,必须在所有的节点都获取到锁后,才算获取锁成功。
1)不可重入Redis分布式锁:
- 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
- 缺陷:不可重入、无法重试、锁超时失效
2)可重入的Redis分布式锁:
- 原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷:redis宕机引起锁失效问题
3)Redisson的multiLock:
- 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
- 缺陷:运维成本高、实现复杂