1. 简介
任意一个Java对象,都拥有一组监视器方法(定义在Object),主要包括以下方法:
wait()wait(long timeout)notify()notifyAll()
这些方法与synchronized同步关键字配合使用时,可以实现等待/通知模式。
Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。方法如下所示:
void await() throws InterruptedException; 当前线程进入等待直到被通知或中断boolean await(long time, TimeUnit unit) throws InterruptedException; 当前线程进入等待直到被通知或中断或超时long awaitNanos(long nanosTimeout) throws InterruptedException; 当前线程进入等待直到被通知或中断或超时void awaitUninterruptibly(); 当前线程进入等待直到被通知boolean awaitUntil(Date deadline) throws InterruptedException; 当前线程进入等待直到被通知或中断或到某个时间void signal(); 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁void signalAll(); 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁
2. 使用
Condition的类注释文档中给了一个有界缓冲区的例子,如下所示:该例子使用了两个条件队列,分别用于缓冲区满或者空的条件等待。
/*** 有界缓冲区*/static class BoundedBuffer {final Lock lock = new ReentrantLock();/*** 条件:不为满*/final Condition notFull = lock.newCondition();/*** 条件:不为空*/final Condition notEmpty = lock.newCondition();/*** 缓冲区*/final Object[] items = new Object[10];/*** putptr: 进缓冲区的下标* takeptr: 出缓冲区的下标* count: 缓冲区内存在的数量*/int putptr, takeptr, count;/*** 往缓冲区里加*/public void put(Object x) throws InterruptedException {// 执行await或者signal之前得先获取锁lock.lock();try {// 满了, notFull waitwhile (count == items.length)notFull.await();items[putptr] = x;// 如果到尾了, 从0开始if (++putptr == items.length) putptr = 0;++count;// 往缓冲区里加了, 则需要唤醒因为缓冲区空了而在等待的notEmpty条件队列notEmpty.signal();} finally {lock.unlock();}}/*** 往缓冲区里取*/public Object take() throws InterruptedException {// 执行await或者signal之前得先获取锁lock.lock();try {// 空了, notEmpty waitwhile (count == 0)notEmpty.await();Object x = items[takeptr];// 如果到尾了, 从0开始if (++takeptr == items.length) takeptr = 0;--count;// 从缓冲区里取了, 则需要唤醒因为缓冲区满了而在等待的notEmpty条件队列notFull.signal();return x;} finally {lock.unlock();}}}@Testpublic void test_condition_use() throws InterruptedException {BoundedBuffer boundedBuffer = new BoundedBuffer();Thread thread1 = new Thread(() -> {while (true) {try {boundedBuffer.put(new Object());} catch (InterruptedException ignore) {}}});Thread thread2 = new Thread(() -> {while (true) {try {boundedBuffer.take();} catch (InterruptedException ignore) {}}});thread1.start();thread2.start();thread1.join();thread2.join();}
3. 详解
接下来,看一下条件等待队列是如何实现的,从上文的例子可知,Condition是通过ReentrantLock的newCondition方法获取的,阅读ReentrantLock源码可知,是由内部类Sync实现的该方法,如下所示:
final ConditionObject newCondition() {return new ConditionObject();}
ConditionObject为Condition的实现类,在AQS中可以找到该类,接下来主要对该类进行分析。
3.1 成员变量
ConditionObject类的成员变量较为简单,如下所示:
/*** 条件等待队列的头结点*/private transient Node firstWaiter;/*** 条件等待队列的尾结点*/private transient Node lastWaiter;
3.2 方法
3.2.1 addConditionWaiter方法
addConditionWaiter方法主要用来将当前线程加入到条件等待队列。代码如下所示:
ps:unlinkCancelledWaiters方法是用来移除条件等待队列中状态不为CONDITION的节点。
/*** 将当前线程封装为一个Node加入条件等待队列中*/private Node addConditionWaiter() {Node t = lastWaiter;// 最后一个节点是取消状态, 将处于取消状态的节点从等待队列中移除if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);// 队列为空if (t == null)firstWaiter = node;// 队列不为空, 将节点挂到队列尾elset.nextWaiter = node;lastWaiter = node;return node;}
3.2.2 fullyRelease方法
fullyRelease方法用来释放当前锁持有的同步状态,线程进入await方法之前肯定是获取同步状态了的,所以此处需要将其释放,并将同步状态记录下来(即saveState该方法的返回值),之后再次唤醒时需要重新获取该同步状态。
此处注意一个参数failed,在release方法中会调用子类实现的tryRelease方法,如果当前线程没有持有锁锁就进行release会抛出IllegalMonitorStateException。
ps:release方法中还会唤醒当前节点的后继节点。
final int fullyRelease(Node node) {boolean failed = true;try {// 获取同步状态, 这些状态肯定是当前线程获取的, 因为是独占锁int savedState = getState();// 释放同步状态, 返回if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {// 如果失败了, 将当前节点的状态置为CANCELLED// 失败的情况: 当前线程没有持有锁, 在tryRelease抛出IllegalMonitorStateExceptionif (failed)node.waitStatus = Node.CANCELLED;}}/*** 释放同步状态*/public final boolean release(int arg) {// 调用子类实现的模板方法释放同步状态if (tryRelease(arg)) {/*** 此时head可能的情况* 1. null, 此时无竞争, head没有初始化* 2. head是当前线程的节点* 3. 在tryRelease之后, 别的线程的节点获取到了锁, 通过setHead方法设置(acquireQueued方法里)*/Node h = head;// 唤醒后继节点if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
3.2.3 isOnSyncQueue方法
isOnSyncQueue方法用于判断节点是否在同步等待队列中。
final boolean isOnSyncQueue(Node node) {// 如果状态已经被改为CONDITIONif (node.waitStatus == Node.CONDITION || node.prev == null)return false;// (当前节点状态不为CONDITION, 并且有前驱结点)如果有后继节点, 则当前节点肯定在等待队列里了。if (node.next != null) // If has successor, it must be on queuereturn true;// 从尾节点往前遍历, 查找当前节点return findNodeFromTail(node);}private boolean findNodeFromTail(Node node) {Node t = tail;for (; ; ) {if (t == node)return true;if (t == null)return false;t = t.prev;}}
3.2.4 checkInterruptWhileWaiting方法 & transferAfterCancelledWait方法
checkInterruptWhileWaiting方法用于线程被唤醒之后判断在等待期间是否被中断。
/*** 检测线程在等待过程中, 是否被中断* <p>* 中断了 -> 返回 -1 or 1* 未中断 -> 返回0*/private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}
这里讲一下这几个返回值的含义,对应await方法中的interruptMode
- interruptMode = 0 没有被中断
- interruptMode =-1 是在条件队列中被中断的,需要抛出中断异常
- interruptMode = 1 转移到等待队列之后被中断
如果线程当前被中断,则通过transferAfterCancelledWait判断是-1还是1。
/*** @return true -> 在条件队列中被中断的, false -> 被中断时不在条件队列里了*/final boolean transferAfterCancelledWait(Node node) {// 在从wait状态退出的时候, 需要将节点的状态设置为0并且加入到等待队列// 即, 重新加入到锁的竞争中// 如果cas成功, 说明节点之前是在条件队列中被中断的if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {enq(node);return true;}// 如果cas失败, 执行到这一步, 那么有两种情况// 1. 节点已经被搞到等待队列里了 , 此时!isOnSyncQueue(node)返回false// 2. 节点正在被别的线程搞到等待队列里 , 此时!isOnSyncQueue(node)返回truewhile (!isOnSyncQueue(node))// 没在等待队列里, 对应上面第二种情况, 给调度程序示意当前线程愿意放弃当前使用的CPU时间片Thread.yield();// 返回false, 说明被中断时, 不在条件队列里了return false;}
3.2.5 await方法
await方法主要做的事情:
- 将当前线程加入到条件等待队列
- 释放当前线程获取的同步状态,唤醒后继节点
- 挂起当前线程
- 唤醒之后(检查是否被中断),重新获取同步状态
如下图所示:当前节点原先是同步等待队列的头节点(但是node.thread在获取锁时已经被置为null的)。在调用await方法时,会使用当前线程构建一个新的节点加入到条件等待队列,并且会唤醒同步等待队列的后继节点。
代码如下所示:
public final void await() throws InterruptedException {// 判断当前线程是否被中断if (Thread.interrupted())throw new InterruptedException();// 将当前线程加入到condition队列Node node = addConditionWaiter();// 释放锁, 因为调用 await 之前获取了锁, 所以需要释放锁// savedState为持有的锁数量, 在被唤醒之后需要重新获取int savedState = fullyRelease(node);int interruptMode = 0;// 判断是否在等待队列中, 不在则挂起当前线程while (!isOnSyncQueue(node)) {LockSupport.park(this);// 被唤醒之后, 判断是否在等待期间被中断if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 执行到这里, 线程已经被加入等待队列// 重新获取同步状态// acquireQueued(node, savedState)返回true -> 在等待队列中被中断// interruptMode != THROW_IE -> 在条件队列中未被中断if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 如果是被中断唤醒, 那么nextWaiter没有被设置为null; 如果signal唤醒的话, 会设置first.nextWaiter=nullif (node.nextWaiter != null) // clean up if cancelled// 移除cancelled节点unlinkCancelledWaiters();// 1重新中断, -1抛出异常if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
这里解释一下线程被唤醒(即从park中醒过来)的可能性:
- 中断唤醒
- signal, 在转移到等待队列后, 在transferForSignal中发现前驱节点状态为CANCELLED, 唤醒 (可见下文3.2.7)
- signal, 在转移到等待队列后, 在transferForSignal中设置前驱结点状态为SIGNAL未成功, 唤醒(可见下文3.2.7)
3.2.6 signal方法
signal方法的逻辑比较简单,先判断当前显示是否持有锁,然后唤醒条件等待队列的头结点(如果不为空)。代码如下所示:
/*** 唤醒条件等待队列的第一个节点*/public final void signal() {// 判断当前是否拥有锁if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}
3.2.7 doSignal方法 & transferForSignal方法
doSignal方法主要做的事情:调用transferForSignal方法将first节点从条件等待队列移到同步等待队列中,如果transferForSignal失败则会尝试使用下一个节点。
如下图所示:在signal方法被调用时,会将条件等待队列的首个节点从队列中移除,并加入到同步等待队列中。
代码如下所示:
private void doSignal(Node first) {do {// firstWaiter = first.nextWaiter : 因为first即将被唤醒出队列, 所以让first等于下一个// firstWaiter == null : 队列里无节点了, 把lastWaiter也置为nullif ((firstWaiter = first.nextWaiter) == null)lastWaiter = null;// 断开与下一个节点的关系first.nextWaiter = null;}// !transferForSignal(first) 将当前节点迁移到等待队列// 1. true -> 即transferForSignal失败, 继续do(如果队列里还有节点)// 2. false-> 即transferForSignal成功, 结束// 当第一个条件true, 判断(first = firstWaiter) != null, 即队列里是否还有节点, 决定是不是继续dowhile (!transferForSignal(first) &&(first = firstWaiter) != null);}final boolean transferForSignal(Node node) {// 将节点的状态从CONDITION设置为0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 将节点加入到等待队列, 返回之前的tail, 即当前节点的前驱结点Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
这里解释一下transferForSignal方法中的第二个if判断的条件:
ps:线程被唤醒之后,会继续await方法,调用acquireQueued。
- ws > 0 : 说明前驱节点是CANCELLED状态,则需要唤醒当前节点,通过在acquireQueued方法调用shouldParkAfterFailedAcquire方法跳过CANCELLED状态的节点。
- !compareAndSetWaitStatus(p, ws, Node.SIGNAL) : 在队列尾加了节点之后,如果要进入阻塞则需要将前驱节点的状态设置为SINGAL。此处CAS失败, 则需要唤醒当前节点,通过在acquireQueued方法调用shouldParkAfterFailedAcquire方法设置前驱节点为SIGNAL状态
如果这个if里的条件都不成立,则不需要唤醒该节点,该节点在同步等待队列中等待前驱节点唤醒。
Other
代码
参考:《Java并发编程的艺术》
