分布式锁的概念是在一个系统多个实例当中,传统的锁无法实现跨实例上锁,以Java为例,传统的锁只能针对当前JVM虚拟机,当一个系统在多个JVM中运行时,会存在一个业务在多个实例中同时执行。那么我们就需要保证多实例运行时,引用分布式锁来实现业务的安全执行。
Redission
介绍
- redission是阻塞的,既加锁后会持续等待
- redission如果没有释放锁,不会发生死锁
- Redission能够解决锁自动续期,如果业务超长,会在运行期间自动的给锁续上30s周期,不用担心业务时间长,锁自动过期被删掉,默认加的锁是30s
- 自动解锁的时间一定要大于业务的执行时间
- 如何使用
- 第一步: 获取锁 RLock redissonLock = redisson.getLock(lockKey);
- 第二步: 加锁,实现锁续命功能 redissonLock.lock();
- 第三步:释放锁 redissonLock.unlock();
示例
// 锁名称
private final String upload_lock = "seckill:upload:lock";
// 分布式锁。锁的业务执行完成,状态已更新完成,释放锁后,其他人获取到最新的状态
RLock lock = redissonClient.getLock(upload_lock);
// 分布式上锁
lock.lock();
try {
log.info("商家秒杀的商品信息");
seckillService.uploadSeckillSkuLates3Days();
} finally {
lock.unlock();
}
源码分析
获取锁
@Override
public RLock getLock(String name) {
// 获取锁方法,实例化RedissionLock对象
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
加锁
@Override
public void lock() {
try {
// -1表示锁默认持有时间为30秒
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 上锁方法,返回ttl
// ttl 有值,说明没有抢到锁,返回锁的还需要多久过期时间
// ttl 为空,说明抢到锁。
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
//加锁失败,订阅该线程,释放锁的时候会发布一个消息,锁没有释放的时候则会等待,直到锁释放的时候会执行下面的while循环,重新竞争锁。此处是用了异步的模式。
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
// 循环抢锁,知道抢到锁后调出循环
while (true) {
// 当前线程尝试抢锁,如果ttl返回空,说明抢到了,否则没有
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
// 竞争获取锁失败,则排队等待所释放,重新竞争锁。
if (ttl >= 0) {
try {
//利用信号量机制阻塞当前线程ttl时间,之后再重新获取锁,如果当前线程被中断,则抛出
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
//竞争锁成功后,取消订阅该线程Id事件
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
调用tryAcquireAsync
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 调用tryAcquireAsync
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 尝试加锁,非默认持有锁时间
if (leaseTime != -1) {
// 执行Redis的lua脚本
// lua 脚本实现加锁
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 通过lua脚本,尝试加锁,默认持有锁时间,30秒
// internalLockLeaseTime 锁持有时间
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 注册看门狗,这是一个后台任务,作用是在锁快过期时,如果持有锁的线程还没有完成,为锁续持有时间
// 默认在过期时间持有时间的1/3,比如持有锁的时间是30秒,那么当到10秒的时候,发现线程还没有处理完
// 会继续为当前持有锁的线程续锁的持有时间为30秒
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
// 定时任务,频率为持有锁时间的1/3
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
执行Lua 脚本
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// 过期时间
internalLockLeaseTime = unit.toMillis(leaseTime);
// 这个lua脚本的意思,
// 加锁信息在redis是hashmap结构,key为加锁线程的id+随机的uuid,value为1
// 如果当前线程上锁成功,那么就把当前线程的线程id+uuid 设置为hashmap的key,value为1
// 如果key不存在(注意这个key不是hashmap的key,是redis的key),说明没有线程持有锁
// 那么就设置hashmap值,hashmap的key是当前线程id+uuid,value为1,然后设置过期时间
// 否则如果key存在,并且hashmap中的key和当前线程id+uuid相同,那么说明是重入锁,value值加1
// 重新设置过期时间
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 如果redis的key不存在,说明没有锁
// 那么设置hashmap,hashmap的key为uuid+当前线程id,value为1
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// redis的key存在,那么校验redis中的hashmap的key+uuid和当前线程是否一样
// 如果一样,说明是重入锁,value值加1,并且重新设置过期时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// redis的key存在,说明其他线程持有锁,返回持有锁的有效时间
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
看门狗
看门狗的作用是解决线程在持有锁的时间范围内,还没有完成时,为线程再次增加持有锁的时间。防止时间到期后释放锁被其他线程抢去。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 注册看门狗任务,定时扫描线程是否完成,没有完成为线程增加持有锁时间
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
// 执行定时任务
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
定时执行看门口任务
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
// 当前锁已经有看门狗任务,返回的oldEntry不为空
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
// 当前锁已经有看门狗任务
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
// 当前锁还没看门狗任务
// 将当前线程作为看门狗的持有者
entry.addThreadId(threadId);
// 执行定时任务
renewExpiration();
}
}
private void renewExpiration() {
// 获取当前锁的任务
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 获取锁的看门狗任务
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 如果看门狗任务没有任何持有线程,不做处理
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// lua 脚本为锁增加持有时间
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
// 重新为看门狗设置执行时间
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 执行评率为持有锁的时间1/3
ee.setTimeout(task);
}
执行lua脚本,为持有锁的线程增加持有锁的时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
// lua 脚本的意思
// 如果锁名称和持有锁的线程id+uuid存在,说明当前线程持有该锁
// 那么为线程设置持有时间,持有时间默认为30秒,如果在加锁时指定了持有时间,以指定时间为准
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
解锁
调用unlock()方法解锁
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
// lua 脚本释放锁
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// lua脚本的意思
// 如果缓存key和线程id+uuid在redis不存在,说明当前线程没有持有该锁
// 否则说明存在,给缓存key对应的hashmap 的value值设置为-1,返回锁的计数器
// 如果计数器>0表示该锁仍然有效,更新锁的超时时间
// 其他情况,直接删除redis锁,并广播释放锁的消息
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}