1.加锁

    1. private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    2. long threadId = Thread.currentThread().getId();
    3. Long ttl = this.tryAcquire(leaseTime, unit, threadId);
    4. //
    5. if (ttl != null) {
    6. RFuture<RedissonLockEntry> future = this.subscribe(threadId);
    7. this.commandExecutor.syncSubscription(future);
    8. try {
    9. while(true) {
    10. ttl = this.tryAcquire(leaseTime, unit, threadId);
    11. if (ttl == null) {
    12. return;
    13. }
    14. if (ttl >= 0L) {
    15. try {
    16. this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    17. } catch (InterruptedException var13) {
    18. if (interruptibly) {
    19. throw var13;
    20. }
    21. this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    22. }
    23. } else if (interruptibly) {
    24. this.getEntry(threadId).getLatch().acquire();
    25. } else {
    26. this.getEntry(threadId).getLatch().acquireUninterruptibly();
    27. }
    28. }
    29. } finally {
    30. this.unsubscribe(future, threadId);
    31. }
    32. }
    33. }
    1. private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    2. if (leaseTime != -1) {//设置了过期时间
    3. return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    4. }
    5. //没设置过期时间默认30s
    6. RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    7. ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    8. if (e != null) {
    9. return;
    10. }
    11. // lock acquired
    12. if (ttlRemaining == null) {//开启锁续期延时任务
    13. scheduleExpirationRenewal(threadId);
    14. }
    15. });
    16. return ttlRemainingFuture;
    17. }
    1. private void scheduleExpirationRenewal(long threadId) {
    2. ExpirationEntry entry = new ExpirationEntry();
    3. ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    4. if (oldEntry != null) {
    5. oldEntry.addThreadId(threadId);
    6. } else {
    7. entry.addThreadId(threadId);
    8. renewExpiration();
    9. }
    10. }
    1. private void renewExpiration() {
    2. ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    3. if (ee == null) {
    4. return;
    5. }
    6. Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    7. @Override
    8. public void run(Timeout timeout) throws Exception {
    9. ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    10. if (ent == null) {
    11. return;
    12. }
    13. Long threadId = ent.getFirstThreadId();
    14. if (threadId == null) {
    15. return;
    16. }
    17. RFuture<Boolean> future = renewExpirationAsync(threadId);
    18. future.onComplete((res, e) -> {
    19. if (e != null) {
    20. log.error("Can't update lock " + getName() + " expiration", e);
    21. return;
    22. }
    23. if (res) {
    24. // reschedule itself
    25. renewExpiration();
    26. }
    27. });
    28. }
    29. }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    30. ee.setTimeout(task);
    31. }