1.版本一

1.1 代码

  1. @RestController
  2. public class OrderController {
  3. @Autowired
  4. private StringRedisTemplate stringRedisTemplate;
  5. @RequestMapping("/deduct_stock")
  6. public ResponseEntity<String> deductStock() {
  7. String lockKey = "lockKey";
  8. Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deduct_stock");
  9. if (!result) {
  10. return ResponseEntity.ok("扣减库存锁获取失败");
  11. }
  12. //业务处理
  13. int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
  14. if (stock > 0) {
  15. int realStock = stock - 1;
  16. stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
  17. System.out.println("扣减成功,剩余库存:" + realStock);
  18. } else {
  19. System.out.println("扣减失败,库存不足");
  20. }
  21. stringRedisTemplate.delete(lockKey);
  22. return ResponseEntity.ok("扣减库存成功");
  23. }
  24. }

1.2 存在的问题

如果在业务处理过程中抛异常,则 锁永远释放不了,则新的线程永远都得不到锁,业务都会被影响。

2.版本二

2.1 代码

  1. @RestController
  2. public class OrderController {
  3. @Autowired
  4. private StringRedisTemplate stringRedisTemplate;
  5. @RequestMapping("/deduct_stock")
  6. public ResponseEntity<String> deductStock() {
  7. String lockKey = "lockKey";
  8. Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deduct_stock");
  9. if (!result) {
  10. return ResponseEntity.ok("扣减库存锁获取失败");
  11. }
  12. try {
  13. int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
  14. if (stock > 0) {
  15. int realStock = stock - 1;
  16. stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
  17. System.out.println("扣减成功,剩余库存:" + realStock);
  18. } else {
  19. System.out.println("扣减失败,库存不足");
  20. }
  21. } catch (Throwable throwable) {
  22. //打印日志等处理
  23. } finally {
  24. stringRedisTemplate.delete(lockKey);
  25. }
  26. return ResponseEntity.ok("扣减库存成功");
  27. }
  28. }

2.2 存在的问题

如果在获取锁后,在执行业务代码的过程中,发布重启业务系统或者挂了,锁就依旧会无法释放,死锁还是会发生。

3.版本三

为解决死锁问题,增加超时时间

3.1 增加锁超时时间

  1. public class OrderController {
  2. @Autowired
  3. private StringRedisTemplate stringRedisTemplate;
  4. @RequestMapping("/deduct_stock")
  5. public ResponseEntity<String> deductStock() {
  6. String lockKey = "lockKey";
  7. //原子命令
  8. Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "deduct_stock", 10, TimeUnit.SECONDS);
  9. if (!result) {
  10. return ResponseEntity.ok("扣减库存锁获取失败");
  11. }
  12. try {
  13. int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
  14. if (stock > 0) {
  15. int realStock = stock - 1;
  16. stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
  17. System.out.println("扣减成功,剩余库存:" + realStock);
  18. } else {
  19. System.out.println("扣减失败,库存不足");
  20. }
  21. } catch (Throwable throwable) {
  22. //打印日志等处理
  23. } finally {
  24. stringRedisTemplate.delete(lockKey);
  25. }
  26. return ResponseEntity.ok("扣减库存成功");
  27. }
  28. }

3.1.1 存在的问题

业务处理超过锁时间,举例说明,线程1加锁后,线程1执行的比较慢,超过锁时间,超时后锁自动解锁,然后线程2进来加锁,线程1继续执行,然后线程1执行解锁过程,将线程2加的锁 给释放掉了,就乱套了。

3.2 对锁和请求进行绑定

  1. @RestController
  2. public class OrderController {
  3. @Autowired
  4. private StringRedisTemplate stringRedisTemplate;
  5. @RequestMapping("/deduct_stock")
  6. public ResponseEntity<String> deductStock() {
  7. String lockKey = "lockKey";
  8. String requestClientId = UUID.randomUUID().toString();
  9. Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, requestClientId, 10, TimeUnit.SECONDS);
  10. if (!result) {
  11. return ResponseEntity.ok("扣减库存锁获取失败");
  12. }
  13. try {
  14. int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
  15. if (stock > 0) {
  16. int realStock = stock - 1;
  17. stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
  18. System.out.println("扣减成功,剩余库存:" + realStock);
  19. } else {
  20. System.out.println("扣减失败,库存不足");
  21. }
  22. } catch (Throwable throwable) {
  23. //打印日志等处理
  24. } finally {
  25. if(requestClientId.equals(stringRedisTemplate.opsForValue().get("lockKey"))){
  26. stringRedisTemplate.delete(lockKey);
  27. }
  28. }
  29. return ResponseEntity.ok("扣减库存成功");
  30. }
  31. }

3.2.1 存在的问题

这样虽然可以解决线程之间互解锁的问题,但是因为锁时间无法控制,仍然会出现锁失效,上一个线程还未处理完,新线程进来继续处理的并发问题。

4.版本四

4.1 代码

  1. @RestController
  2. public class OrderController {
  3. @Autowired
  4. private StringRedisTemplate stringRedisTemplate;
  5. @Autowired
  6. private Redisson redisson;
  7. @RequestMapping("/deduct_stock")
  8. public ResponseEntity<String> deductStock() {
  9. String lockKey = "lockKey";
  10. RLock lock = redisson.getLock(lockKey);
  11. try {
  12. lock.lock();
  13. //业务处理
  14. int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
  15. if (stock > 0) {
  16. int realStock = stock - 1;
  17. stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
  18. System.out.println("扣减成功,剩余库存:" + realStock);
  19. } else {
  20. System.out.println("扣减失败,库存不足");
  21. }
  22. } catch (Throwable throwable) {
  23. //打印日志等处理
  24. } finally {
  25. lock.unlock();
  26. }
  27. return ResponseEntity.ok("扣减库存成功");
  28. }
  29. }

