Condition 接口提供了类似 Object 的监视器方法,可以与 Lock 接口配合实现等待-通知模式。Condition 实现了管程模型里面的条件变量,区别于 Java 语言内置的管程模型,Lock&Condition 实现的管程是支持多个条件变量的,这是二者的一个重要区别。
下面代码演示了在阻塞队列中使用 Condition 的经典案例:
public class BlockedQueue {
private final Lock lock = new ReentrantLock();
// 条件变量:队列不满
private final Condition notFull = lock.newCondition();
// 条件变量:队列不空
private final Condition notEmpty = lock.newCondition();
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (队列已满) {
notFull.await();
}
//...
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (队列为空) {
notEmpty.await();
}
//...
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
需要注意,Condition 必须在临界区内使用,即必须先持有相应的锁。并且 Lock 和 Condition 实现的管程,线程的等待和通知需要调用 await()、signal()、signalAll() 方法,它们的语义和 wait()、notify()、notifyAll() 是相同的。区别是后者只有在 synchronized 实现的管程里才能使用。
常用方法
Condition 的使用比较简单,总的来说只有等待和唤醒两套方法:
等待:
// 使当前线程进入等待状态,同时释放当前锁,直到被通知(signal、signalAll)或中断。
void await() throws InterruptedException;
// 与await()方法基本相同,但它不会在等待过程中响应中断。
void awaitUninterruptibly();
// 使当前线程进入等待状态,同时释放当前锁,直到被通知、中断或超时。
// 返回值表示剩余时间,如果返回值是0或负数,则已超时。
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 使当前线程进入等待状态,同时释放当前锁,直到被通知、中断或超时。
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 使当前线程进入等待状态,同时释放当前锁,直到被通知、中断或超时。
// 如果没有到指定时间就被通知,方法返回true;否则表示到了指定时间,方法返回false。
boolean awaitUntil(Date deadline) throws InterruptedException;
唤醒:
// 唤醒一个等待在Condition队列上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁
void signal();
// 唤醒所有等待在Condition队列上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁
void signalAll();
实现原理
1. ConditionObject
以可重入锁为例,Lock 接口的 newCondition 方法最终构造了一个 _ConditionObject _实例,ConditionObject 是 AQS 内部定义的一个类,该类实现了 Condition 接口,因为 Condition 的操作需要获取相关联的锁,所以作为 AQS 的内部类也是很合理的。
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
每调用一次 newCondition 方法就会创建一个新的 ConditionObject 对象,所以一个 AQS 是可以对应多个 Condition 条件的,即可以使用多个条件队列。
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
......
}
每个 ConditionObject 对象都包含了一个条件队列,该队列是 Condition 实现等待-通知模式的关键。大致原理就是:当调用 await 时,等待队列的头节点,即锁的持有者释放锁,并为当前线程创建一个节点加入到条件队列中等待;当调用 signal 时会释放条件队列的节点,并把这个节点接入到等待队列中等待获得锁。注意条件队列的节点 Node 跟 AQS 中用来等待锁释放中的等待队列中的 Node 是同一个类型。
2. 条件队列
条件队列是一个 FIFO 的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程,如果一个线程调用了 Condition 对象的 await() 方法,那么该线程将会释放锁、构造成节点(AQS 中的 Node)从尾部加入条件队列(将原有的尾节点 nextWaiter 指向它,并更新尾节点)并进入等待状态。
上述节点引用更新的过程并没有使用 CAS 保证,原因在于调用 await() 方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
3. await
public final void await() throws InterruptedException {
// 响应中断,抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程构造成节点,并加入到条件队列的尾部
Node node = addConditionWaiter();
// 释放同步状态,savedState为释放锁之前AQS的共享状态变量state的值
// 由于调用此方法必须获得锁,所以这里是锁的持有者主动释放锁;如果不是锁的持有者,在释放锁时会报错
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果不是在同步队列则一直阻塞,当然如果是中断的话,也会break出来
// 当节点第一次进来时,由于在条件队列中,肯定不在同步队列中,所以会被park
// 这里会一直block,直到在同步队列由前驱节点unpark唤醒
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被唤醒后,由于此时是在临界区,所以必须再次获得锁才能继续。这里会阻塞直到获得锁
// state必须为savedState,有点还原现场的意思
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
调用 await() 方法,会使当前线程进入条件队列并释放锁,同时线程状态变为等待状态。当从 await() 方法返回时,当前线程一定获取了 Condition 相关联的锁。如果从 AQS(同步队列)的角度看 await() 方法,就相当于同步队列的首节点(获取了锁的节点)移动到了 Condition 的条件队列中的尾部。
4. signal
唤醒同步队列有 signal 和 signalAll 方法,两者的区别是前者只唤醒条件队列中等待时间最长的头节点,而后者则循环唤醒条件队列中的所有节点。唤醒操作通常由另一个线程来操作,就像生产者-消费者模式中,如果线程因为等待消费而挂起,那么当生产者生产了一个东西后,会调用 signal 唤醒正在等待的线程来消费。
public final void signal() {
// 当前线程必须是当前锁的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 将条件队列的第一个节点释放掉
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 释放当前first节点
first.nextWaiter = null;
// 如果转移失败且下一个节点不null,那么继续转移下一个节点
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 把节点加入到同步队列中,返回值p是node加入到同步队列的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
该方法会将条件队列中的首节点(firstWaiter)移动到同步队列,然后当前线程使用 LockSupport.unpark() 唤醒该节点的线程。被唤醒后的线程将从 await() 方法的 while 循环中退出(isOnSyncQueue() 方法返回 true,节点已经在同步队列中),进而调用 acquireQueued() 方法加入到获取同步状态的竞争中。成功获取同步状态后,被唤醒的线程将从先前调用 await() 方法返回,此时该线程已经成功地获取了锁。
LockSupport
LockSupport 可以在线程内任意位置让线程阻塞。与 Thread.suspend() 方法相比,它弥补了由于 resume() 方法调用时发生异常,导致线程无法继续执行的情况。和 object.wait() 方法相比,它不需要事先获得某个对象的锁,也不会抛出 InterruptedException 异常。
LockSupport 定义了一组公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而 LockSupport 也成为构建同步组件的基础工具。
LockSupport.park() 方法还能支持中断影响,但 Locksupport.park() 方法不会抛出 InterruptedException 异常而是会默默返回,但是我们可以从 Thread.interrupted() 等方法中获得中断标记。