前言
AQS 不仅实现了独占功能和共享功能,还实现了 Condition 的功能。
Condition 是 Lock 的伴侣,接下来就通过 ReentrantLock 来分析在 AQS 中 Condition 的实现。
ConditionObject
ConditionObject 是 AQS 内的一个内部类,实现了 Condition 接口。这个帮助类保存了等待队列的第一个等待节点和最后一个等待节点。
ConditionObject 类图
Condition 介绍
Condition 在 JDK1.5 中才被引入,它可以替代传统的 Object 中的 wait(), notify() 和 notifyAll() 方法来实现线程间的通信,使线程间协作更加安全和高效。
Condition 是一个接口,它的定义如下:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();}
Condition 定义的部分方法及描述:
方法名称 | 描述 |
---|---|
await | 当线程进入等待状态直到被通知(signal)或中断 |
awaitUninterruptibly | 当前线程进入等待状态直到被通知,不受中断影响 |
awaitNanos(long nanosTimeout) | 当前线程进入等待状态直到被通知、中断或超时 |
awaitUntil(Date deadline) | 当前线程进入等待状态直到被通知、中断或者到某个时间 |
signal | 唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关的锁 |
signalAll | 唤醒所有等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关的锁 |
常用的方法是 await(), signal() 和 signalAll(),Condition 和 Object 类中的方法对应如下:
Object | Condition |
---|---|
wait() | await() |
notify() | signal() |
notifyAll() | signalAll() |
既然功能都一样,为什么还需要使用 Condition 呢?
简单来说,Condition 需要和 Lock 一起使用,在不使用 Lock 时,使用 synchronized
时的代码如下:
synchronized(obj){
obj.wait();
}
synchronized(obj){
obj.notify();
}
使用 Lock 时的代码如下:
lock.lock();
condition.await();
lock.unlock();
lock.lock();
condition.signal();
lock.unlock();
从代码上可以看出,使用 synchronized
关键字时,所有没有获取锁的线程都会等待,这时相当于只有 1 个等待队列。
而在实际应用中可能有时需要多个等待队列,比如 ReadLock 和 WriteLock。Lock 中的等待队列和 Condition 中的等待队列是分开的,例如在独占模式下,Lock 的独占保证了在同一时刻只会有一个线程访问临界区,也就是 lock() 方法返回后,Condition 等待队列保存着被阻塞的线程,也就是调用 await() 方法后阻塞的线程。
所以 Lock 比 synchronized
更灵活、更公平。
为什么调用 await 方法时需要持有锁
假设您有一些线程,需要消费某些元素。 有一个队列,有且仅有队列中有元素之后,线程才能处理它。 队列必须是线程安全的,因此必须由锁保护。 您可能会编写以下代码:
- 获取锁。
- 检查队列是否为空。
- 如果队列为空,当前线程等待,其它线程将元素放入队列。
很可惜,这行不通。 我们将锁保持在队列上,另一个线程如何在其上添加元素? 让我们再试一次:
- 获取锁。
- 检查队列是否为空。
- 如果队列为空,当前线程释放锁并等待,其它线程将元素放入队列。
这个逻辑依然有问题,如果在释放锁之后,队列上刚添加元素,其它线程就消费了元素,等来了个寂寞。
await 的职责是释放锁,并让调用线程休眠(原子地)。所以 await 操作必须是原子性的。所以当前线程必须持有锁,防止其它线程进入临界区。这样复杂的步骤也是为了避免线程在休眠时,产生一些竞态条件。
Condition 的使用
在 Condition 接口的 javadoc 中,有一个很好的例子来使用 Condition,代码如下:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}}
代码很简单,定义了一个数组 items,put 用于向 items 中添加数据,take 用于从 items 中取出数据,count 代表当前 items 中存放了多少个对象,putptr 表示下一个需要添加的索引,takeptr 表示下一个需要取出的索引,这样就实现了数组的循环添加和取出数据的功能。
put 和 take 的具体功能如下:
put
- 当 count 与 items 的长度相同时,表示数组已满,则调用 notFull.await() 来等待同时释放了当前线程的锁;
- 当线程被唤醒时,将 x 添加到 putptr 索引的位置;
- 如果当前 putptr 的位置是最后一个,则下一个索引的位置从 0 开始;
调用 notEmpty.signal(); 通知其他线程可以从数组中取出数据了。
take
当 count 为0时,表示数组是空的,则调用 notEmpty.await() 来等待同时释放了当前线程的锁;
- 当线程被唤醒时,将 x 添加到 takeptr 索引的位置;
- 如果当前 takeptr 的位置是最后一个,则下一个索引的位置从 0 开始;
- 调用 notFull.signal(),通知其他线程可以向数组中添加数据了。
AQS 中 Condition 的实现
本文还是通过 ReentrantLock 来分析。
Condition 必须被绑定到一个独占锁上使用,在 ReentrantLock 中,有一个 newCondition 方法,该方法调用了 Sync 中的 newCondition 方法,看下 Sync 中 newCondition 的实现:
ConditionObject 是在 AQS 中定义的,它实现了 Condition 接口,自然也就实现了上述的 Condition 接口中的方法。该类中有两个重要的变量:final ConditionObject newCondition() {
return new ConditionObject();}
这里的 firstWaiter 和 lastWaiter 是不是和 AQS 中的 head 和 tail 有些类似,而且都是 Node 类型的。/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
对于 Condition 来说,它是不与独占模式或共享模式使用同一个队列的,它有自己的队列,所以这两个变量表示了队列的头节点和尾节点。await(一)
总结一个这个方法的逻辑:public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建一个新的节点,追加到 Condition Queue 中尾节点.
Node node = addConditionWaiter();
// 释放这个锁,并唤醒 Sync Queue 队列中一个线程.
int savedState = fullyRelease(node);
int interruptMode = 0;
// isOnSyncQueue判断这个节点是否在 Sync Queue 队列上,第一次判断总是返回 false
while (!isOnSyncQueue(node)) {
// 第一次总是 park 自己,开始阻塞等待
LockSupport.park(this);
// 如果中断就退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
// 如果被中断了,就跳出循环
break;
}
...
}
- 在 Condition 中,维护着一个队列,每当执行 await 方法,都会根据当前线程创建一个节点,并添加到
Condition Queue
尾部; - 然后释放锁,唤醒阻塞在
Sync Queue
的一个线程,并将自己阻塞; - 在被别的线程唤醒后(别的线程调用 signal 方法),将自身节点放回
Sync Queue
中,接下来应该抢占式地获取锁;
addConditionWaiter
// 该方法就是创建一个当前线程的节点,追加到最后一个节点中.
private Node addConditionWaiter() {
// 找到最后一个节点,放在局部变量中,速度更快
Node t = lastWaiter;
// 如果最后一个节点失效了,就清除链表中所有失效节点,并重新赋值 t
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个当前线程的 node 节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果最后一个节点是 null
if (t == null)
// 将当前节点设置成第一个节点
firstWaiter = node;
else
// 如果不是 null, 将当前节点追加到最后一个节点
t.nextWaiter = node;
// 将当前节点设置成最后一个节点
lastWaiter = node;
// 返回
return node;
}
unlinkCancelledWaiters
// 清除链表中所有失效的节点.
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// 当 next 正常的时候,需要保存这个 next, 方便下次循环是链接到下一个节点上.
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 如果这个节点被取消了
if (t.waitStatus != Node.CONDITION) {
// 先将他的 next 节点设置为 null
t.nextWaiter = null;
// 如果这是第一次判断 trail 变量
if (trail == null)
// 将 next 变量设置为 first, 也就是去除之前的 first(由于是第一次,肯定去除的是 first)
firstWaiter = next;
else
// 如果不是 null,说明上个节点正常,将上个节点的 next 设置为无效节点的 next, 让 t 失效
trail.nextWaiter = next;
// 如果 next 是 null, 说明没有节点了,那么就可以将 trail 设置成最后一个节点
if (next == null)
lastWaiter = trail;
}
// 如果该节点正常,那么就保存这个节点,在下次链接下个节点时使用
else
trail = t;
// 换下一个节点继续循环
t = next;
}
}
fullyRelease
这个方法主要是用于释放锁,并唤醒 Sync Queue 中一个节点。
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取 state 变量
int savedState = getState();
// 如果释放成功,则返回 state 的大小,也就是之前持有锁的线程的数量
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 如果释放失败,抛出异常
throw new IllegalMonitorStateException();
}
} finally {
//释放失败
if (failed)
// 将这个节点是指成取消状态.随后将从队列中移除.
node.waitStatus = Node.CANCELLED;
}
}
release
// 主要功能,就是释放锁,并唤醒阻塞在锁上的线程.
public final boolean release(int arg) {
// 如果释放锁成功,返回 true, 可能会抛出监视器异常,即当前线程不是持有锁的线程.
// 也可能是释放失败,但 fullyRelease 基本能够释放成功.
if (tryRelease(arg)) {
// 释放成功后, 唤醒 head 的下一个节点上的线程.
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
// 释放失败
return false;
}
tryRelease
// 主要功能就是对 state 变量做减法, 如果 state 变成0,则将持有锁的线程设置成 null.
protected final boolean tryRelease(int releases) {
// 计算 state
int c = getState() - releases;
// 如果当前线程不是持有该锁的线程,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果结果是 0,说明成功释放了锁.
if (c == 0) {
free = true;
// 将持有当前锁的线程设置成 null.
setExclusiveOwnerThread(null);
}
// 设置变量
setState(c);
return free;
}
我们到这大概直到了 Condition 是如何释放锁,那么它是如何将自己阻塞的呢?
在将自己阻塞之前,需要调用 isOnSyncQueue
方法判断,代码如下:
final boolean isOnSyncQueue(Node node) {
// 如果他的状态不是等地啊,且他的上一个节点是 null, 便不在队列中了
// 这里判断 == CONDITION,实际上是第一次判断,而后面的判断则是线程醒来后的判断.
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果他的 next 不是 null, 说明他还在队列上.
if (node.next != null) // If has successor, it must be on queue
return true;
// 如果从 tail 开始找上一个节点,找到了给定的节点,说明也在队列上.返回 true.
return findNodeFromTail(node);
}
实际上,第一次总是会返回 false,在 while 中取反,从而进入死循环,调用 park 方法,将自己阻塞。
至此,Condition 成功地释放了所在的 Lock 锁,并将自己阻塞。
acquireQueued
// 返回结果:是否被中断了, 当返回 false 就是拿到锁了,反之没有拿到.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 返回他的上一个节点
final Node p = node.predecessor();
// 如果这个节点的上个节点是 head, 且成功获取了锁.
if (p == head && tryAcquire(arg)) {
// 将当前节点设置成 head
setHead(node);
// 他的上一个节点(head)设置成 null.
p.next = null; // help GC
failed = false;
// 返回 false,没有中断
return interrupted;
}
// shouldParkAfterFailedAcquire >>> 如果没有获取到锁,就尝试阻塞自己等待(上个节点的状态是 -1 SIGNAL).
// parkAndCheckInterrupt >>>> 返回自己是否被中断了.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里如果不能立马获取到锁的话,就会在 parkAndCheckInterrupt 方法中阻塞。这里就和 Sync Queue 的获取锁逻辑一模一样。
awaitNanos
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
...
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
...
return deadline - System.nanoTime();
}
该方法进行超时控制,功能与 await 方法类似,不用点在于该方法中每次 park 是有时间限制的。spinForTimeoutThreshold
的作用还需要再了解一下。
tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果锁的状态是空闲的.
if (c == 0) {
// !hasQueuedPredecessors() >>> 是否前面还有等待的节点, false >> 没有
// compareAndSetState >>> CAS 设置 state 变量成功
// 设置当前线程为锁的持有线程成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
// 上面 3 个条件都满足, 抢锁成功.
return true;
}
}
// 如果 state 状态不是0, 且当前线程和锁的持有线程相同,则认为是重入.
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
主要逻辑是设置 state 变量,将锁的持有线程变为自己。这些是在没有比自己等待时间长的线程的情况下发生的。也就是说,优先试等待时间久的线程获得锁。当然,这里还有一些重入的逻辑。
signal
public final void signal() {
// 如果当前线程不是持有该锁的线程.抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 拿到 Condition 队列上第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal
private void doSignal(Node first) {
do {
// 如果第一个节点的下一个节点是 null, 那么, 最后一个节点也是 null.
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 将 next 节点设置成 null.
first.nextWaiter = null;
// 如果修改这个 node 状态为0失败了(也就是唤醒失败), 并且 firstWaiter 不是 null, 就重新循环.
// 通过从 First 向后找节点,直到唤醒或者没有节点为止.
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
重点在于 transferForSignal 方法,该方法肯定做了唤醒操作。
transferForSignal
final boolean transferForSignal(Node node) {
/*
* CAS操作尝试将Condition的节点的ws改为0
* 如果失败,意味着:节点的ws已经不是CONDITION,说明节点已经被取消了
* 如果成功,则该节点的状态ws被改为0了
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将这个 node 放进 AQS 的队列,然后返回他的上一个节点.
Node p = enq(node);
int ws = p.waitStatus;
// ws大于0 的情况只有 cancenlled,表示node的前驱节点取消了争取锁,那直接唤醒node线程
// ws <= 0 会使用cas操作将前驱节点的ws置为signal,如果cas失败也会唤醒node
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
// 如果成功修改了 node 的状态成0,就返回 true.
return true;
}
果然,看到了 unpark 操作,该方法先用 CAS 修改了节点状态。如果修改成功,将这个节点放到 Sync Queue 中,然后唤醒这个节点上的线程。
此时,那个节点就会在 await 方法中苏醒,并在执行 checkInterruptWhileWaiting 方法后开始尝试获取锁。
doSignalAll
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
doSignalAll 方法唤醒了所有的 Condition 节点,并加入到 Sync Queue。
await(二)
唤醒了线程后,我们再来看一下线程唤醒后需要做哪些操作,先看一下代码:
public final void await() throws InterruptedException {
...
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 如果中断就退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 已经重入同步队列,清除等待队列中的关系
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果线程被中断了,需要抛出异常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
一旦调用 signal 之后,节点被成功转移到同步队列后,会开始执行上面的第5行代码。如果线程被中断,或节点已经进入了同步队列,则退出循环体,继续向下执行。
signal 方法除了会将节点转移到同步队列,同时会调用 LockSupport.unpark(node.thread) 方法,唤醒被挂起的线程。
唤醒之后,我们可以看到调用 checkInterruptWhileWaiting 方法检查等待期间是否发生了中断,如果不为 0 表示确实在等待期间发生了中断。
但其实这个方法的返回结果用 interruptMode
变量接收,拥有更加丰富的内涵,它还能够判断中断的时机是否在 signal 之前。interruptMode
取值范围:
/** await 返回的时候,需要重新设置中断状态 */
private static final int REINTERRUPT = 1;
/** await 返回的时候,需要抛出 InterruptedException 异常 */
private static final int THROW_IE = -1;
checkInterruptWhileWaiting
该方法用于判断该线程是否在挂起期间发生了中断。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?// 如果处于中断状态,返回true,且将重置中断状态
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :// 如果中断了,判断何时中断
0; // 没有中断, 返回0
}
transferAfterCancelledWait
该方法判断何时中断,是否在signal之前。
final boolean transferAfterCancelledWait(Node node) {
// 尝试使用CAS操作将node的ws设置为0
// 如果节点的状态是Condition,说明还没有进入同步队列,因为调用signal的时候
// 必然进入等待队列,这里不太可能成功。除非在signal之前被中断了,状态一直没有变化。
// 所以调用signal之前发生了中断,这里会返回true
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 就算中断了,也将节点入队
enq(node);
return true;
}
/*
* 这里就是signal之后发生的中断
* 但是signal可能还在进行转移中,这边自旋等一下它完成
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
Condition 执行步骤示意图
总结
ConditionObject 是 AQS 的内部类,它实现了 Condition 接口,提供了类似 wait, notify 和 notifyAll 类似的功能。
Condition 必须和一个独占锁绑定使用,在 await 或 signal 之前必须持有独占锁。Condition 队列是一个单向链表,它是公平的,按照先进先出的顺序从队列中被唤醒并添加到 Sync Queue 中,这时便恢复了参与竞争锁的资格。
Condition Queue 和 Sync Queue 是不同的,Condition Queue 是单向的,队列的第一个节点 firstWaiter 是可以绑定线程的;而 Sync Queue 是双向的,队列的第一个节点 head 是不与线程绑定的。
Condition 在设计时充分考虑了 Object 中监视器方法的缺陷,设计为一个 lock 可以对应多个 Condition,从而可以使线程分散到多个等待队列中,使得应用更为灵活,并且在实现上使用了 FIFO 队列来保存等待线程,确保了可以做到使用 signal 按 FIFO 方式唤醒等待线程,避免每次唤醒所有线程导致数据竞争。
但这样也会导致使用更加复杂,在实际应用中谨慎使用多个等待队列。