分布式锁的概念是在一个系统多个实例当中,传统的锁无法实现跨实例上锁,以Java为例,传统的锁只能针对当前JVM虚拟机,当一个系统在多个JVM中运行时,会存在一个业务在多个实例中同时执行。那么我们就需要保证多实例运行时,引用分布式锁来实现业务的安全执行。

Redission

介绍

  1. redission是阻塞的,既加锁后会持续等待
  2. redission如果没有释放锁,不会发生死锁
  3. Redission能够解决锁自动续期,如果业务超长,会在运行期间自动的给锁续上30s周期,不用担心业务时间长,锁自动过期被删掉,默认加的锁是30s
  4. 自动解锁的时间一定要大于业务的执行时间
  5. 如何使用
  • 第一步: 获取锁 RLock redissonLock = redisson.getLock(lockKey);
  • 第二步: 加锁,实现锁续命功能 redissonLock.lock();
  • 第三步:释放锁 redissonLock.unlock();

示例

  1. // 锁名称
  2. private final String upload_lock = "seckill:upload:lock";
  3. // 分布式锁。锁的业务执行完成,状态已更新完成,释放锁后,其他人获取到最新的状态
  4. RLock lock = redissonClient.getLock(upload_lock);
  5. // 分布式上锁
  6. lock.lock();
  7. try {
  8. log.info("商家秒杀的商品信息");
  9. seckillService.uploadSeckillSkuLates3Days();
  10. } finally {
  11. lock.unlock();
  12. }

源码分析

Redission源码分析.png

获取锁

  1. @Override
  2. public RLock getLock(String name) {
  3. // 获取锁方法,实例化RedissionLock对象
  4. return new RedissonLock(connectionManager.getCommandExecutor(), name);
  5. }

加锁

  1. @Override
  2. public void lock() {
  3. try {
  4. // -1表示锁默认持有时间为30秒
  5. lock(-1, null, false);
  6. } catch (InterruptedException e) {
  7. throw new IllegalStateException();
  8. }
  9. }
  1. private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  2. long threadId = Thread.currentThread().getId();
  3. // 上锁方法,返回ttl
  4. // ttl 有值,说明没有抢到锁,返回锁的还需要多久过期时间
  5. // ttl 为空,说明抢到锁。
  6. Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  7. // lock acquired
  8. if (ttl == null) {
  9. return;
  10. }
  11. //加锁失败,订阅该线程,释放锁的时候会发布一个消息,锁没有释放的时候则会等待,直到锁释放的时候会执行下面的while循环,重新竞争锁。此处是用了异步的模式。
  12. RFuture<RedissonLockEntry> future = subscribe(threadId);
  13. if (interruptibly) {
  14. commandExecutor.syncSubscriptionInterrupted(future);
  15. } else {
  16. commandExecutor.syncSubscription(future);
  17. }
  18. try {
  19. // 循环抢锁,知道抢到锁后调出循环
  20. while (true) {
  21. // 当前线程尝试抢锁,如果ttl返回空,说明抢到了,否则没有
  22. ttl = tryAcquire(-1, leaseTime, unit, threadId);
  23. // lock acquired
  24. if (ttl == null) {
  25. break;
  26. }
  27. // waiting for message
  28. // 竞争获取锁失败,则排队等待所释放,重新竞争锁。
  29. if (ttl >= 0) {
  30. try {
  31. //利用信号量机制阻塞当前线程ttl时间,之后再重新获取锁,如果当前线程被中断,则抛出
  32. future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  33. } catch (InterruptedException e) {
  34. if (interruptibly) {
  35. throw e;
  36. }
  37. future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  38. }
  39. } else {
  40. if (interruptibly) {
  41. future.getNow().getLatch().acquire();
  42. } else {
  43. future.getNow().getLatch().acquireUninterruptibly();
  44. }
  45. }
  46. }
  47. } finally {
  48. //竞争锁成功后,取消订阅该线程Id事件
  49. unsubscribe(future, threadId);
  50. }
  51. // get(lockAsync(leaseTime, unit));
  52. }