4.2 优点

锁可以根据业务执行时间,自动续命。

4.3 缺点

4.3.1 单点问题

这个架构中存在一个严重的单点失败问题。如果Redis挂了怎么办 ?你可能会说,可以通过增加一个slave节点解决这个问题,但是不行。因为Redis的主从同步是异步的。

在这种场景(主从)中存在明显的竞态:
1.客户端A从master获取到锁;
2.在master将锁同步到slave之前,master宕掉;
3.slave节点被晋级为master节点。
4.客户端B取得了同一个资源被客户端A已经获取到的另外一个锁,安全失效

5.Redisson分布式锁原理

  1. 加锁机制、锁互斥机制、Watch dog 机制、可重入加锁机制、锁释放机制、
  2. 等五个方面对 Redisson 实现分布式锁的底层原理进行分析;

5.1 加锁流程

image.png

leaseTime 加锁到期时间, -1 使用默认值 30 秒
unit 时间单位, 毫秒、秒、分钟、小时…
interruptibly 是否可被中断标示
  1. private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  2. // 获取当前线程ID
  3. long threadId = Thread.currentThread().getId();
  4. // 尝试获取锁,如果获取到锁,ttl 为null ,直接返回,就能执行业务了。
  5. Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  6. // lock acquired
  7. if (ttl == null) {
  8. return;
  9. }
  10. RFuture<RedissonLockEntry> future = subscribe(threadId);
  11. if (interruptibly) {
  12. commandExecutor.syncSubscriptionInterrupted(future);
  13. } else {
  14. commandExecutor.syncSubscription(future);
  15. }
  16. try {
  17. while (true) {
  18. ttl = tryAcquire(-1, leaseTime, unit, threadId);
  19. // lock acquired
  20. if (ttl == null) {
  21. break;
  22. }
  23. // waiting for message
  24. if (ttl >= 0) {
  25. try {
  26. future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  27. } catch (InterruptedException e) {
  28. if (interruptibly) {
  29. throw e;
  30. }
  31. future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  32. }
  33. } else {
  34. if (interruptibly) {
  35. future.getNow().getLatch().acquire();
  36. } else {
  37. future.getNow().getLatch().acquireUninterruptibly();
  38. }
  39. }
  40. }
  41. } finally {
  42. unsubscribe(future, threadId);
  43. }
  44. // get(lockAsync(leaseTime, unit));
  45. }

5.1.1 加锁Lua 代码分析

参数分析

keys[1] 表示的是加的锁key( mylock),比如说:RLock lock = redisson.getLock(“myLock”);这里你自己设置了加锁的那个锁key就是“myLock”。
argv[1] 代表的就是锁key的默认生存时间 (毫秒:ms),默认30秒
argv[2] 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID
  1. // 判断 key[1] 加锁的key 是否存在 ,如果不存在走下面代码
  2. if (redis.call('exists', keys[1] == 0 ) then
  3. //
  4. redis.call('hset', keys[1], argv[2], 1);
  5. redis.call('pexpire', keys[1],argv[1]);
  6. return nil;
  7. end;
  8. if (redis.call('hexists', keys[1], argv[2] == 1) then
  9. redis.call('hincrby', keys[1], argv[2], 1);
  10. redis.call('pexpire', keys[1], argv[1]);
  11. return nil;
  12. end;
  13. return redis.call('pttl', keys[1]);

5.2 锁互斥机制

5.3 Watch Dog机制

  1. private void scheduleExpirationRenewal(final long threadId) {
  2. // expirationRenewalMap是一个ConcurrentMap,存储标志为"当前线程ID:key名称"的任务
  3. if (expirationRenewalMap.containsKey(getEntryName())) {
  4. return;
  5. }
  6. Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  7. @Override
  8. public void run(Timeout timeout) throws Exception {
  9. // 检测锁是否存在的lua脚本,存在的话就用pexpire命令刷新过期时长
  10. RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  11. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  12. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  13. "return 1; " +
  14. "end; " +
  15. "return 0;",
  16. Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
  17. future.addListener(new FutureListener<Boolean>() {
  18. @Override
  19. public void operationComplete(Future<Boolean> future) throws Exception {
  20. expirationRenewalMap.remove(getEntryName());
  21. if (!future.isSuccess()) {
  22. log.error("Can't update lock " + getName() + " expiration", future.cause());
  23. return;
  24. }
  25. if (future.getNow()) {
  26. // reschedule itself
  27. // 递归调用自己;
  28. scheduleExpirationRenewal(threadId);
  29. }
  30. }
  31. });
  32. }
  33. }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  34. if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
  35. task.cancel();
  36. }
  37. }

5.4 可重入锁机制

image.png

5.5 解锁

image.png

1、判断锁是否存在,不存在的话用publish命令发布释放锁的消息,订阅者收到后就能做下一步的拿锁处理;
2、锁存在但不是当前线程持有,返回空置nil;
3、当前线程持有锁,用hincrby命令将锁的可重入次数-1,然后判断重入次数是否大于0,是的话就重新刷新锁的过期时长,返回0,否则就删除锁,并发布释放锁的消息,返回1;

6.RedLock算法

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,我们假设有5个Redis master节点,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。
为了取到锁,客户端应该执行以下操作:

  1. 获取当前Unix时间,以毫秒为单位。
  2. 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
  4. 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
  5. 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。

引用

https://blog.csdn.net/asd051377305/article/details/108384490
https://blog.csdn.net/qq_42046105/article/details/111350721
https://zhuanlan.zhihu.com/p/135864820
https://github.com/redisson/redisson/wiki
http://redis.cn/topics/distlock.html