1.加锁
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = this.tryAcquire(leaseTime, unit, threadId);//if (ttl != null) {RFuture<RedissonLockEntry> future = this.subscribe(threadId);this.commandExecutor.syncSubscription(future);try {while(true) {ttl = this.tryAcquire(leaseTime, unit, threadId);if (ttl == null) {return;}if (ttl >= 0L) {try {this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException var13) {if (interruptibly) {throw var13;}this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else if (interruptibly) {this.getEntry(threadId).getLatch().acquire();} else {this.getEntry(threadId).getLatch().acquireUninterruptibly();}}} finally {this.unsubscribe(future, threadId);}}}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {//设置了过期时间return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}//没设置过期时间默认30sRFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {//开启锁续期延时任务scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}
private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {entry.addThreadId(threadId);renewExpiration();}}
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);}
