Java Condition

概要

在JDK1.5之后出了Condition,它可以实现在同步语义中的等待/通知,以此来实现线程之间通信或协同。Condition和Object的wait和notify/notify在用法和效果上都十分的类似。

使用

例子

Condition经常可以用在生产者-消费者的场景中,看看官方的例子,这是一个经典的例子,看到会有熟悉的味道

  1. class BoundedBuffer {
  2. final Lock lock = new ReentrantLock();
  3. final Condition notFull = lock.newCondition();
  4. final Condition notEmpty = lock.newCondition();
  5. final Object[] items = new Object[100];
  6. int putptr, takeptr, count;
  7. public void put(Object x) throws InterruptedException {
  8. lock.lock();
  9. try {
  10. while (count == items.length)
  11. notFull.await();
  12. items[putptr] = x;
  13. if (++putptr == items.length) putptr = 0;
  14. ++count;
  15. notEmpty.signal();
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. public Object take() throws InterruptedException {
  21. lock.lock();
  22. try {
  23. while (count == 0)
  24. notEmpty.await();
  25. Object x = items[takeptr];
  26. if (++takeptr == items.length) takeptr = 0;
  27. --count;
  28. notFull.signal();
  29. return x;
  30. } finally {
  31. lock.unlock();
  32. }
  33. }
  34. }

上面是一个往有界的buffer数组写/取数据的例子,为了防止写数组溢出或读不到数据,用了Condition来做控制。可以看出Condition必须在临界区内使用,即必须先持有相应的锁,这跟synchronized和对象的监视器锁wait()/notify()很像。

常用方法

Condition的使用比较简单,总的来说只有等待和唤醒两套方法

  1. //当前线程进入等待状态直到被通知(signal)或中断。
  2. public final void await() throws InterruptedException
  3. //跟上面一样,但不响应中断。
  4. public final long awaitNanos(long nanosTimeout) throws InterruptedException
  5. //跟上面一样,指定超时等待多长的时间
  6. public final boolean awaitUntil(Date deadline) throws InterruptedException
  7. //当跟上面一样,指定超时等待的某个时间点
  8. public final boolean await(long time, TimeUnit unit) throws InterruptedException
  1. //唤醒一个等待在Condition队列上的线程
  2. public final void signal()
  3. //唤醒所有等待在Condition队列上的线程
  4. public final void signalAll()

实现原理

Condition其实是AbstractQueuedSynchronizer内部实现的ConditionObject。Condition的实现跟AQS有很大的关系。
Condition的原理大致是,当调用wait方法时,同步队列的头节点,即锁的持有者释放锁,并为当前线程创建一个节点加入到条件队列等待;当signal时,会释放条件队列的节点,并把这个节点接入到同步队列中等待获得锁。

初始化

Condition的初始化,我们可以看看ReentrantLock的,其最终是新建一个ConditionObject对象,ConditionObject是AQS的一个内部类

  1. public Condition newCondition() {
  2. return sync.newCondition();
  3. }
  4. final ConditionObject newCondition() {
  5. return new ConditionObject();
  6. }

每调用一次newCondition则会创建一个Condition对象,所以一个AQS可以对应多个Condition,可以使用多个条件队列,从上面BoundedBuffer的例子可以看出来。
Condition内部是一个condition queue条件队列。注意这个队列的节点Node,就是跟AQS用的是同一个

  1. /** First node of condition queue. */
  2. //指向队列头的指针
  3. private transient Node firstWaiter;
  4. /** Last node of condition queue. */
  5. //指向队列尾的指针
  6. private transient Node lastWaiter;

await

await方法有几个版本,看看最普通的那个

  1. public final void await() throws InterruptedException {
  2. //响应中断,抛出中断异常
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. //生成一个CONDITION属性节点,加入到条件队列的尾部
  6. Node node = addConditionWaiter();
  7. //释放锁,savedState为释放锁之前AQS的共享状态变量state的值。
  8. //由于调用此方法必须获得锁,所以这里是锁的持有者主动释放锁;如果不是锁的持有者,则会报错
  9. int savedState = fullyRelease(node);
  10. int interruptMode = 0;
  11. //如果不是在同步队列则一直阻塞
  12. //当然如果是中断的话,也会break出来
  13. //当节点第一次进来时,由于在条件队列中,肯定不在同步队列中,所以会被park住
  14. //这里会一直block,直到在同步队列由前驱节点unpark唤醒
  15. while (!isOnSyncQueue(node)) {
  16. LockSupport.park(this);
  17. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  18. break;
  19. }
  20. //被唤醒后,由于此时是在临界区,所以必须再次获得锁才能继续。这里会阻塞直到获得锁
  21. //state必须为savedState,有点还原现场的意思
  22. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  23. interruptMode = REINTERRUPT;
  24. if (node.nextWaiter != null) // clean up if cancelled
  25. unlinkCancelledWaiters();
  26. if (interruptMode != 0)
  27. reportInterruptAfterWait(interruptMode);
  28. }

从await方法已经可以大概了解Condition的处理流程,下面对里面的每个方法一一分析一下

第一步:加入条件队列

  1. //addConditionWaiter方法用来新建一个CONDITION的节点,然后加到条件队列尾部
  2. //上面说了,Condition持有firstWaiter和lastWaiter两个指针
  3. private Node addConditionWaiter() {
  4. Node t = lastWaiter;
  5. //不等于Node.CONDITION说明被cancel了,则清理一下这些垃圾节点
  6. if (t != null && t.waitStatus != Node.CONDITION) {
  7. unlinkCancelledWaiters();
  8. t = lastWaiter;
  9. }
  10. //新建一个node,状态为CONDITION
  11. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  12. if (t == null)
  13. firstWaiter = node;
  14. else
  15. //把尾部的next指向新建的node
  16. //注意,Node的nextWaiter属性是专门用来构成条件队列,也看出条件队列是一个单向队列;而Node的next属性则是用于同步队列
  17. t.nextWaiter = node;
  18. //尾部指向新建的node
  19. lastWaiter = node;
  20. return node;
  21. }

第二步:释放锁

  1. final int fullyRelease(Node node) {
  2. //默认释放锁失败
  3. boolean failed = true;
  4. try {
  5. //获得当前(即释放锁前)state的值
  6. int savedState = getState();
  7. //然后释放锁;释放多少?释放savedState这么多,因为有可能是重入锁;因为要完全的释放锁,才能让出锁给别人,所以这个方法叫fullyRelease
  8. //释放锁是调用AQS的release方法,而会最终调用子类的tryRelease方法,如果tryRelease返回失败,则release返回false
  9. if (release(savedState)) {
  10. failed = false;
  11. return savedState;
  12. } else {
  13. throw new IllegalMonitorStateException();
  14. }
  15. } finally {
  16. //如果tryRelease失败或者抛出异常,则把节点标记为CANCELLED,等待被清理掉
  17. //这里试想一种情况,如果一个线程在没有进入临界就调用Condition的await方法,会怎样?从我们的分析下来看没有问题,会在条件队列新加一个节点,直到这里就不行了,因为在tryRelease方法就会抛出IllegalMonitorStateException,然后就到这里。
  18. if (failed)
  19. node.waitStatus = Node.CANCELLED;
  20. }
  21. }
  1. //如上所述,第一次肯定不在同步队列,会被挂起
  2. //在signal时将节点从条件队列移到同步队列
  3. //这个方法就是判断node是否已经移动到同步队列了
  4. final boolean isOnSyncQueue(Node node) {
  5. //如果还是CONDITION则说明肯定还在条件队列中,因为移到同步队列会把状态更新为0
  6. //如果prev == null,则说明肯定还在条件队列中,因为移到同步队列肯定有前驱节点,这个prev属性只有在同步队列时才用到
  7. if (node.waitStatus == Node.CONDITION || node.prev == null)
  8. return false;
  9. //如果 node 已经有后继节点 next 的时候,那肯定是在阻塞队列了
  10. if (node.next != null) // If has successor, it must be on queue
  11. return true;
  12. //来到这里说明,node.waitStatus != Node.CONDITION && node.prev != null && node.next == null
  13. //但是node.prev != null 还不能说明node就在同步队列中,回顾一下AQS的进队操作,首先把node.prev指向tail,再通过cas把tail指向自己
  14. //但是第二步有可能失败,所以这里需要从tail开始遍历,如果能遍历到这个node,说明肯定在同步队列中
  15. return findNodeFromTail(node);
  16. }
  17. private boolean findNodeFromTail(Node node) {
  18. Node t = tail;
  19. for (;;) {
  20. if (t == node)
  21. return true;
  22. if (t == null)
  23. return false;
  24. t = t.prev;
  25. }
  26. }

接下来的acquireQueued方法是AQS获取锁的方法,熟悉AQS的应该清楚,会一直阻塞直到排队被唤醒,竞争锁成功。
上述大致如图所示:
640.png

signal

唤醒同步队列有signalsignalAll方法,两者的区别是前者只唤醒同步队列的头节点,后者唤醒同步队列的所有节点,我们重点看看signal
唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费。

  1. public final void signal() {
  2. //必须是当前锁的持有者
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. Node first = firstWaiter;
  6. if (first != null)
  7. //将条件队列的第一个节点释放掉
  8. doSignal(first);
  9. }
  10. private void doSignal(Node first) {
  11. do {
  12. //将firstWaiter指向first的下一个节点,因为first将要被释放掉,所以如果没有下一个节点,那么也将lastWaiter置为null
  13. if ( (firstWaiter = first.nextWaiter) == null)
  14. lastWaiter = null;
  15. //释放当前first节点
  16. first.nextWaiter = null;
  17. //如果转移失败且下一个节点不null,那么继续转移下一个节点
  18. //为什么转移失败下面说
  19. } while (!transferForSignal(first) &&
  20. (first = firstWaiter) != null);
  21. }
  22. final boolean transferForSignal(Node node) {
  23. //CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消
  24. //既然已经取消,也就不需要转移了,返回false,继续转移一个节点
  25. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  26. return false;
  27. //把节点加入到同步队列中,调用的是AQS的enq方法
  28. //返回值p是node加入到同步队列的前驱节点
  29. Node p = enq(node);
  30. int ws = p.waitStatus;
  31. //ws > 0 表示前驱节点是取消状态,如果是,则unpark Node的线程。根据上面的await的分析,由于此时node已经在同步队列,所以unpark后还需要去获得锁
  32. //compareAndSetWaitStatus(p, ws, Node.SIGNAL) 是AQS的指定动作,把前驱节点设置为SIGNAL,意思是前驱节点要去唤醒后继节点
  33. //compareAndSetWaitStatus(p, ws, Node.SIGNAL) 会失败是由于恰好此时 p 的被取消了,所以也是和上面一个意思
  34. //这里的意思是,如果前驱节点取消了,就给当前节点一个机会去尝试获得锁
  35. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  36. LockSupport.unpark(node.thread);
  37. return true;
  38. }

signal过程大致如图所示:
640.webp

总结

Condition为在临界区中提供了协同的操作,试想想如果需要做到类似挂起/唤醒的操作,那么比较好的做法是通过一个队列来完成,而由于AbstractQueuedSynchronizer自身实现的存在,所以条件队列的节点跟同步队列用的是同一个类。Condition和Object的wait和notify/notify很像似,所以基本可以猜测实现原理也是差不多,只是前者在Java层面实现,后者在更底层的JVM实现。