前言
AQS 不仅实现了独占功能和共享功能,还实现了 Condition 的功能。
Condition 是 Lock 的伴侣,接下来就通过 ReentrantLock 来分析在 AQS 中 Condition 的实现。
ConditionObject
ConditionObject 是 AQS 内的一个内部类,实现了 Condition 接口。这个帮助类保存了等待队列的第一个等待节点和最后一个等待节点。
ConditionObject 类图
Condition 介绍
Condition 在 JDK1.5 中才被引入,它可以替代传统的 Object 中的 wait(), notify() 和 notifyAll() 方法来实现线程间的通信,使线程间协作更加安全和高效。
Condition 是一个接口,它的定义如下:
public interface Condition {void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll();}
Condition 定义的部分方法及描述:
| 方法名称 | 描述 | 
|---|---|
| await | 当线程进入等待状态直到被通知(signal)或中断 | 
| awaitUninterruptibly | 当前线程进入等待状态直到被通知,不受中断影响 | 
| awaitNanos(long nanosTimeout) | 当前线程进入等待状态直到被通知、中断或超时 | 
| awaitUntil(Date deadline) | 当前线程进入等待状态直到被通知、中断或者到某个时间 | 
| signal | 唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关的锁 | 
| signalAll | 唤醒所有等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关的锁 | 
常用的方法是 await(), signal() 和 signalAll(),Condition 和 Object 类中的方法对应如下:
| Object | Condition | 
|---|---|
| wait() | await() | 
| notify() | signal() | 
| notifyAll() | signalAll() | 
既然功能都一样,为什么还需要使用 Condition 呢?
简单来说,Condition 需要和 Lock 一起使用,在不使用 Lock 时,使用 synchronized 时的代码如下:
synchronized(obj){obj.wait();}synchronized(obj){obj.notify();}
使用 Lock 时的代码如下:
lock.lock();condition.await();lock.unlock();lock.lock();condition.signal();lock.unlock();
从代码上可以看出,使用 synchronized  关键字时,所有没有获取锁的线程都会等待,这时相当于只有 1 个等待队列。
而在实际应用中可能有时需要多个等待队列,比如 ReadLock 和 WriteLock。Lock 中的等待队列和 Condition 中的等待队列是分开的,例如在独占模式下,Lock 的独占保证了在同一时刻只会有一个线程访问临界区,也就是 lock() 方法返回后,Condition 等待队列保存着被阻塞的线程,也就是调用 await() 方法后阻塞的线程。
所以 Lock 比 synchronized 更灵活、更公平。
为什么调用 await 方法时需要持有锁
假设您有一些线程,需要消费某些元素。 有一个队列,有且仅有队列中有元素之后,线程才能处理它。 队列必须是线程安全的,因此必须由锁保护。 您可能会编写以下代码:
- 获取锁。
 - 检查队列是否为空。
 - 如果队列为空,当前线程等待,其它线程将元素放入队列。
 
很可惜,这行不通。 我们将锁保持在队列上,另一个线程如何在其上添加元素? 让我们再试一次:
- 获取锁。
 - 检查队列是否为空。
 - 如果队列为空,当前线程释放锁并等待,其它线程将元素放入队列。
 
这个逻辑依然有问题,如果在释放锁之后,队列上刚添加元素,其它线程就消费了元素,等来了个寂寞。
await 的职责是释放锁,并让调用线程休眠(原子地)。所以 await 操作必须是原子性的。所以当前线程必须持有锁,防止其它线程进入临界区。这样复杂的步骤也是为了避免线程在休眠时,产生一些竞态条件。
Condition 的使用
在 Condition 接口的 javadoc 中,有一个很好的例子来使用 Condition,代码如下:
class BoundedBuffer {final Lock lock = new ReentrantLock();final Condition notFull = lock.newCondition();final Condition notEmpty = lock.newCondition();final Object[] items = new Object[100];int putptr, takeptr, count;public void put(Object x) throws InterruptedException {lock.lock();try {while (count == items.length)notFull.await();items[putptr] = x;if (++putptr == items.length) putptr = 0;++count;notEmpty.signal();} finally {lock.unlock();}}public Object take() throws InterruptedException {lock.lock();try {while (count == 0)notEmpty.await();Object x = items[takeptr];if (++takeptr == items.length) takeptr = 0;--count;notFull.signal();return x;} finally {lock.unlock();}}}
代码很简单,定义了一个数组 items,put 用于向 items 中添加数据,take 用于从 items 中取出数据,count 代表当前 items 中存放了多少个对象,putptr 表示下一个需要添加的索引,takeptr 表示下一个需要取出的索引,这样就实现了数组的循环添加和取出数据的功能。
put 和 take 的具体功能如下:
put
- 当 count 与 items 的长度相同时,表示数组已满,则调用 notFull.await() 来等待同时释放了当前线程的锁;
 - 当线程被唤醒时,将 x 添加到 putptr 索引的位置;
 - 如果当前 putptr 的位置是最后一个,则下一个索引的位置从 0 开始;
 调用 notEmpty.signal(); 通知其他线程可以从数组中取出数据了。
