Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知 模式。
Object的监视器方法与Condition接口的对比
对比项 | Object Monitor Methods | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁 |
调用方式 | 直接调用。如object.wait() | 直接调用。如condition.await() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态不响应中断 | 不支持 | 支持 |
担心线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
使用示例
class ConditionUseCase {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
}
方法名称 | 描述 |
---|---|
void await() throws InterruptedException | 当前线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回的情况,包括: 其他线程调用该Condition的signal()或signalAll()方法,而当前线程被选中唤醒 - 其他线程(调用interrupt()方法)中断当前线程 |
- 如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象所对应的锁
|
| void awaitUninterruptibly() | 当前线程进入等待状态直到被通知。不响应中断 |
| long awaitNanos(long nanosTimeout) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或者超时。返回值表示剩余时间,如果在nanosTimeout纳秒之前被唤醒,纳秒返回值就是(nanosTimeout-实际耗时)。如果返回值是0或者负数,纳秒就可以认定已经超时了 |
| boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进入等待状态直到被通知、中断、或者到某个时间。如果没有到指定时间就被通知,方法返回true,否则表示到了指定时间,返回false |
| void signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁 |
| void signalAll() | 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得Condition相关联的锁 |
public class BoundedQueue<T> {
private Object[] items;
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
//添加一个元素,如果数组满,则添加线程进入等待状态,直到有“空位”
public void add(T t) throws InterruptedException {
lock.lock();
try {
while(count == items.length) {
notFull.await();
}
items[addIndex] = t;
if(++addIndex == items.length) {
addIndex = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
//由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
public T remove() throws InterruptedException {
lock.lock();
try {
while(count == 0) {
notEmpty.await();
}
Object x = items[removeIndex];
if(++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}
ConditionObject是同步器AQS的内部类,每个Condition对象都包含着一个队列,该队列是Condition实现等待/通知功能的关键。
等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待线程。Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。
新增节点只需要将原有的尾节点nextWaiter指向它,并更新尾节点即可。这个过程并没有使用CAS保证,原有在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//构造节点并加入等待队列尾节点
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点,在唤醒节点之前,会将节点移到同步队列中。调用该方法的前置条件是获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移到到同步队列并使用LockSupport唤醒节点中的线程。
通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移到同步队列。当节点移到到同步队列后,当前线程再使用LockSupport唤醒该节点的线程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}