1.版本一
1.1 代码
@RestControllerpublic class OrderController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@RequestMapping("/deduct_stock")public ResponseEntity<String> deductStock() {String lockKey = "lockKey";Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deduct_stock");if (!result) {return ResponseEntity.ok("扣减库存锁获取失败");}//业务处理int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}stringRedisTemplate.delete(lockKey);return ResponseEntity.ok("扣减库存成功");}}
1.2 存在的问题
如果在业务处理过程中抛异常,则 锁永远释放不了,则新的线程永远都得不到锁,业务都会被影响。
2.版本二
2.1 代码
@RestControllerpublic class OrderController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@RequestMapping("/deduct_stock")public ResponseEntity<String> deductStock() {String lockKey = "lockKey";Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deduct_stock");if (!result) {return ResponseEntity.ok("扣减库存锁获取失败");}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} catch (Throwable throwable) {//打印日志等处理} finally {stringRedisTemplate.delete(lockKey);}return ResponseEntity.ok("扣减库存成功");}}
2.2 存在的问题
如果在获取锁后,在执行业务代码的过程中,发布重启业务系统或者挂了,锁就依旧会无法释放,死锁还是会发生。
3.版本三
3.1 增加锁超时时间
public class OrderController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@RequestMapping("/deduct_stock")public ResponseEntity<String> deductStock() {String lockKey = "lockKey";//原子命令Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deduct_stock", 10, TimeUnit.SECONDS);if (!result) {return ResponseEntity.ok("扣减库存锁获取失败");}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} catch (Throwable throwable) {//打印日志等处理} finally {stringRedisTemplate.delete(lockKey);}return ResponseEntity.ok("扣减库存成功");}}
3.1.1 存在的问题
业务处理超过锁时间,举例说明,线程1加锁后,线程1执行的比较慢,超过锁时间,超时后锁自动解锁,然后线程2进来加锁,线程1继续执行,然后线程1执行解锁过程,将线程2加的锁 给释放掉了,就乱套了。
3.2 对锁和请求进行绑定
@RestControllerpublic class OrderController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@RequestMapping("/deduct_stock")public ResponseEntity<String> deductStock() {String lockKey = "lockKey";String requestClientId = UUID.randomUUID().toString();Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, requestClientId, 10, TimeUnit.SECONDS);if (!result) {return ResponseEntity.ok("扣减库存锁获取失败");}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} catch (Throwable throwable) {//打印日志等处理} finally {if(requestClientId.equals(stringRedisTemplate.opsForValue().get("lockKey"))){stringRedisTemplate.delete(lockKey);}}return ResponseEntity.ok("扣减库存成功");}}
3.2.1 存在的问题
这样虽然可以解决线程之间互解锁的问题,但是因为锁时间无法控制,仍然会出现锁失效,上一个线程还未处理完,新线程进来继续处理的并发问题。
4.版本四
4.1 代码
@RestControllerpublic class OrderController {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate Redisson redisson;@RequestMapping("/deduct_stock")public ResponseEntity<String> deductStock() {String lockKey = "lockKey";RLock lock = redisson.getLock(lockKey);try {lock.lock();//业务处理int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} catch (Throwable throwable) {//打印日志等处理} finally {lock.unlock();}return ResponseEntity.ok("扣减库存成功");}}
4.2 优点
4.3 缺点
4.3.1 单点问题
这个架构中存在一个严重的单点失败问题。如果Redis挂了怎么办 ?你可能会说,可以通过增加一个slave节点解决这个问题,但是不行。因为Redis的主从同步是异步的。
在这种场景(主从)中存在明显的竞态:
1.客户端A从master获取到锁;
2.在master将锁同步到slave之前,master宕掉;
3.slave节点被晋级为master节点。
4.客户端B取得了同一个资源被客户端A已经获取到的另外一个锁,安全失效。
5.Redisson分布式锁原理
加锁机制、锁互斥机制、Watch dog 机制、可重入加锁机制、锁释放机制、等五个方面对 Redisson 实现分布式锁的底层原理进行分析;
5.1 加锁流程

| leaseTime | 加锁到期时间, -1 使用默认值 30 秒 |
|---|---|
| unit | 时间单位, 毫秒、秒、分钟、小时… |
| interruptibly | 是否可被中断标示 |
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 获取当前线程IDlong threadId = Thread.currentThread().getId();// 尝试获取锁,如果获取到锁,ttl 为null ,直接返回,就能执行业务了。Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}RFuture<RedissonLockEntry> future = subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {commandExecutor.syncSubscription(future);}try {while (true) {ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for messageif (ttl >= 0) {try {future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {future.getNow().getLatch().acquire();} else {future.getNow().getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(future, threadId);}// get(lockAsync(leaseTime, unit));}
5.1.1 加锁Lua 代码分析
参数分析
| keys[1] | 表示的是加的锁key( mylock),比如说:RLock lock = redisson.getLock(“myLock”);这里你自己设置了加锁的那个锁key就是“myLock”。 |
|---|---|
| argv[1] | 代表的就是锁key的默认生存时间 (毫秒:ms),默认30秒 |
| argv[2] | 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID |
// 判断 key[1] 加锁的key 是否存在 ,如果不存在走下面代码if (redis.call('exists', keys[1] == 0 ) then//redis.call('hset', keys[1], argv[2], 1);redis.call('pexpire', keys[1],argv[1]);return nil;end;if (redis.call('hexists', keys[1], argv[2] == 1) thenredis.call('hincrby', keys[1], argv[2], 1);redis.call('pexpire', keys[1], argv[1]);return nil;end;return redis.call('pttl', keys[1]);
5.2 锁互斥机制
5.3 Watch Dog机制
private void scheduleExpirationRenewal(final long threadId) {// expirationRenewalMap是一个ConcurrentMap,存储标志为"当前线程ID:key名称"的任务if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {// 检测锁是否存在的lua脚本,存在的话就用pexpire命令刷新过期时长RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itself// 递归调用自己;scheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}}
5.4 可重入锁机制
5.5 解锁

1、判断锁是否存在,不存在的话用publish命令发布释放锁的消息,订阅者收到后就能做下一步的拿锁处理;
2、锁存在但不是当前线程持有,返回空置nil;
3、当前线程持有锁,用hincrby命令将锁的可重入次数-1,然后判断重入次数是否大于0,是的话就重新刷新锁的过期时长,返回0,否则就删除锁,并发布释放锁的消息,返回1;
6.RedLock算法
在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,我们假设有5个Redis master节点,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:
- 获取当前Unix时间,以毫秒为单位。
- 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
- 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
- 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
- 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
引用
https://blog.csdn.net/asd051377305/article/details/108384490
https://blog.csdn.net/qq_42046105/article/details/111350721
https://zhuanlan.zhihu.com/p/135864820
https://github.com/redisson/redisson/wiki
http://redis.cn/topics/distlock.html