调用tryAcquireAsync

  1. private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2. // 调用tryAcquireAsync
  3. return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
  4. }
  1. private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2. // 尝试加锁,非默认持有锁时间
  3. if (leaseTime != -1) {
  4. // 执行Redis的lua脚本
  5. // lua 脚本实现加锁
  6. return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  7. }
  8. // 通过lua脚本,尝试加锁,默认持有锁时间,30秒
  9. // internalLockLeaseTime 锁持有时间
  10. RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
  11. TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  12. // 注册看门狗,这是一个后台任务,作用是在锁快过期时,如果持有锁的线程还没有完成,为锁续持有时间
  13. // 默认在过期时间持有时间的1/3,比如持有锁的时间是30秒,那么当到10秒的时候,发现线程还没有处理完
  14. // 会继续为当前持有锁的线程续锁的持有时间为30秒
  15. ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
  16. if (e != null) {
  17. return;
  18. }
  19. // lock acquired
  20. if (ttlRemaining == null) {
  21. // 定时任务,频率为持有锁时间的1/3
  22. scheduleExpirationRenewal(threadId);
  23. }
  24. });
  25. return ttlRemainingFuture;
  26. }

执行Lua 脚本

  1. <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  2. // 过期时间
  3. internalLockLeaseTime = unit.toMillis(leaseTime);
  4. // 这个lua脚本的意思,
  5. // 加锁信息在redis是hashmap结构,key为加锁线程的id+随机的uuid,value为1
  6. // 如果当前线程上锁成功,那么就把当前线程的线程id+uuid 设置为hashmap的key,value为1
  7. // 如果key不存在(注意这个key不是hashmap的key,是redis的key),说明没有线程持有锁
  8. // 那么就设置hashmap值,hashmap的key是当前线程id+uuid,value为1,然后设置过期时间
  9. // 否则如果key存在,并且hashmap中的key和当前线程id+uuid相同,那么说明是重入锁,value值加1
  10. // 重新设置过期时间
  11. return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
  12. // 如果redis的key不存在,说明没有锁
  13. // 那么设置hashmap,hashmap的key为uuid+当前线程id,value为1
  14. "if (redis.call('exists', KEYS[1]) == 0) then " +
  15. "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  16. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  17. "return nil; " +
  18. "end; " +
  19. // redis的key存在,那么校验redis中的hashmap的key+uuid和当前线程是否一样
  20. // 如果一样,说明是重入锁,value值加1,并且重新设置过期时间
  21. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  22. "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  23. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  24. "return nil; " +
  25. "end; " +
  26. // redis的key存在,说明其他线程持有锁,返回持有锁的有效时间
  27. "return redis.call('pttl', KEYS[1]);",
  28. Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
  29. }

看门狗

看门狗的作用是解决线程在持有锁的时间范围内,还没有完成时,为线程再次增加持有锁的时间。防止时间到期后释放锁被其他线程抢去。
数据库一致性解决方案-第 4 页.jpg

  1. private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  2. if (leaseTime != -1) {
  3. return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  4. }
  5. RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
  6. TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  7. // 注册看门狗任务,定时扫描线程是否完成,没有完成为线程增加持有锁时间
  8. ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
  9. if (e != null) {
  10. return;
  11. }
  12. // lock acquired
  13. if (ttlRemaining == null) {
  14. // 执行定时任务
  15. scheduleExpirationRenewal(threadId);
  16. }
  17. });
  18. return ttlRemainingFuture;
  19. }

