整体结构
面向开发人员的API | |
---|---|
acquireShared() |
获取共享模式的锁,忽略中断 |
acquireSharedInterruptibly() |
获取共享模式的锁,发生中断会终止 |
tryAcquireSharedNanos() |
获取共享模式的锁,在指定时间内等待,超时/中断取消 |
releaseShared() |
释放共享模式的锁 |
由开发人员具体实现的方法 | |
tryAcquireShared() |
尝试获取共享锁,模板方法,由子类实现 |
tryReleaseShared() |
尝试释放共享锁,模板方法,由子类实现 |
底层实现 | |
doAcquireShared() |
底层真正执行入队、持续获取锁的方法 |
doAcquireSharedInterruptibly() |
同上,增加了中断抛出中断异常的功能 |
doAcquireSharedNanos() |
同上,增加了中断抛出中断异常的功能,增加了超时机制 |
上锁
点击查看【processon】
• 里面蓝色的方块表明要铺开分析的部分;虚线的菱形方块表示模板方法,其指向的方块是具体实现。
整体将划分为:
- 进入
acquireShared()
,分析acquireShared()
的源码 - 学习
tryAcquireShared()
的具体实现,看看ReentrantReadWriteLock
如何实现的tryAcquireShared()
- 学习
doAcquireShared()
我们略过 ReentrantReadWriteLock
(这部分源码放在后续单独讲解),直接看 acquireShared()
是如何实现的:
在
Share
模式下,AQS
已经定义好了tryAcquireShared()
的返回语义了:tryAcquireShared()
返回值>=0
,获取锁成功tryAcquireShared()
返回值<0
,获取锁失败;准备进入等待队列
这部分流程是不是很眼熟?那我就不多分析了,如果忘了去《Locks - AQS之Exclusive模式》补习一下~// AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) {
// 尝试获取Shared模式的锁。如果获取失败,则进入等待队列
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
如果获取锁失败了,进入队列是如何实现的?
// AbstractQueuedSynchronizer#doAcquireShared()
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 创建一个节点入队。不过此时,是SHARED模式
boolean failed = true; // 获取锁的结果
try {
boolean interrupted = false; // 记录是否被终端
for (;;) {
final Node p = node.predecessor(); // 获取当前节点node的前驱节点
if (p == head) { // 如果前驱节点为HEAD,继续向下执行
int r = tryAcquireShared(arg); // 尝试获取共享锁
if (r >= 0) { // 获取成功,则通知后续的节点进入共享模式
setHeadAndPropagate(node, r); // 重点分析!
p.next = null; // help GC
if (interrupted) // 处理中断
selfInterrupt();
failed = false; // 获取锁成功
return;
}
}
// 这部分逻辑就不再分析了吧~没获取到锁就进入睡眠
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
小伙伴们是不是对共享模式的锁感到奇怪呢?哪里贡献了?看着怎么还像是互斥锁一样?其实这部分主要有子类决定哪些算是共享的~如果支持共享的情况下 tryAcquireShared() 就会返回大于0,就不会进入后续的等待队列了;如果不支持才会进入等待队列,我们先看一下 setHeadAndPropagate() 的实现,然后举个例子串起来理解一下~ |
|
---|---|
// java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 保留之前的head,之后还有用
setHead(node); // 将当前的node设置为HEAD!
// 这部分为什么这么写,看后面的分析,先按捺住自己蠢蠢欲动的好奇心^_^
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取当前节点的后续节点
Node s = node.next;
// 如果后续节点为null或者为share,释放自己
if (s == null || s.isShared())
doReleaseShared();
}
}
该方法会将当前节点设置为 HEAD
,并唤醒后续节点,如果后续节点也是等待获取共享锁的节点,就以传播的方式唤醒头结点之后紧邻的各个共享结点。
释放锁
这部分纯看代码完全没有任何问题,但是想要结合多线程来思考为什么这么设计就很难理解。这部分将会结合 setHeadAndPropagate()
一起放在后文进行分析,为何如此设计!
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 如果HEAD不为NULL,或者HEAD不为TAIL
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 这部分主要针对后续节点是EXCLUDE的节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 唤醒后续节点(仍然可能会和共享锁发生竞争)
}
// 让自己(HEAD)变为PROPAGATE,是不是很蒙蔽?为啥又这个操作
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 如果HEAD发生变化了就break
break;
}
}
共享模式的设计思路
在共享模式里,对个别的一些状态判断有点懵懵懂懂,不知其所以然,为何要这样设计,笔者打算将所有的情况列出来方便大家理解。
背景描述: 有A、B、C、D四条线程,A、B是获取共享锁,C、D是获取互斥锁。共享锁用绿色方块表示,互斥锁用红色方块表示~ |
|
---|---|
情况1——共享锁先发制人
- 当A线程优先获取到锁时,它直接获取共享锁成功!
- 随后,C线程姗姗来迟,尝试获取互斥锁。因为A线程已经持有了共享锁,所以C线程获取失败,进入等待队列:
- 与此同时,B线程也尝试获取共享锁,因为A线程已经持有了共享锁,B线程也自然而然地获取成功!
- 而当A线程(或者B线程)想要
doReleaseShared()
时,它尝试获取同步等待队列里的HEAD
,此时HEAD
的waitStatus = SIGNAL
,所以被CAS
设置为了0
,然后唤醒后续的节点,即C线程
情况2——独占锁后来居上
- 当C线程优先获取到锁时,它直接获取独占锁成功!
- 随后,A线程缓缓到达现场,尝试获取共享锁。因为C线程已经持有独占锁,所以A线程获取失败,进入等待队列:
- 之后,B线程尝试获取共享锁,也获取失败,也因此进入等待队列:
- 此时,等待队列中分别由A、B两个线程在等待。当C线程释放锁时(假设无外来竞争),此时A线程会被 **
HEAD
** (即C线程)唤醒,A线程获取锁成功并调用setHeadAndPropagate()
,将自己设为HEAD
并调用doReleaseShared()
唤醒后面的B线程:
- 前面讲到A线程调用了
doreleseShared()
,因此会唤醒后续的B线程,B线程也会和A线程一样获取锁成功,然后调用setHeadAndPropagate()
,将自己设为HEAD
,由于B线程是最后一个节点,故没有后续。
线程竞争——解释setHeadAndPropagate
以往使用 synchronized
时候,可以让大面积的代码处于互斥状态,可以不太用考虑那么多并发条件;而现在设计的无锁 AQS
,使用的 CAS
操作粒度细,只能针对某一个变量,那在同一个时间点,其余的作为并发条件的共享状态变量就会成为破防因素。
私以为面向这种无锁的资源竞争问题,我们更要重点关注资源共享情况,比如 setHeadAndPropagate()
方法里后面就有一堆奇怪的状态判断,要想理解它们,就要从并发入手(不然设计人干嘛写这么一大串?):
private void setHeadAndPropagate(Node node, int propagate) {
...
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
propagate
,表示 剩余资源,在tryAccquireShare()
后会返回一个r
,一般来说这个r
就是剩余可用的共享资源- 第一个
h == null
,属于判空操作,CSDN
上有篇博客说,这里的h == null
是永远不可能成立的,而我则持反对意见:Old Head
可能会被回收,所以这里的判断是有意义的 - 第一个
h.waitStatus < 0
,old head
在触发doReleaseShared()
时会将SIGNAL
设置为0
中间状态;如果用户又触发了一次doReleaseShared()
时就会将0
变为PROGAGATE
,可以参考下图:
- 然后我们看看情况1:如果
SIGNAL
变为0
后,进入unparkSuccessor
,唤醒后一个节点 - 情况2:用户在
h
线程doReleaseShared
过程中又调了一遍doReleaseShared
,h
线程就会从0
变为PROGAGATE
- 第二个
(h = head) == null
也属于判空操作,但这个是否真有可能为空就不太确定了 - 第二个
h.waitStatus <0
,如果有后续等待节点,那么也同样成立;否则不会成立。
- 第二个
如果前面任一条件满足了,后续会有新的两个条件判断:
s == null
,如果当前node
为队列里的最后一个节点时则成立s != null && s.isShared()
,则doReleseShared()
setHeadAndPropagate()
比较麻烦的点就这么多了,有很多情况需要知晓一下,但总的来说,一个方法想要对哪些条件做判断,就去思考哪些方法,哪些情况会对这些条件涉及的变量有修改,把这些情况列出来,就能逐一理解为何这么写了。
作者在这个方法里的注释中还有写道:
The conservatism in both of these checks may cause unnecessary wake-ups, but only when there are multiple racing acquires/releases, so most need signals now or soon anyway.
大致意思就是该方法可能会存在一些不必要的唤醒,比如同步队列如下所示:
互斥锁-共享锁-共享锁-互斥锁
这种情况下第二个共享锁醒了之后会唤醒后继的互斥锁,互斥锁因为拿不到锁,又得进去休眠~
感觉自己离并发编程还有好长一段路要走,简单的就是这么缜密的思维,佩服佩服!
总结
感觉这篇文章写的没那么完美,但是各种巧妙设计都体验了一遍,一环扣一环的高内聚代码,实在是强。简单来说, AQS
里的Share
模式主要就是增加了连续的共享锁唤醒。而这个连续唤醒一排的共享锁的操作中可能会出现其他的线程竞争问题(多个判断),这也是后面花大笔墨想去说明清楚的东西。