前言

AQS 不仅实现了独占功能和共享功能,还实现了 Condition 的功能。
Condition 是 Lock 的伴侣,接下来就通过 ReentrantLock 来分析在 AQS 中 Condition 的实现。

ConditionObject

ConditionObject 是 AQS 内的一个内部类,实现了 Condition 接口。这个帮助类保存了等待队列的第一个等待节点和最后一个等待节点。

ConditionObject 类图

image.png

Condition 介绍

Condition 在 JDK1.5 中才被引入,它可以替代传统的 Object 中的 wait(), notify() 和 notifyAll() 方法来实现线程间的通信,使线程间协作更加安全和高效。
Condition 是一个接口,它的定义如下:

  1. public interface Condition {
  2. void await() throws InterruptedException;
  3. void awaitUninterruptibly();
  4. long awaitNanos(long nanosTimeout) throws InterruptedException;
  5. boolean await(long time, TimeUnit unit) throws InterruptedException;
  6. boolean awaitUntil(Date deadline) throws InterruptedException;
  7. void signal();
  8. void signalAll();}

Condition 定义的部分方法及描述:

方法名称 描述
await 当线程进入等待状态直到被通知(signal)或中断
awaitUninterruptibly 当前线程进入等待状态直到被通知,不受中断影响
awaitNanos(long nanosTimeout) 当前线程进入等待状态直到被通知、中断或超时
awaitUntil(Date deadline) 当前线程进入等待状态直到被通知、中断或者到某个时间
signal 唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关的锁
signalAll 唤醒所有等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关的锁

常用的方法是 await(), signal() 和 signalAll(),Condition 和 Object 类中的方法对应如下:

Object Condition
wait() await()
notify() signal()
notifyAll() signalAll()

既然功能都一样,为什么还需要使用 Condition 呢?
简单来说,Condition 需要和 Lock 一起使用,在不使用 Lock 时,使用 synchronized 时的代码如下:

  1. synchronized(obj){
  2. obj.wait();
  3. }
  4. synchronized(obj){
  5. obj.notify();
  6. }

使用 Lock 时的代码如下:

  1. lock.lock();
  2. condition.await();
  3. lock.unlock();
  4. lock.lock();
  5. condition.signal();
  6. lock.unlock();

从代码上可以看出,使用 synchronized 关键字时,所有没有获取锁的线程都会等待,这时相当于只有 1 个等待队列。
而在实际应用中可能有时需要多个等待队列,比如 ReadLock 和 WriteLock。Lock 中的等待队列和 Condition 中的等待队列是分开的,例如在独占模式下,Lock 的独占保证了在同一时刻只会有一个线程访问临界区,也就是 lock() 方法返回后,Condition 等待队列保存着被阻塞的线程,也就是调用 await() 方法后阻塞的线程。
所以 Lock 比 synchronized 更灵活、更公平。

为什么调用 await 方法时需要持有锁

假设您有一些线程,需要消费某些元素。 有一个队列,有且仅有队列中有元素之后,线程才能处理它。 队列必须是线程安全的,因此必须由锁保护。 您可能会编写以下代码:

  1. 获取锁。
  2. 检查队列是否为空。
  3. 如果队列为空,当前线程等待,其它线程将元素放入队列。

很可惜,这行不通。 我们将锁保持在队列上,另一个线程如何在其上添加元素? 让我们再试一次:

  1. 获取锁。
  2. 检查队列是否为空。
  3. 如果队列为空,当前线程释放锁并等待,其它线程将元素放入队列。

这个逻辑依然有问题,如果在释放锁之后,队列上刚添加元素,其它线程就消费了元素,等来了个寂寞。
await 的职责是释放锁,并让调用线程休眠(原子地)。所以 await 操作必须是原子性的。所以当前线程必须持有锁,防止其它线程进入临界区。这样复杂的步骤也是为了避免线程在休眠时,产生一些竞态条件。

Condition 的使用

