分布式锁的概念是在一个系统多个实例当中,传统的锁无法实现跨实例上锁,以Java为例,传统的锁只能针对当前JVM虚拟机,当一个系统在多个JVM中运行时,会存在一个业务在多个实例中同时执行。那么我们就需要保证多实例运行时,引用分布式锁来实现业务的安全执行。
Redission
介绍
- redission是阻塞的,既加锁后会持续等待
- redission如果没有释放锁,不会发生死锁
- Redission能够解决锁自动续期,如果业务超长,会在运行期间自动的给锁续上30s周期,不用担心业务时间长,锁自动过期被删掉,默认加的锁是30s
- 自动解锁的时间一定要大于业务的执行时间
- 如何使用
- 第一步: 获取锁 RLock redissonLock = redisson.getLock(lockKey);
- 第二步: 加锁,实现锁续命功能 redissonLock.lock();
- 第三步:释放锁 redissonLock.unlock();
示例
// 锁名称private final String upload_lock = "seckill:upload:lock";// 分布式锁。锁的业务执行完成,状态已更新完成,释放锁后,其他人获取到最新的状态RLock lock = redissonClient.getLock(upload_lock);// 分布式上锁lock.lock();try {log.info("商家秒杀的商品信息");seckillService.uploadSeckillSkuLates3Days();} finally {lock.unlock();}
源码分析

获取锁
@Overridepublic RLock getLock(String name) {// 获取锁方法,实例化RedissionLock对象return new RedissonLock(connectionManager.getCommandExecutor(), name);}
加锁
@Overridepublic void lock() {try {// -1表示锁默认持有时间为30秒lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();// 上锁方法,返回ttl// ttl 有值,说明没有抢到锁,返回锁的还需要多久过期时间// ttl 为空,说明抢到锁。Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}//加锁失败,订阅该线程,释放锁的时候会发布一个消息,锁没有释放的时候则会等待,直到锁释放的时候会执行下面的while循环,重新竞争锁。此处是用了异步的模式。RFuture<RedissonLockEntry> future = subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {commandExecutor.syncSubscription(future);}try {// 循环抢锁,知道抢到锁后调出循环while (true) {// 当前线程尝试抢锁,如果ttl返回空,说明抢到了,否则没有ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for message// 竞争获取锁失败,则排队等待所释放,重新竞争锁。if (ttl >= 0) {try {//利用信号量机制阻塞当前线程ttl时间,之后再重新获取锁,如果当前线程被中断,则抛出future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {future.getNow().getLatch().acquire();} else {future.getNow().getLatch().acquireUninterruptibly();}}}} finally {//竞争锁成功后,取消订阅该线程Id事件unsubscribe(future, threadId);}// get(lockAsync(leaseTime, unit));}
调用tryAcquireAsync
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 调用tryAcquireAsyncreturn get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {// 尝试加锁,非默认持有锁时间if (leaseTime != -1) {// 执行Redis的lua脚本// lua 脚本实现加锁return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}// 通过lua脚本,尝试加锁,默认持有锁时间,30秒// internalLockLeaseTime 锁持有时间RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);// 注册看门狗,这是一个后台任务,作用是在锁快过期时,如果持有锁的线程还没有完成,为锁续持有时间// 默认在过期时间持有时间的1/3,比如持有锁的时间是30秒,那么当到10秒的时候,发现线程还没有处理完// 会继续为当前持有锁的线程续锁的持有时间为30秒ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {// 定时任务,频率为持有锁时间的1/3scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}
执行Lua 脚本
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {// 过期时间internalLockLeaseTime = unit.toMillis(leaseTime);// 这个lua脚本的意思,// 加锁信息在redis是hashmap结构,key为加锁线程的id+随机的uuid,value为1// 如果当前线程上锁成功,那么就把当前线程的线程id+uuid 设置为hashmap的key,value为1// 如果key不存在(注意这个key不是hashmap的key,是redis的key),说明没有线程持有锁// 那么就设置hashmap值,hashmap的key是当前线程id+uuid,value为1,然后设置过期时间// 否则如果key存在,并且hashmap中的key和当前线程id+uuid相同,那么说明是重入锁,value值加1// 重新设置过期时间return evalWriteAsync(getName(), LongCodec.INSTANCE, command,// 如果redis的key不存在,说明没有锁// 那么设置hashmap,hashmap的key为uuid+当前线程id,value为1"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// redis的key存在,那么校验redis中的hashmap的key+uuid和当前线程是否一样// 如果一样,说明是重入锁,value值加1,并且重新设置过期时间"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +// redis的key存在,说明其他线程持有锁,返回持有锁的有效时间"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
看门狗
看门狗的作用是解决线程在持有锁的时间范围内,还没有完成时,为线程再次增加持有锁的时间。防止时间到期后释放锁被其他线程抢去。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);// 注册看门狗任务,定时扫描线程是否完成,没有完成为线程增加持有锁时间ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {// 执行定时任务scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;}
定时执行看门口任务
private void scheduleExpirationRenewal(long threadId) {ExpirationEntry entry = new ExpirationEntry();// 当前锁已经有看门狗任务,返回的oldEntry不为空ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);// 当前锁已经有看门狗任务if (oldEntry != null) {oldEntry.addThreadId(threadId);} else {// 当前锁还没看门狗任务// 将当前线程作为看门狗的持有者entry.addThreadId(threadId);// 执行定时任务renewExpiration();}}
private void renewExpiration() {// 获取当前锁的任务ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {// 获取锁的看门狗任务ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}// 如果看门狗任务没有任何持有线程,不做处理Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}// lua 脚本为锁增加持有时间RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if (res) {// reschedule itself// 重新为看门狗设置执行时间renewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 执行评率为持有锁的时间1/3ee.setTimeout(task);}
执行lua脚本,为持有锁的线程增加持有锁的时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {// lua 脚本的意思// 如果锁名称和持有锁的线程id+uuid存在,说明当前线程持有该锁// 那么为线程设置持有时间,持有时间默认为30秒,如果在加锁时指定了持有时间,以指定时间为准return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));}
解锁
调用unlock()方法解锁
@Overridepublic void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}}
@Overridepublic RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();// lua 脚本释放锁RFuture<Boolean> future = unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);return;}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result;}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {// lua脚本的意思// 如果缓存key和线程id+uuid在redis不存在,说明当前线程没有持有该锁// 否则说明存在,给缓存key对应的hashmap 的value值设置为-1,返回锁的计数器// 如果计数器>0表示该锁仍然有效,更新锁的超时时间// 其他情况,直接删除redis锁,并广播释放锁的消息return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end; " +"return nil;",Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));}
