1. 简介

任意一个Java对象,都拥有一组监视器方法(定义在Object),主要包括以下方法:

  1. wait()
  2. wait(long timeout)
  3. notify()
  4. notifyAll()

这些方法与synchronized同步关键字配合使用时,可以实现等待/通知模式。
Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。方法如下所示:

  1. void await() throws InterruptedException; 当前线程进入等待直到被通知或中断
  2. boolean await(long time, TimeUnit unit) throws InterruptedException; 当前线程进入等待直到被通知或中断或超时
  3. long awaitNanos(long nanosTimeout) throws InterruptedException; 当前线程进入等待直到被通知或中断或超时
  4. void awaitUninterruptibly(); 当前线程进入等待直到被通知
  5. boolean awaitUntil(Date deadline) throws InterruptedException; 当前线程进入等待直到被通知或中断或到某个时间
  6. void signal(); 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁
  7. void signalAll(); 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁

2. 使用

Condition的类注释文档中给了一个有界缓冲区的例子,如下所示:该例子使用了两个条件队列,分别用于缓冲区满或者空的条件等待。

  1. /**
  2. * 有界缓冲区
  3. */
  4. static class BoundedBuffer {
  5. final Lock lock = new ReentrantLock();
  6. /**
  7. * 条件:不为满
  8. */
  9. final Condition notFull = lock.newCondition();
  10. /**
  11. * 条件:不为空
  12. */
  13. final Condition notEmpty = lock.newCondition();
  14. /**
  15. * 缓冲区
  16. */
  17. final Object[] items = new Object[10];
  18. /**
  19. * putptr: 进缓冲区的下标
  20. * takeptr: 出缓冲区的下标
  21. * count: 缓冲区内存在的数量
  22. */
  23. int putptr, takeptr, count;
  24. /**
  25. * 往缓冲区里加
  26. */
  27. public void put(Object x) throws InterruptedException {
  28. // 执行await或者signal之前得先获取锁
  29. lock.lock();
  30. try {
  31. // 满了, notFull wait
  32. while (count == items.length)
  33. notFull.await();
  34. items[putptr] = x;
  35. // 如果到尾了, 从0开始
  36. if (++putptr == items.length) putptr = 0;
  37. ++count;
  38. // 往缓冲区里加了, 则需要唤醒因为缓冲区空了而在等待的notEmpty条件队列
  39. notEmpty.signal();
  40. } finally {
  41. lock.unlock();
  42. }
  43. }
  44. /**
  45. * 往缓冲区里取
  46. */
  47. public Object take() throws InterruptedException {
  48. // 执行await或者signal之前得先获取锁
  49. lock.lock();
  50. try {
  51. // 空了, notEmpty wait
  52. while (count == 0)
  53. notEmpty.await();
  54. Object x = items[takeptr];
  55. // 如果到尾了, 从0开始
  56. if (++takeptr == items.length) takeptr = 0;
  57. --count;
  58. // 从缓冲区里取了, 则需要唤醒因为缓冲区满了而在等待的notEmpty条件队列
  59. notFull.signal();
  60. return x;
  61. } finally {
  62. lock.unlock();
  63. }
  64. }
  65. }
  66. @Test
  67. public void test_condition_use() throws InterruptedException {
  68. BoundedBuffer boundedBuffer = new BoundedBuffer();
  69. Thread thread1 = new Thread(() -> {
  70. while (true) {
  71. try {
  72. boundedBuffer.put(new Object());
  73. } catch (InterruptedException ignore) {
  74. }
  75. }
  76. });
  77. Thread thread2 = new Thread(() -> {
  78. while (true) {
  79. try {
  80. boundedBuffer.take();
  81. } catch (InterruptedException ignore) {
  82. }
  83. }
  84. });
  85. thread1.start();
  86. thread2.start();
  87. thread1.join();
  88. thread2.join();
  89. }

3. 详解