在 Condition 接口的 javadoc 中,有一个很好的例子来使用 Condition,代码如下:

  1. class BoundedBuffer {
  2. final Lock lock = new ReentrantLock();
  3. final Condition notFull = lock.newCondition();
  4. final Condition notEmpty = lock.newCondition();
  5. final Object[] items = new Object[100];
  6. int putptr, takeptr, count;
  7. public void put(Object x) throws InterruptedException {
  8. lock.lock();
  9. try {
  10. while (count == items.length)
  11. notFull.await();
  12. items[putptr] = x;
  13. if (++putptr == items.length) putptr = 0;
  14. ++count;
  15. notEmpty.signal();
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. public Object take() throws InterruptedException {
  21. lock.lock();
  22. try {
  23. while (count == 0)
  24. notEmpty.await();
  25. Object x = items[takeptr];
  26. if (++takeptr == items.length) takeptr = 0;
  27. --count;
  28. notFull.signal();
  29. return x;
  30. } finally {
  31. lock.unlock();
  32. }
  33. }}

代码很简单,定义了一个数组 items,put 用于向 items 中添加数据,take 用于从 items 中取出数据,count 代表当前 items 中存放了多少个对象,putptr 表示下一个需要添加的索引,takeptr 表示下一个需要取出的索引,这样就实现了数组的循环添加和取出数据的功能。
put 和 take 的具体功能如下:

put

  1. 当 count 与 items 的长度相同时,表示数组已满,则调用 notFull.await() 来等待同时释放了当前线程的锁;
  2. 当线程被唤醒时,将 x 添加到 putptr 索引的位置;
  3. 如果当前 putptr 的位置是最后一个,则下一个索引的位置从 0 开始;
  4. 调用 notEmpty.signal(); 通知其他线程可以从数组中取出数据了。

    take

  5. 当 count 为0时,表示数组是空的,则调用 notEmpty.await() 来等待同时释放了当前线程的锁;

  6. 当线程被唤醒时,将 x 添加到 takeptr 索引的位置;
  7. 如果当前 takeptr 的位置是最后一个,则下一个索引的位置从 0 开始;
  8. 调用 notFull.signal(),通知其他线程可以向数组中添加数据了。

    AQS 中 Condition 的实现

    本文还是通过 ReentrantLock 来分析。
    Condition 必须被绑定到一个独占锁上使用,在 ReentrantLock 中,有一个 newCondition 方法,该方法调用了 Sync 中的 newCondition 方法,看下 Sync 中 newCondition 的实现:
    1. final ConditionObject newCondition() {
    2. return new ConditionObject();}
    ConditionObject 是在 AQS 中定义的,它实现了 Condition 接口,自然也就实现了上述的 Condition 接口中的方法。该类中有两个重要的变量:
    1. /** First node of condition queue. */
    2. private transient Node firstWaiter;
    3. /** Last node of condition queue. */
    4. private transient Node lastWaiter;
    这里的 firstWaiter 和 lastWaiter 是不是和 AQS 中的 head 和 tail 有些类似,而且都是 Node 类型的。
    对于 Condition 来说,它是不与独占模式或共享模式使用同一个队列的,它有自己的队列,所以这两个变量表示了队列的头节点和尾节点。

    await(一)

    1. public final void await() throws InterruptedException {
    2. if (Thread.interrupted())
    3. throw new InterruptedException();
    4. // 创建一个新的节点,追加到 Condition Queue 中尾节点.
    5. Node node = addConditionWaiter();
    6. // 释放这个锁,并唤醒 Sync Queue 队列中一个线程.
    7. int savedState = fullyRelease(node);
    8. int interruptMode = 0;
    9. // isOnSyncQueue判断这个节点是否在 Sync Queue 队列上,第一次判断总是返回 false
    10. while (!isOnSyncQueue(node)) {
    11. // 第一次总是 park 自己,开始阻塞等待
    12. LockSupport.park(this);
    13. // 如果中断就退出
    14. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    15. // 如果被中断了,就跳出循环
    16. break;
    17. }
    18. ...
    19. }
    总结一个这个方法的逻辑:
  • 在 Condition 中,维护着一个队列,每当执行 await 方法,都会根据当前线程创建一个节点,并添加到 Condition Queue 尾部;
  • 然后释放锁,唤醒阻塞在 Sync Queue 的一个线程,并将自己阻塞;
  • 在被别的线程唤醒后(别的线程调用 signal 方法),将自身节点放回 Sync Queue 中,接下来应该抢占式地获取锁;

唤醒后的操作等介绍了 signal 之后,再详细解释。

addConditionWaiter

  1. // 该方法就是创建一个当前线程的节点,追加到最后一个节点中.
  2. private Node addConditionWaiter() {
  3. // 找到最后一个节点,放在局部变量中,速度更快
  4. Node t = lastWaiter;
  5. // 如果最后一个节点失效了,就清除链表中所有失效节点,并重新赋值 t
  6. if (t != null && t.waitStatus != Node.CONDITION) {
  7. unlinkCancelledWaiters();
  8. t = lastWaiter;
  9. }
  10. // 创建一个当前线程的 node 节点
  11. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  12. // 如果最后一个节点是 null
  13. if (t == null)
  14. // 将当前节点设置成第一个节点
  15. firstWaiter = node;
  16. else
  17. // 如果不是 null, 将当前节点追加到最后一个节点
  18. t.nextWaiter = node;
  19. // 将当前节点设置成最后一个节点
  20. lastWaiter = node;
  21. // 返回
  22. return node;
  23. }

创建一个当前线程的节点,追加到最后一个节点中。

unlinkCancelledWaiters

  1. // 清除链表中所有失效的节点.
  2. private void unlinkCancelledWaiters() {
  3. Node t = firstWaiter;
  4. // 当 next 正常的时候,需要保存这个 next, 方便下次循环是链接到下一个节点上.
  5. Node trail = null;
  6. while (t != null) {
  7. Node next = t.nextWaiter;
  8. // 如果这个节点被取消了
  9. if (t.waitStatus != Node.CONDITION) {
  10. // 先将他的 next 节点设置为 null
  11. t.nextWaiter = null;
  12. // 如果这是第一次判断 trail 变量
  13. if (trail == null)
  14. // 将 next 变量设置为 first, 也就是去除之前的 first(由于是第一次,肯定去除的是 first)
  15. firstWaiter = next;
  16. else
  17. // 如果不是 null,说明上个节点正常,将上个节点的 next 设置为无效节点的 next, 让 t 失效
  18. trail.nextWaiter = next;
  19. // 如果 next 是 null, 说明没有节点了,那么就可以将 trail 设置成最后一个节点
  20. if (next == null)
  21. lastWaiter = trail;
  22. }
  23. // 如果该节点正常,那么就保存这个节点,在下次链接下个节点时使用
  24. else
  25. trail = t;
  26. // 换下一个节点继续循环
  27. t = next;
  28. }
  29. }

该方法主要是清除链表中所有失效的节点。

fullyRelease

这个方法主要是用于释放锁,并唤醒 Sync Queue 中一个节点。

  1. final int fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. // 获取 state 变量
  5. int savedState = getState();
  6. // 如果释放成功,则返回 state 的大小,也就是之前持有锁的线程的数量
  7. if (release(savedState)) {
  8. failed = false;
  9. return savedState;
  10. } else {
  11. // 如果释放失败,抛出异常
  12. throw new IllegalMonitorStateException();
  13. }
  14. } finally {
  15. //释放失败
  16. if (failed)
  17. // 将这个节点是指成取消状态.随后将从队列中移除.
  18. node.waitStatus = Node.CANCELLED;
  19. }
  20. }