take
当 count 为0时,表示数组是空的,则调用 notEmpty.await() 来等待同时释放了当前线程的锁;
- 当线程被唤醒时,将 x 添加到 takeptr 索引的位置;
 - 如果当前 takeptr 的位置是最后一个,则下一个索引的位置从 0 开始;
 - 调用 notFull.signal(),通知其他线程可以向数组中添加数据了。
AQS 中 Condition 的实现
本文还是通过 ReentrantLock 来分析。
Condition 必须被绑定到一个独占锁上使用,在 ReentrantLock 中,有一个 newCondition 方法,该方法调用了 Sync 中的 newCondition 方法,看下 Sync 中 newCondition 的实现:
ConditionObject 是在 AQS 中定义的,它实现了 Condition 接口,自然也就实现了上述的 Condition 接口中的方法。该类中有两个重要的变量:final ConditionObject newCondition() {return new ConditionObject();}
这里的 firstWaiter 和 lastWaiter 是不是和 AQS 中的 head 和 tail 有些类似,而且都是 Node 类型的。/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;
对于 Condition 来说,它是不与独占模式或共享模式使用同一个队列的,它有自己的队列,所以这两个变量表示了队列的头节点和尾节点。await(一)
总结一个这个方法的逻辑:public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 创建一个新的节点,追加到 Condition Queue 中尾节点.Node node = addConditionWaiter();// 释放这个锁,并唤醒 Sync Queue 队列中一个线程.int savedState = fullyRelease(node);int interruptMode = 0;// isOnSyncQueue判断这个节点是否在 Sync Queue 队列上,第一次判断总是返回 falsewhile (!isOnSyncQueue(node)) {// 第一次总是 park 自己,开始阻塞等待LockSupport.park(this);// 如果中断就退出if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)// 如果被中断了,就跳出循环break;}...}
 