定时执行看门口任务

  1. private void scheduleExpirationRenewal(long threadId) {
  2. ExpirationEntry entry = new ExpirationEntry();
  3. // 当前锁已经有看门狗任务,返回的oldEntry不为空
  4. ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
  5. // 当前锁已经有看门狗任务
  6. if (oldEntry != null) {
  7. oldEntry.addThreadId(threadId);
  8. } else {
  9. // 当前锁还没看门狗任务
  10. // 将当前线程作为看门狗的持有者
  11. entry.addThreadId(threadId);
  12. // 执行定时任务
  13. renewExpiration();
  14. }
  15. }
  1. private void renewExpiration() {
  2. // 获取当前锁的任务
  3. ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  4. if (ee == null) {
  5. return;
  6. }
  7. Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  8. @Override
  9. public void run(Timeout timeout) throws Exception {
  10. // 获取锁的看门狗任务
  11. ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  12. if (ent == null) {
  13. return;
  14. }
  15. // 如果看门狗任务没有任何持有线程,不做处理
  16. Long threadId = ent.getFirstThreadId();
  17. if (threadId == null) {
  18. return;
  19. }
  20. // lua 脚本为锁增加持有时间
  21. RFuture<Boolean> future = renewExpirationAsync(threadId);
  22. future.onComplete((res, e) -> {
  23. if (e != null) {
  24. log.error("Can't update lock " + getName() + " expiration", e);
  25. EXPIRATION_RENEWAL_MAP.remove(getEntryName());
  26. return;
  27. }
  28. if (res) {
  29. // reschedule itself
  30. // 重新为看门狗设置执行时间
  31. renewExpiration();
  32. }
  33. });
  34. }
  35. }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 执行评率为持有锁的时间1/3
  36. ee.setTimeout(task);
  37. }

执行lua脚本,为持有锁的线程增加持有锁的时间

  1. protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  2. // lua 脚本的意思
  3. // 如果锁名称和持有锁的线程id+uuid存在,说明当前线程持有该锁
  4. // 那么为线程设置持有时间,持有时间默认为30秒,如果在加锁时指定了持有时间,以指定时间为准
  5. return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  6. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  7. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  8. "return 1; " +
  9. "end; " +
  10. "return 0;",
  11. Collections.singletonList(getName()),
  12. internalLockLeaseTime, getLockName(threadId));
  13. }

解锁

调用unlock()方法解锁

  1. @Override
  2. public void unlock() {
  3. try {
  4. get(unlockAsync(Thread.currentThread().getId()));
  5. } catch (RedisException e) {
  6. if (e.getCause() instanceof IllegalMonitorStateException) {
  7. throw (IllegalMonitorStateException) e.getCause();
  8. } else {
  9. throw e;
  10. }
  11. }
  12. }
  1. @Override
  2. public RFuture<Void> unlockAsync(long threadId) {
  3. RPromise<Void> result = new RedissonPromise<Void>();
  4. // lua 脚本释放锁
  5. RFuture<Boolean> future = unlockInnerAsync(threadId);
  6. future.onComplete((opStatus, e) -> {
  7. cancelExpirationRenewal(threadId);
  8. if (e != null) {
  9. result.tryFailure(e);
  10. return;
  11. }
  12. if (opStatus == null) {
  13. IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
  14. + id + " thread-id: " + threadId);
  15. result.tryFailure(cause);
  16. return;
  17. }
  18. result.trySuccess(null);
  19. });
  20. return result;
  21. }
  1. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  2. // lua脚本的意思
  3. // 如果缓存key和线程id+uuid在redis不存在,说明当前线程没有持有该锁
  4. // 否则说明存在,给缓存key对应的hashmap 的value值设置为-1,返回锁的计数器
  5. // 如果计数器>0表示该锁仍然有效,更新锁的超时时间
  6. // 其他情况,直接删除redis锁,并广播释放锁的消息
  7. return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  8. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  9. "return nil;" +
  10. "end; " +
  11. "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  12. "if (counter > 0) then " +
  13. "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  14. "return 0; " +
  15. "else " +
  16. "redis.call('del', KEYS[1]); " +
  17. "redis.call('publish', KEYS[2], ARGV[1]); " +
  18. "return 1; " +
  19. "end; " +
  20. "return nil;",
  21. Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
  22. }