release

  1. // 主要功能,就是释放锁,并唤醒阻塞在锁上的线程.
  2. public final boolean release(int arg) {
  3. // 如果释放锁成功,返回 true, 可能会抛出监视器异常,即当前线程不是持有锁的线程.
  4. // 也可能是释放失败,但 fullyRelease 基本能够释放成功.
  5. if (tryRelease(arg)) {
  6. // 释放成功后, 唤醒 head 的下一个节点上的线程.
  7. Node h = head;
  8. if (h != null && h.waitStatus != 0)
  9. unparkSuccessor(h);
  10. return true;
  11. }
  12. // 释放失败
  13. return false;
  14. }

tryRelease

  1. // 主要功能就是对 state 变量做减法, 如果 state 变成0,则将持有锁的线程设置成 null.
  2. protected final boolean tryRelease(int releases) {
  3. // 计算 state
  4. int c = getState() - releases;
  5. // 如果当前线程不是持有该锁的线程,则抛出异常
  6. if (Thread.currentThread() != getExclusiveOwnerThread())
  7. throw new IllegalMonitorStateException();
  8. boolean free = false;
  9. // 如果结果是 0,说明成功释放了锁.
  10. if (c == 0) {
  11. free = true;
  12. // 将持有当前锁的线程设置成 null.
  13. setExclusiveOwnerThread(null);
  14. }
  15. // 设置变量
  16. setState(c);
  17. return free;
  18. }

