上锁
- 里面蓝色的方块表明要铺开分析的部分;虚线的菱形方块表示模板方法,其指向的方块是具体实现
整体将划分为:
- 进入
acquire(),分析acquire()的源码 - 学习
tryAcquire()的具体实现,看看ReentrantLock如何实现的tryAcquire() - 学习
addWaiter() - 学习
acquireQueued()
下面就直奔主题:
acquire()属于public final级方法,它规划了获取锁的整体流程。它首先调用了模板方法tryAcquire()来尝试获取锁,如果获取锁失败,则进入等待队列;如果获取成功,就直接返回// AbstractQueuedSynchronizer#acquirepublic final void acquire(int arg) {// 尝试acquire// 如果tryAcquire失败了,则将当前线程封装为节点入队if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
下面看看
tryAcquire()的具体实现,我们当前以ReentrantLock为例:// ReentrantLock.NonfairSync#tryAcquireprotected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}// ReentrantLock.Sync#nonfairTryAcquirefinal boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread(); // 获取当前线程int c = getState(); // 获取当前同步器的同步状态if (c == 0) { // state == 0 表示没有人持有锁if (compareAndSetState(0, acquires)) { // 尝试获取锁setExclusiveOwnerThread(current); // 如果获取成功,则将自己设置为锁持有者return true;}}else if (current == getExclusiveOwnerThread()) { // 如果持有锁的线程就是自己本身(重入锁)int nextc = c + acquires; // 增加次数if (nextc < 0) // 溢出throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; // 获取失败进入等待}
| 在
tryAcquire()的实现里面,我们可以看到两点:
- 虽然等待队列是FIFO的,但是acquire()先执行tryAcquire()会让新来的线程有机会直接获取到锁(在释放锁的瞬间,新线程直接tryAcquire())。这是非公平锁的原理。
- 进入tryAcquire()方法后,发现持有锁的就是自己本身,那么就对**state**进行递增;释放锁时也逐个递减。这就是锁可重入的原理。
|
|
| —- | —- |
| | |当
tryAcquire()失败时,该线程就要准备进入等待队列——addWaiter()// AbstractQueuedSynchronizer#addWaiter()private Node addWaiter(Node mode) {// 将当前线程封装为NodeNode node = new Node(Thread.currentThread(), mode);// 这部分是简单版的入队操作,主要原因是为了提高效率;否则进入enq(),就是完整的入队操作Node pred = tail;if (pred != null) { // TAIL不为空node.prev = pred; // 将node的prev属性设置为tail(链表式队列的操作)if (compareAndSetTail(pred, node)) { // 尝试CAS将node作为TAILpred.next = node; // 如果CAS成功,那么直接设置TAIL的next属性return node;}}enq(node); // 进入完整的入队操作return node;}————————————————————————————————————————————————————————————————————————————————// AbstractQueuedSynchronizer#enq()private Node enq(final Node node) {for (;;) { // 经典的死循环Node t = tail; // 流程和简化版入队很像,就是多了一步初始化if (t == null) { // 如果TAIL为null,说明还没有初始化if (compareAndSetHead(new Node())) // 随便搞个Node对象,先初始化队列tail = head;} else {node.prev = t; // 将node的prev属性设置为TAIL(和前面一样的)if (compareAndSetTail(t, node)) { // 后续的逻辑和简化版一样t.next = node;return t;}}}}
| 这里有两个细节要讨论一下:
1. 区分简化版和完整版的入队操作。针对已经完成初始化的同步器,在没有大量线程竞争的情况,通过简化版入队少了几步操作,提高了速度
1. 不知道各位小伙伴有没有想过除了CAS以外,其他的操作会不会涉及并发问题?这里给个思路,并发问题都是因为同个资源上的共享读写造成的。对于一个等待队列来说,大家竞争的都是**HEAD/TAIL**。至于每个节点的属性,只要设置HEAD/TAIL成功就没问题啦~
|
|
| —- | —- |
| | |进入
acquireQueued的节点都会一直请求获取锁,直到被取消或者被中断// AbstractQueuedSynchronizer#acquireQueuedfinal boolean acquireQueued(final Node node, int arg) {boolean failed = true; // 悲观判断try {// 记录是否被interrupt过boolean interrupted = false;for (;;) {// !!!!分析片段0 !!!!// 获取当前节点的前驱节点final Node p = node.predecessor();if (p == head && tryAcquire(arg)) { // 如果前驱节点是HEAD,尝试acquiresetHead(node); // 将node设置为HEADp.next = null; // 将原来的HEAD,即p的next属性清空。防止node无法GCfailed = false; // failed设为falsereturn interrupted;}// 如果自己的前驱节点并不是HEAD// 设置状态并准备进入阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// !!!!分析片段1 !!!!// 最终判断if (failed)// 取消accquirecancelAcquire(node);}}
接下来看看
shouldParkAfterFailedAcquire(),这部分涉及到Node的waitStatus:// AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire()private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; // 获取前驱节点的waitStatusif (ws == Node.SIGNAL) // 如果前驱节点是SIGNAL,那么自己(node)就进入阻塞return true;if (ws > 0) { // 如果前驱节点已经被取消了,将头尾没有取消的节点连接起来// 然后再重新判断一波// !!!!分析片段2 !!!!do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 当前驱节点的waitStatus为0或者PROPAGATE时,设置前驱节点为SIGNAL并重新尝试// 一开始所有的节点的waitStatus都为0,所以都会进入else块compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
该方法都是针对前驱节点的状态进行操作的,所以可以得到这么一个规律:
点击查看【processon】
只要有新节点入队,新节点的前驱节点如果不是SIGNAL,就将前驱节点设置为SIGNAL并让自己进入阻塞状态。
| 当当当~又是我~第四阶段的代码比较长,但是逻辑还是简单的。我挑了三个感觉有坑的片段进行分析: - 分析片段0:为什么要通过前驱节点来判断?这是什么意思 - 分析片段1:这部分涉及取消上锁的逻辑,后面讲 - 分析片段2:这里不会有并发问题吗?这部分处理的范围主要在当前节点之前的所有空节点,遇到非空节点就停下来;又因为前面(除HEAD)都处于阻塞状态,每个节点都只处理它前面的一个节点,所以不必担心有并发问题 |
![]() |
|---|---|
总结一下, AQS 上锁时如果没有线程竞争,就不会初始化同步队列(说明同步队列是懒加载的);如果发生了线程竞争,没有抢到锁的剩余线程会被封装为 Node 并被连接起来,串成一个链式双向队列,此时等待中的线程都会处于阻塞状态,并等待自己的前驱节点唤醒自己。
释放锁
本节内容比较简单,主要就是涉及 waitStatus 的变化以及 HEAD 节点和 HEAD 后继节点的变化:
点击查看【processon】
当A线程调用了
ReentrantLock.release()(其本质就是AQS#release())public final boolean release(int arg) { // 调用子类的实现 if (tryRelease(arg)) { // 获取头节点,此时的头结点只有next属性仍然存在,thread、prev属性均被清除 // 详细看setHead()方法 Node h = head; // 如果HEAD部位空,而且waitStatus不为0 if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 解除后续节点的阻塞 return true; } return false; }接下来看看
tryRelease(),这个方法是由ReentrantLock实现的:protected final boolean tryRelease(int releases) { // 计算当前线程持有锁的个数 int c = getState() - releases; // 防止未持有锁的线程瞎搞 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }tryRelease()的关键点如下所示:
tryRelease()传入的参数:如果state减去releases参数不为0,依然无法释放锁。这是因为ReentrantLock是可重入锁,要把所有的锁释放了才能真正释放锁
- 最后看看
unparkSuccessor(),该方法的主要作用就是为了唤醒后继节点:
简单来说就是如果后继节点被取消了或者为private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 获取HEAD if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 将HEAD的waitStatus设置为0 /* * 获取头结点后面的结点s * 如果结点s是null或者s已经被取消了 * 就从后往前遍历,直到找到一个满足条件的结点?? */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 查找最靠近HEAD的 且 waitStatus<=0 的节点 if (t.waitStatus <= 0) s = t; } // 如果s != null,则唤醒s if (s != null) LockSupport.unpark(s.thread); }null,就去找离自己最近的可唤醒的节点,找到后唤醒;被唤醒的节点在acquireQueued()方法里再次尝试获取锁。
取消/中断
本节介绍方法 cancelAcquire() ,该方法都出现在 acquireXXX() 最后的 finally 块中,而且需要 failed 属性为 true (在获取锁的过程中被打断了),才会进入 cancelAcquire() 。而可以取消的结点,要么是定时器到时间了,要么是线程被中断了:
/**
* 10s过后,假设A线程仍持有锁,而B线程的tryLock()到期,那么B线程因为
* throw new InterruptedException()而跳出for循环进入finally块,准备执行
* cancelAcquire()
* PS:此情况下C尚未到期,因为方便写:)
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 获取当前结点的前驱结点
Node pred = node.prev;
/* 判断前驱结点是否为取消状态;
* 如果是取消状态的话,就一直向前查找,直到找到非CANCELLED状态的结点
*/
// 本示例里pred是A结点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
/* 不管三七二十一,来了这个方法,这个结点就注定要设置成CANCELLED
* 那会不会有其他线程同时对这个结点的状态进行操作?
* 有也没关系,它的顺序无论在其他CAS写操作之前还是之后最终都会设置为
*/
// ①到这一步,本示例里B结点的ws转为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前结点是尾结点,那么就将predNext(从后到前第一个非CANCELLED状态的结点)设置为尾结点
// 本示例里B结点不是尾结点,跳过if进入else
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果pred不是头结点,就设置状态为SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {
Node next = node.next;
/* 当前结点是否有后继结点
* 若有后继结点且不是CANCELLED状态,则将后继结点和pred相连
*/
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
/*
* 如果pred是头结点就唤醒后继的非CANCELLED结点
*/
} else {
// ② 唤醒后面的非CANCELLED状态的结点
unparkSuccessor(node);
}
/* 很骚的操作,一般都是设置null来help gc,这里采用循环引用当新一波的
* tryAcquire()之后,会将CANCELLED的引用清理掉,所以最后就变成不可达的
* 引用,自然就被gc了
*/
node.next = node;
}
}
流程图大致是这样的:
到这里,整个 cancelAcquire() 的流程结束了,最后的 unparkSuccessor() 方法唤醒C,具体的可以参考前面的 release() 一节。
其他上锁操作
AQS 不仅提供了常规的 acquire() 还提供了支持中断的 acquireInterruptibly() 、支持计时的 tryAcquireNanos 。使用这些方法进入等待队列的节点(线程)在被中断/超时后会被取消,即 cancelAcquire() 。
可中断的上锁 - acquireInterruptibly
acquireInterruptibly() 方法支持用户去中断,它的内部逻辑比较简单:首先尝试一下 tryAcquire() ,如果没有竞争,那获取锁成功;如果失败了,则进入等待队列。
// AbstractQueuedSynchronizer#acquireInterruptibly()
public final void acquireInterruptibly(int arg)
throws InterruptedException { // 注意!他之类把 InterruptedException 抛出来了
if (Thread.interrupted()) // 如果检测到了当前线程已经被中断,就直接抛出
throw new InterruptedException();
if (!tryAcquire(arg)) // 尝试tryAcquire()
doAcquireInterruptibly(arg); // 可中断的acqurie()
}
如果 tryAcquire() 失败了,这个线程就要准备好进入等待队列,即调用 doAcquireInterruptibly() 。该方法的逻辑 acquireQueued() 基本一致,主要区别就是被中断后抛出了中断异常:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 处理状态 && 阻塞并检查中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可超时的上锁 - tryAcquireNanos
这部分逻辑和 acquire() 有异曲同工之妙,基本逻辑都差不多:
- 首先尝试上锁——
tryAcquire(),如果成功了就不用创建等待队列了;若发生了竞争且竞争失败了就进入doAcquireNanos() doAcquireNanos()仅是在acquireQueued()基础上增加了计时功能
首先看 tryAcquireNanos() :先检查中断状态,如果已经中断了直接抛出异常;然后尝试tryAcquire() ,如果 tryAcquire() 失败了,就进入等待队列
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
随后看 doAcquireNanos() ,该方法在 acquireQueued() 基础上加了时限。唯一注意的点就是这里用到了自旋,在时间没超过 AQS.spinForTimeoutThreshold 之前,一直处于 for 循环(其实也就1000纳秒的时间处于自旋)。当超过自旋时间,进入 LockSupport.parkNanos() ,故名思意,就是带计时的 park() ~
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L) // 设置的时间小于0,直接返回
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 超时时限
final Node node = addWaiter(Node.EXCLUSIVE); // 创建一个新的结点,注意EXCLUSIVE
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 如果当前时间超过了时限,就当获取锁失败,返回false
return false;
// 如果剩余超时的时间小于 spinForTimeoutThreshold 就没必要阻塞,直接自选
// 如果剩余超时的时间大于 spinForTimeoutThreshold 就进入阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout); // 带计时功能的park
if (Thread.interrupted()) // 发现已经中断了,就抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结一下“上锁操作”,“上锁操作”在独占模式下分为三种面对开发人员的API: acquire() 、 acquireInterruptibly() 、 tryAcquireNanos() ,它们分别支持了普通上锁、可中断上锁、超时上锁,主要的流程都是先尝试 tryAcquire() ,如果获取锁失败才考虑进入等待队列。 |
![]() |
|---|---|
公平锁实现原理
前面讲过了“非公平锁”的实现原理,我们可以知道这样一个流程:lock() -> AQS#tryAcquire() -> 具体实现
而“公平”、“非公平”就是在这个“具体实现”里完成的。对于“非公平”来说,只需要正常acquire()即可,就算发生了抢占也没事;对于“公平”来说,如果等待队列里有其他等待线程,则需要排队。由于整个队列本身就是FIFO的,所以只需要保证tryAcquire()时不要发生插队即可,那么事实上也确实是这么做的:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
如果发现这个等待队列里还有其他元素,则返回true;此时外部的判断:
static final class FairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果队列中有其他元素,则 !hasQueuedPredecessors() == false
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
...
return false;
}
}