接下来,看一下条件等待队列是如何实现的,从上文的例子可知,Condition是通过ReentrantLock的newCondition方法获取的,阅读ReentrantLock源码可知,是由内部类Sync实现的该方法,如下所示:

  1. final ConditionObject newCondition() {
  2. return new ConditionObject();
  3. }

ConditionObject为Condition的实现类,在AQS中可以找到该类,接下来主要对该类进行分析。

3.1 成员变量

ConditionObject类的成员变量较为简单,如下所示:

  1. /**
  2. * 条件等待队列的头结点
  3. */
  4. private transient Node firstWaiter;
  5. /**
  6. * 条件等待队列的尾结点
  7. */
  8. private transient Node lastWaiter;

3.2 方法

3.2.1 addConditionWaiter方法

addConditionWaiter方法主要用来将当前线程加入到条件等待队列。代码如下所示:
ps:unlinkCancelledWaiters方法是用来移除条件等待队列中状态不为CONDITION的节点。

  1. /**
  2. * 将当前线程封装为一个Node加入条件等待队列中
  3. */
  4. private Node addConditionWaiter() {
  5. Node t = lastWaiter;
  6. // 最后一个节点是取消状态, 将处于取消状态的节点从等待队列中移除
  7. if (t != null && t.waitStatus != Node.CONDITION) {
  8. unlinkCancelledWaiters();
  9. t = lastWaiter;
  10. }
  11. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  12. // 队列为空
  13. if (t == null)
  14. firstWaiter = node;
  15. // 队列不为空, 将节点挂到队列尾
  16. else
  17. t.nextWaiter = node;
  18. lastWaiter = node;
  19. return node;
  20. }

3.2.2 fullyRelease方法

fullyRelease方法用来释放当前锁持有的同步状态,线程进入await方法之前肯定是获取同步状态了的,所以此处需要将其释放,并将同步状态记录下来(即saveState该方法的返回值),之后再次唤醒时需要重新获取该同步状态。
此处注意一个参数failed,在release方法中会调用子类实现的tryRelease方法,如果当前线程没有持有锁锁就进行release会抛出IllegalMonitorStateException。
ps:release方法中还会唤醒当前节点的后继节点。

  1. final int fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. // 获取同步状态, 这些状态肯定是当前线程获取的, 因为是独占锁
  5. int savedState = getState();
  6. // 释放同步状态, 返回
  7. if (release(savedState)) {
  8. failed = false;
  9. return savedState;
  10. } else {
  11. throw new IllegalMonitorStateException();
  12. }
  13. } finally {
  14. // 如果失败了, 将当前节点的状态置为CANCELLED
  15. // 失败的情况: 当前线程没有持有锁, 在tryRelease抛出IllegalMonitorStateException
  16. if (failed)
  17. node.waitStatus = Node.CANCELLED;
  18. }
  19. }
  20. /**
  21. * 释放同步状态
  22. */
  23. public final boolean release(int arg) {
  24. // 调用子类实现的模板方法释放同步状态
  25. if (tryRelease(arg)) {
  26. /**
  27. * 此时head可能的情况
  28. * 1. null, 此时无竞争, head没有初始化
  29. * 2. head是当前线程的节点
  30. * 3. 在tryRelease之后, 别的线程的节点获取到了锁, 通过setHead方法设置(acquireQueued方法里)
  31. */
  32. Node h = head;
  33. // 唤醒后继节点
  34. if (h != null && h.waitStatus != 0)
  35. unparkSuccessor(h);
  36. return true;
  37. }
  38. return false;
  39. }

3.2.3 isOnSyncQueue方法

isOnSyncQueue方法用于判断节点是否在同步等待队列中。

  1. final boolean isOnSyncQueue(Node node) {
  2. // 如果状态已经被改为CONDITION
  3. if (node.waitStatus == Node.CONDITION || node.prev == null)
  4. return false;
  5. // (当前节点状态不为CONDITION, 并且有前驱结点)如果有后继节点, 则当前节点肯定在等待队列里了。
  6. if (node.next != null) // If has successor, it must be on queue
  7. return true;
  8. // 从尾节点往前遍历, 查找当前节点
  9. return findNodeFromTail(node);
  10. }
  11. private boolean findNodeFromTail(Node node) {
  12. Node t = tail;
  13. for (; ; ) {
  14. if (t == node)
  15. return true;
  16. if (t == null)
  17. return false;
  18. t = t.prev;
  19. }
  20. }

