1.版本一
1.1 代码
@RestController
public class OrderController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock")
public ResponseEntity<String> deductStock() {
String lockKey = "lockKey";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deduct_stock");
if (!result) {
return ResponseEntity.ok("扣减库存锁获取失败");
}
//业务处理
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
stringRedisTemplate.delete(lockKey);
return ResponseEntity.ok("扣减库存成功");
}
}
1.2 存在的问题
如果在业务处理过程中抛异常,则 锁永远释放不了,则新的线程永远都得不到锁,业务都会被影响。
2.版本二
2.1 代码
@RestController
public class OrderController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock")
public ResponseEntity<String> deductStock() {
String lockKey = "lockKey";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deduct_stock");
if (!result) {
return ResponseEntity.ok("扣减库存锁获取失败");
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} catch (Throwable throwable) {
//打印日志等处理
} finally {
stringRedisTemplate.delete(lockKey);
}
return ResponseEntity.ok("扣减库存成功");
}
}
2.2 存在的问题
如果在获取锁后,在执行业务代码的过程中,发布重启业务系统或者挂了,锁就依旧会无法释放,死锁还是会发生。
3.版本三
3.1 增加锁超时时间
public class OrderController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock")
public ResponseEntity<String> deductStock() {
String lockKey = "lockKey";
//原子命令
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deduct_stock", 10, TimeUnit.SECONDS);
if (!result) {
return ResponseEntity.ok("扣减库存锁获取失败");
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} catch (Throwable throwable) {
//打印日志等处理
} finally {
stringRedisTemplate.delete(lockKey);
}
return ResponseEntity.ok("扣减库存成功");
}
}
3.1.1 存在的问题
业务处理超过锁时间,举例说明,线程1加锁后,线程1执行的比较慢,超过锁时间,超时后锁自动解锁,然后线程2进来加锁,线程1继续执行,然后线程1执行解锁过程,将线程2加的锁 给释放掉了,就乱套了。
3.2 对锁和请求进行绑定
@RestController
public class OrderController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct_stock")
public ResponseEntity<String> deductStock() {
String lockKey = "lockKey";
String requestClientId = UUID.randomUUID().toString();
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, requestClientId, 10, TimeUnit.SECONDS);
if (!result) {
return ResponseEntity.ok("扣减库存锁获取失败");
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} catch (Throwable throwable) {
//打印日志等处理
} finally {
if(requestClientId.equals(stringRedisTemplate.opsForValue().get("lockKey"))){
stringRedisTemplate.delete(lockKey);
}
}
return ResponseEntity.ok("扣减库存成功");
}
}
3.2.1 存在的问题
这样虽然可以解决线程之间互解锁的问题,但是因为锁时间无法控制,仍然会出现锁失效,上一个线程还未处理完,新线程进来继续处理的并发问题。
4.版本四
4.1 代码
@RestController
public class OrderController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private Redisson redisson;
@RequestMapping("/deduct_stock")
public ResponseEntity<String> deductStock() {
String lockKey = "lockKey";
RLock lock = redisson.getLock(lockKey);
try {
lock.lock();
//业务处理
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} catch (Throwable throwable) {
//打印日志等处理
} finally {
lock.unlock();
}
return ResponseEntity.ok("扣减库存成功");
}
}
4.2 优点
4.3 缺点
4.3.1 单点问题
这个架构中存在一个严重的单点失败问题。如果Redis挂了怎么办 ?你可能会说,可以通过增加一个slave节点解决这个问题,但是不行。因为Redis的主从同步是异步的。
在这种场景(主从)中存在明显的竞态:
1.客户端A从master获取到锁;
2.在master将锁同步到slave之前,master宕掉;
3.slave节点被晋级为master节点。
4.客户端B取得了同一个资源被客户端A已经获取到的另外一个锁,安全失效。
5.Redisson分布式锁原理
加锁机制、锁互斥机制、Watch dog 机制、可重入加锁机制、锁释放机制、
等五个方面对 Redisson 实现分布式锁的底层原理进行分析;
5.1 加锁流程
leaseTime | 加锁到期时间, -1 使用默认值 30 秒 |
---|---|
unit | 时间单位, 毫秒、秒、分钟、小时… |
interruptibly | 是否可被中断标示 |
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
// 获取当前线程ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁,如果获取到锁,ttl 为null ,直接返回,就能执行业务了。
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
5.1.1 加锁Lua 代码分析
参数分析
keys[1] | 表示的是加的锁key( mylock),比如说:RLock lock = redisson.getLock(“myLock”);这里你自己设置了加锁的那个锁key就是“myLock”。 |
---|---|
argv[1] | 代表的就是锁key的默认生存时间 (毫秒:ms),默认30秒 |
argv[2] | 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID |
// 判断 key[1] 加锁的key 是否存在 ,如果不存在走下面代码
if (redis.call('exists', keys[1] == 0 ) then
//
redis.call('hset', keys[1], argv[2], 1);
redis.call('pexpire', keys[1],argv[1]);
return nil;
end;
if (redis.call('hexists', keys[1], argv[2] == 1) then
redis.call('hincrby', keys[1], argv[2], 1);
redis.call('pexpire', keys[1], argv[1]);
return nil;
end;
return redis.call('pttl', keys[1]);
5.2 锁互斥机制
5.3 Watch Dog机制
private void scheduleExpirationRenewal(final long threadId) {
// expirationRenewalMap是一个ConcurrentMap,存储标志为"当前线程ID:key名称"的任务
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 检测锁是否存在的lua脚本,存在的话就用pexpire命令刷新过期时长
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
// 递归调用自己;
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
5.4 可重入锁机制
5.5 解锁
1、判断锁是否存在,不存在的话用publish命令发布释放锁的消息,订阅者收到后就能做下一步的拿锁处理;
2、锁存在但不是当前线程持有,返回空置nil;
3、当前线程持有锁,用hincrby命令将锁的可重入次数-1,然后判断重入次数是否大于0,是的话就重新刷新锁的过期时长,返回0,否则就删除锁,并发布释放锁的消息,返回1;
6.RedLock算法
在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,我们假设有5个Redis master节点,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
- 获取当前Unix时间,以毫秒为单位。
- 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
- 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
- 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
- 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
引用
https://blog.csdn.net/asd051377305/article/details/108384490
https://blog.csdn.net/qq_42046105/article/details/111350721
https://zhuanlan.zhihu.com/p/135864820
https://github.com/redisson/redisson/wiki
http://redis.cn/topics/distlock.html