我们到这大概直到了 Condition 是如何释放锁,那么它是如何将自己阻塞的呢?
在将自己阻塞之前,需要调用 isOnSyncQueue 方法判断,代码如下:

  1. final boolean isOnSyncQueue(Node node) {
  2. // 如果他的状态不是等地啊,且他的上一个节点是 null, 便不在队列中了
  3. // 这里判断 == CONDITION,实际上是第一次判断,而后面的判断则是线程醒来后的判断.
  4. if (node.waitStatus == Node.CONDITION || node.prev == null)
  5. return false;
  6. // 如果他的 next 不是 null, 说明他还在队列上.
  7. if (node.next != null) // If has successor, it must be on queue
  8. return true;
  9. // 如果从 tail 开始找上一个节点,找到了给定的节点,说明也在队列上.返回 true.
  10. return findNodeFromTail(node);
  11. }

实际上,第一次总是会返回 false,在 while 中取反,从而进入死循环,调用 park 方法,将自己阻塞。
至此,Condition 成功地释放了所在的 Lock 锁,并将自己阻塞。

acquireQueued

  1. // 返回结果:是否被中断了, 当返回 false 就是拿到锁了,反之没有拿到.
  2. final boolean acquireQueued(final Node node, int arg) {
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false;
  6. for (;;) {
  7. // 返回他的上一个节点
  8. final Node p = node.predecessor();
  9. // 如果这个节点的上个节点是 head, 且成功获取了锁.
  10. if (p == head && tryAcquire(arg)) {
  11. // 将当前节点设置成 head
  12. setHead(node);
  13. // 他的上一个节点(head)设置成 null.
  14. p.next = null; // help GC
  15. failed = false;
  16. // 返回 false,没有中断
  17. return interrupted;
  18. }
  19. // shouldParkAfterFailedAcquire >>> 如果没有获取到锁,就尝试阻塞自己等待(上个节点的状态是 -1 SIGNAL).
  20. // parkAndCheckInterrupt >>>> 返回自己是否被中断了.
  21. if (shouldParkAfterFailedAcquire(p, node) &&
  22. parkAndCheckInterrupt())
  23. interrupted = true;
  24. }
  25. } finally {
  26. if (failed)
  27. cancelAcquire(node);
  28. }
  29. }

这里如果不能立马获取到锁的话,就会在 parkAndCheckInterrupt 方法中阻塞。这里就和 Sync Queue 的获取锁逻辑一模一样。

awaitNanos

  1. public final long awaitNanos(long nanosTimeout)
  2. throws InterruptedException {
  3. ...
  4. int interruptMode = 0;
  5. while (!isOnSyncQueue(node)) {
  6. if (nanosTimeout <= 0L) {
  7. transferAfterCancelledWait(node);
  8. break;
  9. }
  10. if (nanosTimeout >= spinForTimeoutThreshold)
  11. LockSupport.parkNanos(this, nanosTimeout);
  12. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  13. break;
  14. nanosTimeout = deadline - System.nanoTime();
  15. }
  16. ...
  17. return deadline - System.nanoTime();
  18. }

