Condition结构从上到下

面向开发人员的API
await() 让当前线程进入等待,直到被 signal() 或中断
awaitUninterruptibly() 让当前线程进入等待,直到被 signal()
awaitNanos(long) 让当前线程进入等待,直到被 signal() 、中断、超时
await(long, TimeUnit) 让当前线程进入等待,直到被 signal() 、中断、超时
awaitUntil(Date) 让当前线程进入等待,直到被 signal() 、中断、超时
signal() 唤醒一个等待中的线程
signalAll() 唤醒所有等待中的线程
由开发人员具体实现的方法
底层实现
基于AQS的队列、LockSupport

Condition 结构整体比较简单,它复用了 AQS的节点 单独维护了一套队列。

Condition流程

在实现简易的生产队列和消费队列时,第一个想到的大多数都是 Object.wait()Object.notify() ,而今天要介绍一个自带的一个工具 ReentrantLock#Condition 。它相比 Object 在唤醒方面多了一些可控性,阻塞方面多了一个限时功能。两者的等待唤醒机制几乎一致,宏观流程图如下所示:
SimpleConditionObjectWaitFlow.png
根据这个流程类比图,我们可以对 Condition 的执行流程有了一个整体上的认知:

  1. A线程获取锁之后,执行完 await()释放锁
  2. B线程获取锁后执行 sinal() 会唤醒A线程
  3. B线程释放锁
  4. A线程获取锁,开始 doSomething...
  5. A线程释放锁

主要难点应该就在于 B线程唤醒A线程之后,A线程做了哪些事;其次就是 Condition#await()Condition#signal() 做了什么事。

流程详解

执行 await()

当A线程执行完 await() 方法后,它需要进入 Condition 等待队列,其次是释放当前线程的锁,最后进入阻塞状态,其流程示意图如下所示:
whenWait()1.png
await() 的源码如下所示:

  1. public final void await() throws InterruptedException {
  2. // 判断当前显示是否已经被中断
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. // 将当前线程封装成结点,加入Condition队列尾部,类似enq()方法,但是在锁内进行await(),不需要CAS
  6. Node node = addConditionWaiter();
  7. // 释放当前线程持有的所有锁,在这个操作之后很有可能出现竞态条件
  8. int savedState = fullyRelease(node);
  9. int interruptMode = 0;
  10. // 结点在同步队列里时就返回true
  11. // 结点在CONDITION队列里时就返回false
  12. while (!isOnSyncQueue(node)) {
  13. // 当唤醒线程调用unlock()时就会解除该线程的等待
  14. LockSupport.park(this);
  15. /* 第一种情况,该线程被中断,就会将interruptMode设置为THROW_IE
  16. * 第二种情况,该线程被中断且在检查过程中状态改变(比如中断时,被唤醒),就会将mode设置为REINTERRUPT
  17. * 第三种情况,该线程被正常signal(notify),此时结点状态值为0
  18. */
  19. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  20. break;
  21. }
  22. // 加入获取锁的Sync队列中,等待获取锁;获取成功时判断一下mode类型
  23. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  24. interruptMode = REINTERRUPT;
  25. // 如果当前结点有后续结点,那么就清理一下链表
  26. if (node.nextWaiter != null) // clean up if cancelled
  27. unlinkCancelledWaiters();
  28. if (interruptMode != 0)
  29. // 这里判断事抛出异常还是简单的中断
  30. reportInterruptAfterWait(interruptMode);
  31. }

整体思路应该不难,里面的个别方法需要展开来分析一下:

  1. addConditionWaiter() ,该方法主要用来生成一个 Node 进入 Condition等待队列

    private Node addConditionWaiter() {
     Node t = lastWaiter;                 // 获取Condition队列的最后一个节点
     // 如果发现 最后一个节点已经取消了,那么就往前查找最近的一个没取消的等待节点
     // 并将其设置为lastWaiter
     if (t != null && t.waitStatus != Node.CONDITION) {
         unlinkCancelledWaiters();
         t = lastWaiter;                    // 这里的lastWaiter已经更新了
     }
     // 创建一个 Condition Node
     Node node = new Node(Thread.currentThread(), Node.CONDITION);
     if (t == null)
         firstWaiter = node;
     else
         t.nextWaiter = node;
     lastWaiter = node;
     return node;
    }
    
  2. fullRelease() ,该方法会将 head 进行释放,唤醒下一个同步队列中的等待节点,具体的源码就不分析了,大家可以看前面的 ExclusiveShare 模式

  3. isOnSyncQueue() ,该方法蛮有意思的,里面的判断逻辑还挺搞:

    final boolean isOnSyncQueue(Node node) {
     // node是CONDITION状态 或者 是CONDITION队列里的第一个
     if (node.waitStatus == Node.CONDITION || node.prev == null)
         return false;
     // 为什么有next就是在同步队列呢?因为signal()是唤醒第一个的
     // 新的Condition节点一定是在队尾,且next为空;因为之前的都在阻塞等待唤醒~
     if (node.next != null) 
         return true;
     // CAS失败的情况有可能存在 node.prev != null 但是却不在队列中的情况
     // 如果真发生了 CAS 失败的情况,就会进入该方法 从队尾往队首遍历
     return findNodeFromTail(node);
    }
    
  4. 进入阻塞

  5. 如果从阻塞中唤醒(调用 signal() 的线程会将 该节点放到同步队列中去并变更状态),成功跳出 while() 循环
  6. 后续的逻辑和 exclusive 争抢锁的流程差不多~

执行signal()

整个 signal() 的逻辑也比较简单,主要的内容就三行:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

当调用 signal() 时,获取Condition队列的队首并唤醒它。主要流程在下面的 doSignal() 方法中:

private void doSignal(Node first) {
    do {
        // 如果 Condition队列的队首 的 nextWaiter 为空
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null; // 设置lastWaiter为空
        first.nextWaiter = null;    // 设置nextWaiter为空
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

doSignal() 里面主要处理了队列中的空数据,并执行唤醒的逻辑—— transferForSignal()

final boolean transferForSignal(Node node) {
    /*
     * 更改node结点的状态;如果更改失败就是已经取消了
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    /*
     * 将当前结点接入Sync队列,让这个结点加入等待;
     */ 
    Node p = enq(node);
    int ws = p.waitStatus;
    // 更新p节点的状态为SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);  // 将对应node的线程唤醒
    return true;
}

最后当执行 signal() 方法的线程释放锁后, await() 就能上来尝试争抢一下了~

总结

AQS的锁机制主要依靠双向的链表,而 Condition等待唤醒机制只需要普通链表即可实现。这里对整体流程又进行了个总结。
summary4AQS_Condition.png