上锁
- 里面蓝色的方块表明要铺开分析的部分;虚线的菱形方块表示模板方法,其指向的方块是具体实现
整体将划分为:
- 进入
acquire()
,分析acquire()
的源码 - 学习
tryAcquire()
的具体实现,看看ReentrantLock
如何实现的tryAcquire()
- 学习
addWaiter()
- 学习
acquireQueued()
下面就直奔主题:
acquire()
属于public final
级方法,它规划了获取锁的整体流程。它首先调用了模板方法tryAcquire()
来尝试获取锁,如果获取锁失败,则进入等待队列;如果获取成功,就直接返回// AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
// 尝试acquire
// 如果tryAcquire失败了,则将当前线程封装为节点入队
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
下面看看
tryAcquire()
的具体实现,我们当前以ReentrantLock
为例:// ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); // 获取当前线程
int c = getState(); // 获取当前同步器的同步状态
if (c == 0) { // state == 0 表示没有人持有锁
if (compareAndSetState(0, acquires)) { // 尝试获取锁
setExclusiveOwnerThread(current); // 如果获取成功,则将自己设置为锁持有者
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果持有锁的线程就是自己本身(重入锁)
int nextc = c + acquires; // 增加次数
if (nextc < 0) // 溢出
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; // 获取失败进入等待
}
| 在
tryAcquire()
的实现里面,我们可以看到两点:
- 虽然等待队列是FIFO
的,但是acquire()
先执行tryAcquire()
会让新来的线程有机会直接获取到锁(在释放锁的瞬间,新线程直接tryAcquire()
)。这是非公平锁的原理。
- 进入tryAcquire()
方法后,发现持有锁的就是自己本身,那么就对**state**
进行递增;释放锁时也逐个递减。这就是锁可重入的原理。
|
| | —- | —- | | | |当
tryAcquire()
失败时,该线程就要准备进入等待队列——addWaiter()
// AbstractQueuedSynchronizer#addWaiter()
private Node addWaiter(Node mode) {
// 将当前线程封装为Node
Node node = new Node(Thread.currentThread(), mode);
// 这部分是简单版的入队操作,主要原因是为了提高效率;否则进入enq(),就是完整的入队操作
Node pred = tail;
if (pred != null) { // TAIL不为空
node.prev = pred; // 将node的prev属性设置为tail(链表式队列的操作)
if (compareAndSetTail(pred, node)) { // 尝试CAS将node作为TAIL
pred.next = node; // 如果CAS成功,那么直接设置TAIL的next属性
return node;
}
}
enq(node); // 进入完整的入队操作
return node;
}
————————————————————————————————————————————————————————————————————————————————
// AbstractQueuedSynchronizer#enq()
private Node enq(final Node node) {
for (;;) { // 经典的死循环
Node t = tail; // 流程和简化版入队很像,就是多了一步初始化
if (t == null) { // 如果TAIL为null,说明还没有初始化
if (compareAndSetHead(new Node())) // 随便搞个Node对象,先初始化队列
tail = head;
} else {
node.prev = t; // 将node的prev属性设置为TAIL(和前面一样的)
if (compareAndSetTail(t, node)) { // 后续的逻辑和简化版一样
t.next = node;
return t;
}
}
}
}
| 这里有两个细节要讨论一下:
1. 区分简化版和完整版的入队操作。针对已经完成初始化的同步器,在没有大量线程竞争的情况,通过简化版入队少了几步操作,提高了速度
1. 不知道各位小伙伴有没有想过除了CAS
以外,其他的操作会不会涉及并发问题?这里给个思路,并发问题都是因为同个资源上的共享读写造成的。对于一个等待队列来说,大家竞争的都是**HEAD/TAIL**
。至于每个节点的属性,只要设置HEAD/TAIL
成功就没问题啦~
|
| | —- | —- | | | |进入
acquireQueued
的节点都会一直请求获取锁,直到被取消或者被中断// AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 悲观判断
try {
// 记录是否被interrupt过
boolean interrupted = false;
for (;;) {
// !!!!分析片段0 !!!!
// 获取当前节点的前驱节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 如果前驱节点是HEAD,尝试acquire
setHead(node); // 将node设置为HEAD
p.next = null; // 将原来的HEAD,即p的next属性清空。防止node无法GC
failed = false; // failed设为false
return interrupted;
}
// 如果自己的前驱节点并不是HEAD
// 设置状态并准备进入阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// !!!!分析片段1 !!!!
// 最终判断
if (failed)
// 取消accquire
cancelAcquire(node);
}
}
接下来看看
shouldParkAfterFailedAcquire()
,这部分涉及到Node
的waitStatus
:// AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 获取前驱节点的waitStatus
if (ws == Node.SIGNAL) // 如果前驱节点是SIGNAL,那么自己(node)就进入阻塞
return true;
if (ws > 0) { // 如果前驱节点已经被取消了,将头尾没有取消的节点连接起来
// 然后再重新判断一波
// !!!!分析片段2 !!!!
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 当前驱节点的waitStatus为0或者PROPAGATE时,设置前驱节点为SIGNAL并重新尝试
// 一开始所有的节点的waitStatus都为0,所以都会进入else块
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
该方法都是针对前驱节点的状态进行操作的,所以可以得到这么一个规律:
点击查看【processon】
只要有新节点入队,新节点的前驱节点如果不是SIGNAL
,就将前驱节点设置为SIGNAL
并让自己进入阻塞状态。
当当当~又是我~第四阶段的代码比较长,但是逻辑还是简单的。我挑了三个感觉有坑的片段进行分析: - 分析片段0:为什么要通过前驱节点来判断?这是什么意思 - 分析片段1:这部分涉及取消上锁的逻辑,后面讲 - 分析片段2:这里不会有并发问题吗?这部分处理的范围主要在当前节点之前的所有空节点,遇到非空节点就停下来;又因为前面(除HEAD)都处于阻塞状态,每个节点都只处理它前面的一个节点,所以不必担心有并发问题 |
|
---|---|
总结一下, AQS
上锁时如果没有线程竞争,就不会初始化同步队列(说明同步队列是懒加载的);如果发生了线程竞争,没有抢到锁的剩余线程会被封装为 Node
并被连接起来,串成一个链式双向队列,此时等待中的线程都会处于阻塞状态,并等待自己的前驱节点唤醒自己。
释放锁
本节内容比较简单,主要就是涉及 waitStatus
的变化以及 HEAD
节点和 HEAD
后继节点的变化:
点击查看【processon】
当A线程调用了
ReentrantLock.release()
(其本质就是AQS#release()
)public final boolean release(int arg) { // 调用子类的实现 if (tryRelease(arg)) { // 获取头节点,此时的头结点只有next属性仍然存在,thread、prev属性均被清除 // 详细看setHead()方法 Node h = head; // 如果HEAD部位空,而且waitStatus不为0 if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 解除后续节点的阻塞 return true; } return false; }
接下来看看
tryRelease()
,这个方法是由ReentrantLock
实现的:protected final boolean tryRelease(int releases) { // 计算当前线程持有锁的个数 int c = getState() - releases; // 防止未持有锁的线程瞎搞 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
tryRelease()
的关键点如下所示:
tryRelease()
传入的参数:如果state
减去releases
参数不为0,依然无法释放锁。这是因为ReentrantLock
是可重入锁,要把所有的锁释放了才能真正释放锁
- 最后看看
unparkSuccessor()
,该方法的主要作用就是为了唤醒后继节点:
简单来说就是如果后继节点被取消了或者为private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 获取HEAD if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 将HEAD的waitStatus设置为0 /* * 获取头结点后面的结点s * 如果结点s是null或者s已经被取消了 * 就从后往前遍历,直到找到一个满足条件的结点?? */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 查找最靠近HEAD的 且 waitStatus<=0 的节点 if (t.waitStatus <= 0) s = t; } // 如果s != null,则唤醒s if (s != null) LockSupport.unpark(s.thread); }
null
,就去找离自己最近的可唤醒的节点,找到后唤醒;被唤醒的节点在acquireQueued()
方法里再次尝试获取锁。
取消/中断
本节介绍方法 cancelAcquire()
,该方法都出现在 acquireXXX()
最后的 finally
块中,而且需要 failed
属性为 true
(在获取锁的过程中被打断了),才会进入 cancelAcquire()
。而可以取消的结点,要么是定时器到时间了,要么是线程被中断了:
/**
* 10s过后,假设A线程仍持有锁,而B线程的tryLock()到期,那么B线程因为
* throw new InterruptedException()而跳出for循环进入finally块,准备执行
* cancelAcquire()
* PS:此情况下C尚未到期,因为方便写:)
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 获取当前结点的前驱结点
Node pred = node.prev;
/* 判断前驱结点是否为取消状态;
* 如果是取消状态的话,就一直向前查找,直到找到非CANCELLED状态的结点
*/
// 本示例里pred是A结点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
/* 不管三七二十一,来了这个方法,这个结点就注定要设置成CANCELLED
* 那会不会有其他线程同时对这个结点的状态进行操作?
* 有也没关系,它的顺序无论在其他CAS写操作之前还是之后最终都会设置为
*/
// ①到这一步,本示例里B结点的ws转为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前结点是尾结点,那么就将predNext(从后到前第一个非CANCELLED状态的结点)设置为尾结点
// 本示例里B结点不是尾结点,跳过if进入else
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果pred不是头结点,就设置状态为SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {
Node next = node.next;
/* 当前结点是否有后继结点
* 若有后继结点且不是CANCELLED状态,则将后继结点和pred相连
*/
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
/*
* 如果pred是头结点就唤醒后继的非CANCELLED结点
*/
} else {
// ② 唤醒后面的非CANCELLED状态的结点
unparkSuccessor(node);
}
/* 很骚的操作,一般都是设置null来help gc,这里采用循环引用当新一波的
* tryAcquire()之后,会将CANCELLED的引用清理掉,所以最后就变成不可达的
* 引用,自然就被gc了
*/
node.next = node;
}
}
流程图大致是这样的:
到这里,整个 cancelAcquire()
的流程结束了,最后的 unparkSuccessor()
方法唤醒C,具体的可以参考前面的 release()
一节。
其他上锁操作
AQS
不仅提供了常规的 acquire()
还提供了支持中断的 acquireInterruptibly()
、支持计时的 tryAcquireNanos
。使用这些方法进入等待队列的节点(线程)在被中断/超时后会被取消,即 cancelAcquire()
。
可中断的上锁 - acquireInterruptibly
acquireInterruptibly()
方法支持用户去中断,它的内部逻辑比较简单:首先尝试一下 tryAcquire()
,如果没有竞争,那获取锁成功;如果失败了,则进入等待队列。
// AbstractQueuedSynchronizer#acquireInterruptibly()
public final void acquireInterruptibly(int arg)
throws InterruptedException { // 注意!他之类把 InterruptedException 抛出来了
if (Thread.interrupted()) // 如果检测到了当前线程已经被中断,就直接抛出
throw new InterruptedException();
if (!tryAcquire(arg)) // 尝试tryAcquire()
doAcquireInterruptibly(arg); // 可中断的acqurie()
}
如果 tryAcquire()
失败了,这个线程就要准备好进入等待队列,即调用 doAcquireInterruptibly()
。该方法的逻辑 acquireQueued()
基本一致,主要区别就是被中断后抛出了中断异常:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 处理状态 && 阻塞并检查中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可超时的上锁 - tryAcquireNanos
这部分逻辑和 acquire()
有异曲同工之妙,基本逻辑都差不多:
- 首先尝试上锁——
tryAcquire()
,如果成功了就不用创建等待队列了;若发生了竞争且竞争失败了就进入doAcquireNanos()
doAcquireNanos()
仅是在acquireQueued()
基础上增加了计时功能
首先看 tryAcquireNanos()
:先检查中断状态,如果已经中断了直接抛出异常;然后尝试tryAcquire()
,如果 tryAcquire()
失败了,就进入等待队列
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
随后看 doAcquireNanos()
,该方法在 acquireQueued()
基础上加了时限。唯一注意的点就是这里用到了自旋,在时间没超过 AQS.spinForTimeoutThreshold
之前,一直处于 for
循环(其实也就1000纳秒的时间处于自旋)。当超过自旋时间,进入 LockSupport.parkNanos()
,故名思意,就是带计时的 park()
~
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L) // 设置的时间小于0,直接返回
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 超时时限
final Node node = addWaiter(Node.EXCLUSIVE); // 创建一个新的结点,注意EXCLUSIVE
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 如果当前时间超过了时限,就当获取锁失败,返回false
return false;
// 如果剩余超时的时间小于 spinForTimeoutThreshold 就没必要阻塞,直接自选
// 如果剩余超时的时间大于 spinForTimeoutThreshold 就进入阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout); // 带计时功能的park
if (Thread.interrupted()) // 发现已经中断了,就抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结一下“上锁操作”,“上锁操作”在独占模式下分为三种面对开发人员的API: acquire() 、 acquireInterruptibly() 、 tryAcquireNanos() ,它们分别支持了普通上锁、可中断上锁、超时上锁,主要的流程都是先尝试 tryAcquire() ,如果获取锁失败才考虑进入等待队列。 |
|
---|---|
公平锁实现原理
前面讲过了“非公平锁”的实现原理,我们可以知道这样一个流程:lock() -> AQS#tryAcquire() -> 具体实现
而“公平”、“非公平”就是在这个“具体实现”里完成的。对于“非公平”来说,只需要正常acquire()
即可,就算发生了抢占也没事;对于“公平”来说,如果等待队列里有其他等待线程,则需要排队。由于整个队列本身就是FIFO
的,所以只需要保证tryAcquire()
时不要发生插队即可,那么事实上也确实是这么做的:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
如果发现这个等待队列里还有其他元素,则返回true
;此时外部的判断:
static final class FairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果队列中有其他元素,则 !hasQueuedPredecessors() == false
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
...
return false;
}
}