该方法进行超时控制,功能与 await 方法类似,不用点在于该方法中每次 park 是有时间限制的。
spinForTimeoutThreshold 的作用还需要再了解一下。

tryAcquire

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. // 如果锁的状态是空闲的.
  5. if (c == 0) {
  6. // !hasQueuedPredecessors() >>> 是否前面还有等待的节点, false >> 没有
  7. // compareAndSetState >>> CAS 设置 state 变量成功
  8. // 设置当前线程为锁的持有线程成功
  9. if (!hasQueuedPredecessors() &&
  10. compareAndSetState(0, acquires)) {
  11. setExclusiveOwnerThread(current);
  12. // 上面 3 个条件都满足, 抢锁成功.
  13. return true;
  14. }
  15. }
  16. // 如果 state 状态不是0, 且当前线程和锁的持有线程相同,则认为是重入.
  17. else if (current == getExclusiveOwnerThread()) {
  18. int nextc = c + acquires;
  19. if (nextc < 0)
  20. throw new Error("Maximum lock count exceeded");
  21. setState(nextc);
  22. return true;
  23. }
  24. return false;
  25. }

主要逻辑是设置 state 变量,将锁的持有线程变为自己。这些是在没有比自己等待时间长的线程的情况下发生的。也就是说,优先试等待时间久的线程获得锁。当然,这里还有一些重入的逻辑。

signal

  1. public final void signal() {
  2. // 如果当前线程不是持有该锁的线程.抛出异常
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. // 拿到 Condition 队列上第一个节点
  6. Node first = firstWaiter;
  7. if (first != null)
  8. doSignal(first);
  9. }

很明显,唤醒是从头部开始的。

doSignal

  1. private void doSignal(Node first) {
  2. do {
  3. // 如果第一个节点的下一个节点是 null, 那么, 最后一个节点也是 null.
  4. if ((firstWaiter = first.nextWaiter) == null)
  5. lastWaiter = null;
  6. // 将 next 节点设置成 null.
  7. first.nextWaiter = null;
  8. // 如果修改这个 node 状态为0失败了(也就是唤醒失败), 并且 firstWaiter 不是 null, 就重新循环.
  9. // 通过从 First 向后找节点,直到唤醒或者没有节点为止.
  10. } while (!transferForSignal(first) &&
  11. (first = firstWaiter) != null);
  12. }

重点在于 transferForSignal 方法,该方法肯定做了唤醒操作。

transferForSignal

  1. final boolean transferForSignal(Node node) {
  2. /*
  3. * CAS操作尝试将Condition的节点的ws改为0
  4. * 如果失败,意味着:节点的ws已经不是CONDITION,说明节点已经被取消了
  5. * 如果成功,则该节点的状态ws被改为0了
  6. */
  7. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  8. return false;
  9. // 将这个 node 放进 AQS 的队列,然后返回他的上一个节点.
  10. Node p = enq(node);
  11. int ws = p.waitStatus;
  12. // ws大于0 的情况只有 cancenlled,表示node的前驱节点取消了争取锁,那直接唤醒node线程
  13. // ws <= 0 会使用cas操作将前驱节点的ws置为signal,如果cas失败也会唤醒node
  14. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  15. LockSupport.unpark(node.thread);
  16. // 如果成功修改了 node 的状态成0,就返回 true.
  17. return true;
  18. }

果然,看到了 unpark 操作,该方法先用 CAS 修改了节点状态。如果修改成功,将这个节点放到 Sync Queue 中,然后唤醒这个节点上的线程。
此时,那个节点就会在 await 方法中苏醒,并在执行 checkInterruptWhileWaiting 方法后开始尝试获取锁。

doSignalAll

  1. private void doSignalAll(Node first) {
  2. lastWaiter = firstWaiter = null;
  3. do {
  4. Node next = first.nextWaiter;
  5. first.nextWaiter = null;
  6. transferForSignal(first);
  7. first = next;
  8. } while (first != null);
  9. }

doSignalAll 方法唤醒了所有的 Condition 节点,并加入到 Sync Queue。