- 在 Condition 中,维护着一个队列,每当执行 await 方法,都会根据当前线程创建一个节点,并添加到 
Condition Queue尾部; - 然后释放锁,唤醒阻塞在 
Sync Queue的一个线程,并将自己阻塞; - 在被别的线程唤醒后(别的线程调用 signal 方法),将自身节点放回 
Sync Queue中,接下来应该抢占式地获取锁; 
addConditionWaiter
// 该方法就是创建一个当前线程的节点,追加到最后一个节点中.private Node addConditionWaiter() {// 找到最后一个节点,放在局部变量中,速度更快Node t = lastWaiter;// 如果最后一个节点失效了,就清除链表中所有失效节点,并重新赋值 tif (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 创建一个当前线程的 node 节点Node node = new Node(Thread.currentThread(), Node.CONDITION);// 如果最后一个节点是 nullif (t == null)// 将当前节点设置成第一个节点firstWaiter = node;else// 如果不是 null, 将当前节点追加到最后一个节点t.nextWaiter = node;// 将当前节点设置成最后一个节点lastWaiter = node;// 返回return node;}
unlinkCancelledWaiters
// 清除链表中所有失效的节点.private void unlinkCancelledWaiters() {Node t = firstWaiter;// 当 next 正常的时候,需要保存这个 next, 方便下次循环是链接到下一个节点上.Node trail = null;while (t != null) {Node next = t.nextWaiter;// 如果这个节点被取消了if (t.waitStatus != Node.CONDITION) {// 先将他的 next 节点设置为 nullt.nextWaiter = null;// 如果这是第一次判断 trail 变量if (trail == null)// 将 next 变量设置为 first, 也就是去除之前的 first(由于是第一次,肯定去除的是 first)firstWaiter = next;else// 如果不是 null,说明上个节点正常,将上个节点的 next 设置为无效节点的 next, 让 t 失效trail.nextWaiter = next;// 如果 next 是 null, 说明没有节点了,那么就可以将 trail 设置成最后一个节点if (next == null)lastWaiter = trail;}// 如果该节点正常,那么就保存这个节点,在下次链接下个节点时使用elsetrail = t;// 换下一个节点继续循环t = next;}}
fullyRelease
这个方法主要是用于释放锁,并唤醒 Sync Queue 中一个节点。
final int fullyRelease(Node node) {boolean failed = true;try {// 获取 state 变量int savedState = getState();// 如果释放成功,则返回 state 的大小,也就是之前持有锁的线程的数量if (release(savedState)) {failed = false;return savedState;} else {// 如果释放失败,抛出异常throw new IllegalMonitorStateException();}} finally {//释放失败if (failed)// 将这个节点是指成取消状态.随后将从队列中移除.node.waitStatus = Node.CANCELLED;}}
release
// 主要功能,就是释放锁,并唤醒阻塞在锁上的线程.public final boolean release(int arg) {// 如果释放锁成功,返回 true, 可能会抛出监视器异常,即当前线程不是持有锁的线程.// 也可能是释放失败,但 fullyRelease 基本能够释放成功.if (tryRelease(arg)) {// 释放成功后, 唤醒 head 的下一个节点上的线程.Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}// 释放失败return false;}
tryRelease
// 主要功能就是对 state 变量做减法, 如果 state 变成0,则将持有锁的线程设置成 null.protected final boolean tryRelease(int releases) {// 计算 stateint c = getState() - releases;// 如果当前线程不是持有该锁的线程,则抛出异常if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 如果结果是 0,说明成功释放了锁.if (c == 0) {free = true;// 将持有当前锁的线程设置成 null.setExclusiveOwnerThread(null);}// 设置变量setState(c);return free;}
我们到这大概直到了 Condition 是如何释放锁,那么它是如何将自己阻塞的呢?
在将自己阻塞之前,需要调用 isOnSyncQueue 方法判断,代码如下:
final boolean isOnSyncQueue(Node node) {// 如果他的状态不是等地啊,且他的上一个节点是 null, 便不在队列中了// 这里判断 == CONDITION,实际上是第一次判断,而后面的判断则是线程醒来后的判断.if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 如果他的 next 不是 null, 说明他还在队列上.if (node.next != null) // If has successor, it must be on queuereturn true;// 如果从 tail 开始找上一个节点,找到了给定的节点,说明也在队列上.返回 true.return findNodeFromTail(node);}
实际上,第一次总是会返回 false,在 while 中取反,从而进入死循环,调用 park 方法,将自己阻塞。
至此,Condition 成功地释放了所在的 Lock 锁,并将自己阻塞。
acquireQueued
// 返回结果:是否被中断了, 当返回 false 就是拿到锁了,反之没有拿到.final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 返回他的上一个节点final Node p = node.predecessor();// 如果这个节点的上个节点是 head, 且成功获取了锁.if (p == head && tryAcquire(arg)) {// 将当前节点设置成 headsetHead(node);// 他的上一个节点(head)设置成 null.p.next = null; // help GCfailed = false;// 返回 false,没有中断return interrupted;}// shouldParkAfterFailedAcquire >>> 如果没有获取到锁,就尝试阻塞自己等待(上个节点的状态是 -1 SIGNAL).// parkAndCheckInterrupt >>>> 返回自己是否被中断了.if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
这里如果不能立马获取到锁的话,就会在 parkAndCheckInterrupt 方法中阻塞。这里就和 Sync Queue 的获取锁逻辑一模一样。
awaitNanos
public final long awaitNanos(long nanosTimeout)throws InterruptedException {...int interruptMode = 0;while (!isOnSyncQueue(node)) {if (nanosTimeout <= 0L) {transferAfterCancelledWait(node);break;}if (nanosTimeout >= spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;nanosTimeout = deadline - System.nanoTime();}...return deadline - System.nanoTime();}
该方法进行超时控制,功能与 await 方法类似,不用点在于该方法中每次 park 是有时间限制的。spinForTimeoutThreshold 的作用还需要再了解一下。
tryAcquire
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 如果锁的状态是空闲的.if (c == 0) {// !hasQueuedPredecessors() >>> 是否前面还有等待的节点, false >> 没有// compareAndSetState >>> CAS 设置 state 变量成功// 设置当前线程为锁的持有线程成功if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);// 上面 3 个条件都满足, 抢锁成功.return true;}}// 如果 state 状态不是0, 且当前线程和锁的持有线程相同,则认为是重入.else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
主要逻辑是设置 state 变量,将锁的持有线程变为自己。这些是在没有比自己等待时间长的线程的情况下发生的。也就是说,优先试等待时间久的线程获得锁。当然,这里还有一些重入的逻辑。
signal
public final void signal() {// 如果当前线程不是持有该锁的线程.抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();// 拿到 Condition 队列上第一个节点Node first = firstWaiter;if (first != null)doSignal(first);}
doSignal
private void doSignal(Node first) {do {// 如果第一个节点的下一个节点是 null, 那么, 最后一个节点也是 null.if ((firstWaiter = first.nextWaiter) == null)lastWaiter = null;// 将 next 节点设置成 null.first.nextWaiter = null;// 如果修改这个 node 状态为0失败了(也就是唤醒失败), 并且 firstWaiter 不是 null, 就重新循环.// 通过从 First 向后找节点,直到唤醒或者没有节点为止.} while (!transferForSignal(first) &&(first = firstWaiter) != null);}
重点在于 transferForSignal 方法,该方法肯定做了唤醒操作。
transferForSignal
final boolean transferForSignal(Node node) {/** CAS操作尝试将Condition的节点的ws改为0* 如果失败,意味着:节点的ws已经不是CONDITION,说明节点已经被取消了* 如果成功,则该节点的状态ws被改为0了*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 将这个 node 放进 AQS 的队列,然后返回他的上一个节点.Node p = enq(node);int ws = p.waitStatus;// ws大于0 的情况只有 cancenlled,表示node的前驱节点取消了争取锁,那直接唤醒node线程// ws <= 0 会使用cas操作将前驱节点的ws置为signal,如果cas失败也会唤醒nodeif (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);// 如果成功修改了 node 的状态成0,就返回 true.return true;}
果然,看到了 unpark 操作,该方法先用 CAS 修改了节点状态。如果修改成功,将这个节点放到 Sync Queue 中,然后唤醒这个节点上的线程。
此时,那个节点就会在 await 方法中苏醒,并在执行 checkInterruptWhileWaiting 方法后开始尝试获取锁。
doSignalAll
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}
doSignalAll 方法唤醒了所有的 Condition 节点,并加入到 Sync Queue。
await(二)
唤醒了线程后,我们再来看一下线程唤醒后需要做哪些操作,先看一下代码:
public final void await() throws InterruptedException {...while (!isOnSyncQueue(node)) {LockSupport.park(this);// 如果中断就退出if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 重新获取锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 已经重入同步队列,清除等待队列中的关系if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 如果线程被中断了,需要抛出异常.或者什么都不做if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
一旦调用 signal 之后,节点被成功转移到同步队列后,会开始执行上面的第5行代码。如果线程被中断,或节点已经进入了同步队列,则退出循环体,继续向下执行。
signal 方法除了会将节点转移到同步队列,同时会调用 LockSupport.unpark(node.thread) 方法,唤醒被挂起的线程。
唤醒之后,我们可以看到调用 checkInterruptWhileWaiting 方法检查等待期间是否发生了中断,如果不为 0 表示确实在等待期间发生了中断。
但其实这个方法的返回结果用 interruptMode 变量接收,拥有更加丰富的内涵,它还能够判断中断的时机是否在 signal 之前。interruptMode取值范围:
/** await 返回的时候,需要重新设置中断状态 */private static final int REINTERRUPT = 1;/** await 返回的时候,需要抛出 InterruptedException 异常 */private static final int THROW_IE = -1;
checkInterruptWhileWaiting
该方法用于判断该线程是否在挂起期间发生了中断。
private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?// 如果处于中断状态,返回true,且将重置中断状态(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :// 如果中断了,判断何时中断0; // 没有中断, 返回0}
transferAfterCancelledWait
该方法判断何时中断,是否在signal之前。
final boolean transferAfterCancelledWait(Node node) {// 尝试使用CAS操作将node的ws设置为0// 如果节点的状态是Condition,说明还没有进入同步队列,因为调用signal的时候// 必然进入等待队列,这里不太可能成功。除非在signal之前被中断了,状态一直没有变化。// 所以调用signal之前发生了中断,这里会返回trueif (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {// 就算中断了,也将节点入队enq(node);return true;}/** 这里就是signal之后发生的中断* 但是signal可能还在进行转移中,这边自旋等一下它完成*/while (!isOnSyncQueue(node))Thread.yield();return false;}
Condition 执行步骤示意图
总结
ConditionObject 是 AQS 的内部类,它实现了 Condition 接口,提供了类似 wait, notify 和 notifyAll 类似的功能。
Condition 必须和一个独占锁绑定使用,在 await 或 signal 之前必须持有独占锁。Condition 队列是一个单向链表,它是公平的,按照先进先出的顺序从队列中被唤醒并添加到 Sync Queue 中,这时便恢复了参与竞争锁的资格。
Condition Queue 和 Sync Queue 是不同的,Condition Queue 是单向的,队列的第一个节点 firstWaiter 是可以绑定线程的;而 Sync Queue 是双向的,队列的第一个节点 head 是不与线程绑定的。
Condition 在设计时充分考虑了 Object 中监视器方法的缺陷,设计为一个 lock 可以对应多个 Condition,从而可以使线程分散到多个等待队列中,使得应用更为灵活,并且在实现上使用了 FIFO 队列来保存等待线程,确保了可以做到使用 signal 按 FIFO 方式唤醒等待线程,避免每次唤醒所有线程导致数据竞争。
但这样也会导致使用更加复杂,在实际应用中谨慎使用多个等待队列。