3.2.4 checkInterruptWhileWaiting方法 & transferAfterCancelledWait方法

checkInterruptWhileWaiting方法用于线程被唤醒之后判断在等待期间是否被中断。

  1. /**
  2. * 检测线程在等待过程中, 是否被中断
  3. * <p>
  4. * 中断了 -> 返回 -1 or 1
  5. * 未中断 -> 返回0
  6. */
  7. private int checkInterruptWhileWaiting(Node node) {
  8. return Thread.interrupted() ?
  9. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  10. 0;
  11. }

这里讲一下这几个返回值的含义,对应await方法中的interruptMode

  • interruptMode = 0 没有被中断
  • interruptMode =-1 是在条件队列中被中断的,需要抛出中断异常
  • interruptMode = 1 转移到等待队列之后被中断

如果线程当前被中断,则通过transferAfterCancelledWait判断是-1还是1。

  1. /**
  2. * @return true -> 在条件队列中被中断的, false -> 被中断时不在条件队列里了
  3. */
  4. final boolean transferAfterCancelledWait(Node node) {
  5. // 在从wait状态退出的时候, 需要将节点的状态设置为0并且加入到等待队列
  6. // 即, 重新加入到锁的竞争中
  7. // 如果cas成功, 说明节点之前是在条件队列中被中断的
  8. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  9. enq(node);
  10. return true;
  11. }
  12. // 如果cas失败, 执行到这一步, 那么有两种情况
  13. // 1. 节点已经被搞到等待队列里了 , 此时!isOnSyncQueue(node)返回false
  14. // 2. 节点正在被别的线程搞到等待队列里 , 此时!isOnSyncQueue(node)返回true
  15. while (!isOnSyncQueue(node))
  16. // 没在等待队列里, 对应上面第二种情况, 给调度程序示意当前线程愿意放弃当前使用的CPU时间片
  17. Thread.yield();
  18. // 返回false, 说明被中断时, 不在条件队列里了
  19. return false;
  20. }


3.2.5 await方法

await方法主要做的事情:

  1. 将当前线程加入到条件等待队列
  2. 释放当前线程获取的同步状态,唤醒后继节点
  3. 挂起当前线程
  4. 唤醒之后(检查是否被中断),重新获取同步状态

如下图所示:当前节点原先是同步等待队列的头节点(但是node.thread在获取锁时已经被置为null的)。在调用await方法时,会使用当前线程构建一个新的节点加入到条件等待队列,并且会唤醒同步等待队列的后继节点。
aqs-condition-await.png
代码如下所示:

  1. public final void await() throws InterruptedException {
  2. // 判断当前线程是否被中断
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. // 将当前线程加入到condition队列
  6. Node node = addConditionWaiter();
  7. // 释放锁, 因为调用 await 之前获取了锁, 所以需要释放锁
  8. // savedState为持有的锁数量, 在被唤醒之后需要重新获取
  9. int savedState = fullyRelease(node);
  10. int interruptMode = 0;
  11. // 判断是否在等待队列中, 不在则挂起当前线程
  12. while (!isOnSyncQueue(node)) {
  13. LockSupport.park(this);
  14. // 被唤醒之后, 判断是否在等待期间被中断
  15. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  16. break;
  17. }
  18. // 执行到这里, 线程已经被加入等待队列
  19. // 重新获取同步状态
  20. // acquireQueued(node, savedState)返回true -> 在等待队列中被中断
  21. // interruptMode != THROW_IE -> 在条件队列中未被中断
  22. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  23. interruptMode = REINTERRUPT;
  24. // 如果是被中断唤醒, 那么nextWaiter没有被设置为null; 如果signal唤醒的话, 会设置first.nextWaiter=null
  25. if (node.nextWaiter != null) // clean up if cancelled
  26. // 移除cancelled节点
  27. unlinkCancelledWaiters();
  28. // 1重新中断, -1抛出异常
  29. if (interruptMode != 0)
  30. reportInterruptAfterWait(interruptMode);
  31. }

