参考redisson:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

分布式锁

锁的类型

  • 排他锁,不允许多个程序(线程、进程)同时访问某个共享资源
  • 共享锁

redisson分布式锁使用

  1. // 锁标识
  2. String lockKey = "LOCK:TASK:XXX";
  3. // 创建获取锁redisson客户端
  4. RLock initingTaskLock = redissonClient.getLock(lockKey);
  5. // 尝试获取锁
  6. if (!initingTaskLock.tryLock()) {
  7. return;
  8. }

执行逻辑

public boolean tryLock() {
    // 尝试获取锁
    RFuture<Boolean> future = tryLockAsync();
    // 阻塞获取
    return get(future);
}

public RFuture<Boolean> tryLockAsync() {
    // 异步获取锁
    return tryLockAsync(Thread.currentThread().getId());
}

public RFuture<Boolean> tryLockAsync(long threadId) {
    // 执行一次
    return tryAcquireOnceAsync(-1, -1, null, threadId);
}

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        // 指定了锁过期时间
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }
    // 未指定锁过期时间
    RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    // 添加监听事件
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired 获取到锁
        if (ttlRemaining) {
            // 设置看门狗程序
            scheduleExpirationRenewal(threadId);
        }
    });

    return ttlRemainingFuture;
}

这里比较重要的是获取锁操作tryLockInnerAsync()和设置看门狗程序进行锁续期scheduleExpirationRenewal()
获取锁的逻辑根据类型的不同有多种实现
image.png
默认情况下是非公平方式获取锁RedissonLock

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    String script =
        "if (redis.call('exists', KEYS[1]) == 0) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
        "end; " +
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
        "end; " +
        "return redis.call('pttl', KEYS[1]);";
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command, script,
                          Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

这里使用了lua脚本处理,使用了redis的hash结构,key保存锁名称,field保存某个线程的锁名称lockName:threadId,value保存了重入次数。
即:lockName -> (lockName:threadId, num).
看门狗程序设置

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        // 续期
        renewExpiration();
    }
}

续期逻辑

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 创建异步延迟任务
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
           // redis key重置过期时间
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                // 再重置定时续期任务
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 续期:默认10s去续期一次internalLockLeaseTime/3 = 10

    ee.setTimeout(task);
}

重置redis过期时间

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    // 存在就重置过期时间
    String script = 
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return 1; " +
        "end; " +
        "return 0;";
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script,
                          Collections.singletonList(getName()), // keys
                          internalLockLeaseTime, getLockName(threadId) // params
                         );
}

释放锁:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // 一次一的释放锁
    // 释放完成时删除key,同时发布事件给监听端
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return nil;",
                          Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

Lua脚本

时间轮