await(二)

唤醒了线程后,我们再来看一下线程唤醒后需要做哪些操作,先看一下代码:

  1. public final void await() throws InterruptedException {
  2. ...
  3. while (!isOnSyncQueue(node)) {
  4. LockSupport.park(this);
  5. // 如果中断就退出
  6. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  7. break;
  8. }
  9. // 重新获取锁
  10. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  11. interruptMode = REINTERRUPT;
  12. // 已经重入同步队列,清除等待队列中的关系
  13. if (node.nextWaiter != null) // clean up if cancelled
  14. unlinkCancelledWaiters();
  15. // 如果线程被中断了,需要抛出异常.或者什么都不做
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }

一旦调用 signal 之后,节点被成功转移到同步队列后,会开始执行上面的第5行代码。如果线程被中断,或节点已经进入了同步队列,则退出循环体,继续向下执行。

signal 方法除了会将节点转移到同步队列,同时会调用 LockSupport.unpark(node.thread) 方法,唤醒被挂起的线程。

唤醒之后,我们可以看到调用 checkInterruptWhileWaiting 方法检查等待期间是否发生了中断,如果不为 0 表示确实在等待期间发生了中断。
但其实这个方法的返回结果用 interruptMode 变量接收,拥有更加丰富的内涵,它还能够判断中断的时机是否在 signal 之前。
interruptMode取值范围:

  1. /** await 返回的时候,需要重新设置中断状态 */
  2. private static final int REINTERRUPT = 1;
  3. /** await 返回的时候,需要抛出 InterruptedException 异常 */
  4. private static final int THROW_IE = -1;

checkInterruptWhileWaiting

该方法用于判断该线程是否在挂起期间发生了中断。

  1. private int checkInterruptWhileWaiting(Node node) {
  2. return Thread.interrupted() ?// 如果处于中断状态,返回true,且将重置中断状态
  3. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :// 如果中断了,判断何时中断
  4. 0; // 没有中断, 返回0
  5. }

transferAfterCancelledWait

该方法判断何时中断,是否在signal之前。

  1. final boolean transferAfterCancelledWait(Node node) {
  2. // 尝试使用CAS操作将node的ws设置为0
  3. // 如果节点的状态是Condition,说明还没有进入同步队列,因为调用signal的时候
  4. // 必然进入等待队列,这里不太可能成功。除非在signal之前被中断了,状态一直没有变化。
  5. // 所以调用signal之前发生了中断,这里会返回true
  6. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  7. // 就算中断了,也将节点入队
  8. enq(node);
  9. return true;
  10. }
  11. /*
  12. * 这里就是signal之后发生的中断
  13. * 但是signal可能还在进行转移中,这边自旋等一下它完成
  14. */
  15. while (!isOnSyncQueue(node))
  16. Thread.yield();
  17. return false;
  18. }

Condition 执行步骤示意图

深入理解 Condition - 图2

总结

ConditionObject 是 AQS 的内部类,它实现了 Condition 接口,提供了类似 wait, notify 和 notifyAll 类似的功能。
Condition 必须和一个独占锁绑定使用,在 await 或 signal 之前必须持有独占锁。Condition 队列是一个单向链表,它是公平的,按照先进先出的顺序从队列中被唤醒并添加到 Sync Queue 中,这时便恢复了参与竞争锁的资格。
Condition Queue 和 Sync Queue 是不同的,Condition Queue 是单向的,队列的第一个节点 firstWaiter 是可以绑定线程的;而 Sync Queue 是双向的,队列的第一个节点 head 是不与线程绑定的。
Condition 在设计时充分考虑了 Object 中监视器方法的缺陷,设计为一个 lock 可以对应多个 Condition,从而可以使线程分散到多个等待队列中,使得应用更为灵活,并且在实现上使用了 FIFO 队列来保存等待线程,确保了可以做到使用 signal 按 FIFO 方式唤醒等待线程,避免每次唤醒所有线程导致数据竞争。
但这样也会导致使用更加复杂,在实际应用中谨慎使用多个等待队列。