redis的常用命令./redis-cli -p 6380 shutdown 指定端口关闭
./redis-cli -p 6379 -h 127.0.0.1 指定端口号和ip启动
redisson
loadBalancer负载均衡模式
RandomLoadBalancer - 随机调度算法
public ClientConnectionsEntry getEntry(List
//java.util.concurrent.ThreadLocalRandom;
int ind = ThreadLocalRandom.current().nextInt(clientsCopy.size());
return clientsCopy.get(ind);
}
RoundRobinLoadBalancer -轮询调度算法
private final AtomicInteger index = new AtomicInteger(-1);//cas,保证变量在多线程下的准确性
public ClientConnectionsEntry getEntry(List
int ind = Math.abs(index.incrementAndGet() % clientsCopy.size());
return clientsCopy.get(ind);
}
WeightedRoundRobinBalancer - 权重轮询调度算法
RedissonLock中主要方法的源码分析
lock 方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired 获取锁成功
if (ttl == null) {
return;
}
// 通过异步方式订阅Redis的channel,阻塞方式获取订阅结果
RFuture
commandExecutor.syncSubscription(future);
try {<br /> while (true) {<br /> // 通过循环判断,直到锁获取成功,经典写法。<br /> ttl = tryAcquire(leaseTime, unit, threadId);<br /> // lock acquired 锁获取成功,跳出循环<br /> if (ttl == null) {<br /> break;<br /> }
// waiting for message 如果剩余时间 TTL 大于0,从Entry的信号量获取一个许可(除非发生中断或者一直不存在可用的许可)<br /> // 否则就在wait time时间范围内等待可以通过的信号量<br /> if (ttl >= 0) {<br /> try {<br /> getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);<br /> } catch (InterruptedException e) {<br /> if (interruptibly) {<br /> throw e;<br /> }<br /> getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);<br /> }<br /> } else {<br /> if (interruptibly) {<br /> getEntry(threadId).getLatch().acquire();<br /> } else {<br /> getEntry(threadId).getLatch().acquireUninterruptibly();<br /> }<br /> }<br /> }<br /> } finally {<br /> // 无论最终获取锁是否成功,都需要取消订阅解锁消息,防止死锁发生。<br /> unsubscribe(future, threadId);<br /> }<br />// get(lockAsync(leaseTime, unit));<br /> }<br />tryLock方法<br /> /**<br /> * <br /> * @param waitTime the maximum time to aquire the lock 最大等待时间,一般在多线程环境下,建议为0,这是为了防止多个线程同时获取到锁<br /> * @param leaseTime lease time 锁的有效时间<br /> * @param unit time unit 锁有效时间的单位<br /> * @return<br /> * @throws InterruptedException<br /> */<br /> @Override<br /> public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {<br /> //取得最大等待时间<br /> long time = unit.toMillis(waitTime);<br /> //记录当下时间<br /> long current = System.currentTimeMillis();<br /> //取得当前线程id(判断是否可重入锁的关键)<br /> long threadId = Thread.currentThread().getId();<br /> //1.申请锁,返回还剩余的锁过期时间<br /> Long ttl = tryAcquire(leaseTime, unit, threadId);<br /> // lock acquired<br /> // 2.如果ttl为空,表示锁申请成功<br /> // 获取锁同时获取成功的情况下,和lock(...)方法是一样的 直接返回True,获取锁False再往下走<br /> if (ttl == null) {<br /> return true;<br /> }<br /> //3.申请锁的耗时如果大于等于最大等待时间,则申请锁失败<br /> time -= System.currentTimeMillis() - current;<br /> if (time <= 0) {<br /> acquireFailed(threadId);<br /> return false;<br /> }<br /> <br /> current = System.currentTimeMillis();<br /> //订阅监听redis的消息,并创建RedissonLockEntry<br /> //其中,RedissonLockEntry中比较关键的是一个Semaphore<br /> //属性对象用来控制本地的锁的请求的信号量同步,返回Netty框架的Future<br /> /**<br /> * 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:<br /> * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争<br /> * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败<br /> * 当 this.await返回true,进入循环尝试获取锁<br /> */<br /> RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);<br /> // 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,<br /> // 说明已经超过了客户端设置的最大的wait time,直接返回false,取消订阅,并且不会再继续申请锁<br /> //await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future)<br /> if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {<br /> if (!subscribeFuture.cancel(false)) {<br /> subscribeFuture.onComplete((res, e) -> {<br /> if (e == null) {<br /> unsubscribe(subscribeFuture, threadId);<br /> }<br /> });<br /> }<br /> acquireFailed(threadId);<br /> return false;<br /> }
try {<br /> //计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败<br /> time -= System.currentTimeMillis() - current;<br /> if (time <= 0) {<br /> acquireFailed(threadId);<br /> return false;<br /> }<br /> /**<br /> * 5.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁<br /> * 获取锁成功,则立马返回true,<br /> * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回false结束循环<br /> */<br /> while (true) {<br /> long currentTime = System.currentTimeMillis();<br /> // 再次尝试申请一次锁<br /> ttl = tryAcquire(leaseTime, unit, threadId);<br /> // lock acquired<br /> // 成功获取锁则直接返回true结束循环<br /> if (ttl == null) {<br /> return true;<br /> }<br /> //超过最大等待时间则返回false结束循环,获取锁失败<br /> time -= System.currentTimeMillis() - currentTime;<br /> if (time <= 0) {<br /> // 不等待申请锁并返回<br /> acquireFailed(threadId);<br /> return false;<br /> }
// waiting for message<br /> /**<br /> * 6.阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):<br /> */<br /> currentTime = System.currentTimeMillis();<br /> // 通过信号量(共享锁)进行阻塞,等待解锁消息<br /> // 如果剩余时间 TTL 小于wait time,就在ttl时间内<br /> // 从Entry的信号量获取一个许可(除非发生中断或者一直不存在可用的许可)<br /> // 否则就在wait time时间范围内等待可以通过的信号量<br /> if (ttl >= 0 && ttl < time) {<br /> //如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。<br /> getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);<br /> } else {<br /> //则就在wait time 时间范围内等待可以通过信号量<br /> getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);<br /> }<br /> // 更新等待时间,(最大等待时间-已经消耗的阻塞时间)<br /> time -= System.currentTimeMillis() - currentTime;<br /> if (time <= 0) {<br /> // 等待时间小于等于0,不等待申请锁并返回<br /> acquireFailed(threadId);<br /> return false;<br /> }<br /> }<br /> } finally {<br /> // 7.无论最终获取锁是否成功,都需要取消订阅解锁消息,防止死锁发生。<br /> unsubscribe(subscribeFuture, threadId);<br /> }<br />// return get(tryLockAsync(waitTime, leaseTime, unit));<br /> }
/
其中 tryAcquire 内部通过调用 tryLockInnerAsync 实现申请锁的逻辑。申请锁并返回锁有效期还剩余的时间,
如果为空说明锁未被其它线程申请则直接获取并返回,如果获取到时间,则进入等待竞争逻辑。
@param leaseTime
@param unit
@param threadId
@return
*/
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
unlock方法
/
unlock 内部通过 get(unlockAsync(Thread.currentThread().getId())) 调用 unlockInnerAsync 解锁
/
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
参考博文:
https://crazyfzw.github.io/2019/04/15/distributed-locks-with-redis/
http://wuwenliang.net/2019/07/23/%E5%86%8D%E8%B0%88%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E4%B9%8B%E5%89%96%E6%9E%90Redis%E5%AE%9E%E7%8E%B0/?utm_source=tuicool&utm_medium=referral#%E8%8E%B7%E5%8F%96RLock%E5%AE%9E%E4%BE%8B
https://blog.csdn.net/zilong_zilong/article/details/78252037
https://blog.csdn.net/xiaxiaorui2003/article/details/7065699
https://www.jianshu.com/p/2b19dec72ab0