这里解释一下线程被唤醒(即从park中醒过来)的可能性:

  1. 中断唤醒
  2. signal, 在转移到等待队列后, 在transferForSignal中发现前驱节点状态为CANCELLED, 唤醒 (可见下文3.2.7)
  3. signal, 在转移到等待队列后, 在transferForSignal中设置前驱结点状态为SIGNAL未成功, 唤醒(可见下文3.2.7)

3.2.6 signal方法

signal方法的逻辑比较简单,先判断当前显示是否持有锁,然后唤醒条件等待队列的头结点(如果不为空)。代码如下所示:

  1. /**
  2. * 唤醒条件等待队列的第一个节点
  3. */
  4. public final void signal() {
  5. // 判断当前是否拥有锁
  6. if (!isHeldExclusively())
  7. throw new IllegalMonitorStateException();
  8. Node first = firstWaiter;
  9. if (first != null)
  10. doSignal(first);
  11. }

3.2.7 doSignal方法 & transferForSignal方法

doSignal方法主要做的事情:调用transferForSignal方法将first节点从条件等待队列移到同步等待队列中,如果transferForSignal失败则会尝试使用下一个节点。
如下图所示:在signal方法被调用时,会将条件等待队列的首个节点从队列中移除,并加入到同步等待队列中。
aqs-condition-signal.png

代码如下所示:

  1. private void doSignal(Node first) {
  2. do {
  3. // firstWaiter = first.nextWaiter : 因为first即将被唤醒出队列, 所以让first等于下一个
  4. // firstWaiter == null : 队列里无节点了, 把lastWaiter也置为null
  5. if ((firstWaiter = first.nextWaiter) == null)
  6. lastWaiter = null;
  7. // 断开与下一个节点的关系
  8. first.nextWaiter = null;
  9. }
  10. // !transferForSignal(first) 将当前节点迁移到等待队列
  11. // 1. true -> 即transferForSignal失败, 继续do(如果队列里还有节点)
  12. // 2. false-> 即transferForSignal成功, 结束
  13. // 当第一个条件true, 判断(first = firstWaiter) != null, 即队列里是否还有节点, 决定是不是继续do
  14. while (!transferForSignal(first) &&
  15. (first = firstWaiter) != null);
  16. }
  17. final boolean transferForSignal(Node node) {
  18. // 将节点的状态从CONDITION设置为0
  19. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  20. return false;
  21. // 将节点加入到等待队列, 返回之前的tail, 即当前节点的前驱结点
  22. Node p = enq(node);
  23. int ws = p.waitStatus;
  24. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  25. LockSupport.unpark(node.thread);
  26. return true;
  27. }

这里解释一下transferForSignal方法中的第二个if判断的条件:
ps:线程被唤醒之后,会继续await方法,调用acquireQueued。

  1. ws > 0 : 说明前驱节点是CANCELLED状态,则需要唤醒当前节点,通过在acquireQueued方法调用shouldParkAfterFailedAcquire方法跳过CANCELLED状态的节点。
  2. !compareAndSetWaitStatus(p, ws, Node.SIGNAL) : 在队列尾加了节点之后,如果要进入阻塞则需要将前驱节点的状态设置为SINGAL。此处CAS失败, 则需要唤醒当前节点,通过在acquireQueued方法调用shouldParkAfterFailedAcquire方法设置前驱节点为SIGNAL状态

如果这个if里的条件都不成立,则不需要唤醒该节点,该节点在同步等待队列中等待前驱节点唤醒。

Other

代码
参考:《Java并发编程的艺术》