Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知 模式。
    Object的监视器方法与Condition接口的对比

    对比项 Object Monitor Methods Condition
    前置条件 获取对象的锁 调用Lock.lock()获取锁
    调用方式 直接调用。如object.wait() 直接调用。如condition.await()
    等待队列个数 一个 多个
    当前线程释放锁并进入等待状态 支持 支持
    当前线程释放锁并进入等待状态,在等待状态不响应中断 不支持 支持
    担心线程释放锁并进入超时等待状态 支持 支持
    当前线程释放锁并进入等待状态到将来的某个时间 不支持 支持
    唤醒等待队列中的一个线程 支持 支持
    唤醒等待队列中的全部线程 支持 支持

    使用示例

    1. class ConditionUseCase {
    2. Lock lock = new ReentrantLock();
    3. Condition condition = lock.newCondition();
    4. public void conditionWait() throws InterruptedException {
    5. lock.lock();
    6. try {
    7. condition.await();
    8. } finally {
    9. lock.unlock();
    10. }
    11. }
    12. public void conditionSignal() throws InterruptedException {
    13. lock.lock();
    14. try {
    15. condition.signal();
    16. } finally {
    17. lock.unlock();
    18. }
    19. }
    20. }
    方法名称 描述
    void await() throws InterruptedException 当前线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回的情况,包括:
    其他线程调用该Condition的signal()或signalAll()方法,而当前线程被选中唤醒
    - 其他线程(调用interrupt()方法)中断当前线程


    - 如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象所对应的锁
    | | void awaitUninterruptibly() | 当前线程进入等待状态直到被通知。不响应中断 | | long awaitNanos(long nanosTimeout) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或者超时。返回值表示剩余时间,如果在nanosTimeout纳秒之前被唤醒,纳秒返回值就是(nanosTimeout-实际耗时)。如果返回值是0或者负数,纳秒就可以认定已经超时了 | | boolean awaitUntil(Date deadline) throws InterruptedException | 当前线程进入等待状态直到被通知、中断、或者到某个时间。如果没有到指定时间就被通知,方法返回true,否则表示到了指定时间,返回false | | void signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁 | | void signalAll() | 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得Condition相关联的锁 |

    1. public class BoundedQueue<T> {
    2. private Object[] items;
    3. private int addIndex, removeIndex, count;
    4. private Lock lock = new ReentrantLock();
    5. private Condition notFull = lock.newCondition();
    6. private Condition notEmpty = lock.newCondition();
    7. public BoundedQueue(int size) {
    8. items = new Object[size];
    9. }
    10. //添加一个元素,如果数组满,则添加线程进入等待状态,直到有“空位”
    11. public void add(T t) throws InterruptedException {
    12. lock.lock();
    13. try {
    14. while(count == items.length) {
    15. notFull.await();
    16. }
    17. items[addIndex] = t;
    18. if(++addIndex == items.length) {
    19. addIndex = 0;
    20. }
    21. ++count;
    22. notEmpty.signal();
    23. } finally {
    24. lock.unlock();
    25. }
    26. }
    27. //由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
    28. public T remove() throws InterruptedException {
    29. lock.lock();
    30. try {
    31. while(count == 0) {
    32. notEmpty.await();
    33. }
    34. Object x = items[removeIndex];
    35. if(++removeIndex == items.length) {
    36. removeIndex = 0;
    37. }
    38. --count;
    39. notFull.signal();
    40. return (T) x;
    41. } finally {
    42. lock.unlock();
    43. }
    44. }
    45. }

    ConditionObject是同步器AQS的内部类,每个Condition对象都包含着一个队列,该队列是Condition实现等待/通知功能的关键。
    等待队列
    等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待线程。Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。
    新增节点只需要将原有的尾节点nextWaiter指向它,并更新尾节点即可。这个过程并没有使用CAS保证,原有在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
    等待

    1. public final void await() throws InterruptedException {
    2. if (Thread.interrupted())
    3. throw new InterruptedException();
    4. //构造节点并加入等待队列尾节点
    5. Node node = addConditionWaiter();
    6. //释放锁
    7. int savedState = fullyRelease(node);
    8. int interruptMode = 0;
    9. while (!isOnSyncQueue(node)) {
    10. LockSupport.park(this);
    11. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    12. break;
    13. }
    14. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    15. interruptMode = REINTERRUPT;
    16. if (node.nextWaiter != null) // clean up if cancelled
    17. unlinkCancelledWaiters();
    18. if (interruptMode != 0)
    19. reportInterruptAfterWait(interruptMode);
    20. }

    通知
    调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点,在唤醒节点之前,会将节点移到同步队列中。调用该方法的前置条件是获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移到到同步队列并使用LockSupport唤醒节点中的线程。
    通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移到同步队列。当节点移到到同步队列后,当前线程再使用LockSupport唤醒该节点的线程

    1. public final void signal() {
    2. if (!isHeldExclusively())
    3. throw new IllegalMonitorStateException();
    4. Node first = firstWaiter;
    5. if (first != null)
    6. doSignal(first);
    7. }
    1. private void doSignal(Node first) {
    2. do {
    3. if ( (firstWaiter = first.nextWaiter) == null)
    4. lastWaiter = null;
    5. first.nextWaiter = null;
    6. } while (!transferForSignal(first) &&
    7. (first = firstWaiter) != null);
    8. }
    1. final boolean transferForSignal(Node node) {
    2. /*
    3. * If cannot change waitStatus, the node has been cancelled.
    4. */
    5. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    6. return false;
    7. /*
    8. * Splice onto queue and try to set waitStatus of predecessor to
    9. * indicate that thread is (probably) waiting. If cancelled or
    10. * attempt to set waitStatus fails, wake up to resync (in which
    11. * case the waitStatus can be transiently and harmlessly wrong).
    12. */
    13. Node p = enq(node);
    14. int ws = p.waitStatus;
    15. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    16. LockSupport.unpark(node.thread);
    17. return true;
    18. }