参考redisson:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
分布式锁
锁的类型
- 排他锁,不允许多个程序(线程、进程)同时访问某个共享资源
- 共享锁
redisson分布式锁使用
// 锁标识
String lockKey = "LOCK:TASK:XXX";
// 创建获取锁redisson客户端
RLock initingTaskLock = redissonClient.getLock(lockKey);
// 尝试获取锁
if (!initingTaskLock.tryLock()) {
return;
}
执行逻辑
public boolean tryLock() {
// 尝试获取锁
RFuture<Boolean> future = tryLockAsync();
// 阻塞获取
return get(future);
}
public RFuture<Boolean> tryLockAsync() {
// 异步获取锁
return tryLockAsync(Thread.currentThread().getId());
}
public RFuture<Boolean> tryLockAsync(long threadId) {
// 执行一次
return tryAcquireOnceAsync(-1, -1, null, threadId);
}
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
// 指定了锁过期时间
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// 未指定锁过期时间
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
// 添加监听事件
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired 获取到锁
if (ttlRemaining) {
// 设置看门狗程序
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
这里比较重要的是获取锁操作tryLockInnerAsync()
和设置看门狗程序进行锁续期scheduleExpirationRenewal()
获取锁的逻辑根据类型的不同有多种实现
默认情况下是非公平方式获取锁RedissonLock
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
String script =
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);";
return evalWriteAsync(getName(), LongCodec.INSTANCE, command, script,
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
这里使用了lua脚本处理,使用了redis的hash结构,key保存锁名称,field保存某个线程的锁名称lockName:threadId
,value保存了重入次数。
即:lockName -> (lockName:threadId, num)
.
看门狗程序设置
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 续期
renewExpiration();
}
}
续期逻辑
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 创建异步延迟任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// redis key重置过期时间
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 再重置定时续期任务
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 续期:默认10s去续期一次internalLockLeaseTime/3 = 10
ee.setTimeout(task);
}
重置redis过期时间
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
// 存在就重置过期时间
String script =
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;";
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script,
Collections.singletonList(getName()), // keys
internalLockLeaseTime, getLockName(threadId) // params
);
}
释放锁:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 一次一的释放锁
// 释放完成时删除key,同时发布事件给监听端
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}