await()

等待 state 变为 0 时被唤醒。这个方法可以响应中断。
一旦 state == 0,这个方法会立即返回。如果 state > 0,那么该线程就会阻塞,直到:

  • state == 0。
  • 线程被中断。

    1. // java.util.concurrent.CountDownLatch#await()
    2. public void await() throws InterruptedException {
    3. sync.acquireSharedInterruptibly(1);
    4. }

    acquireSharedInterruptibly

    ```java // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())

      throw new InterruptedException();
    

    // #1 当 state != 0 时,才会进入 doAcquireShared… 方法 if (tryAcquireShared(arg) < 0)

      doAcquireSharedInterruptibly(arg);
    

    }

<a name="OQWC4"></a>
## tryAcquireShared
只有当 `state == 0` 时,返回 `1`。
```java
// java.util.concurrent.CountDownLatch.Sync#tryAcquireShared
/**
 * 如果 state == 0,返回 1
 * 如果 state != 0,返回 -1
 */
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

doAcquireSharedInterruptibly

  1. 获取共享锁。
  2. 方法是可中断的。

如果当前 Node 位于阻塞队列队头,那么就去尝试获取共享锁。对于 CountDownLatch 来说,如果 state > 0,那么相当于获取失败。接着执行 #6 处的代码。
#6 会把前驱节点的 waitStatus 修改为 -1,然后调用 parkAndCheckInterrupt() 方法将当前线程挂起。
当新的线程 t4 到达时,会把它的前驱节点的 waitStatus 修改为 -1,自己也会被挂起。

// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // #1 插入阻塞队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // #2 进入死循环
        for (;;) {
            // #3 获取当前节点的前驱节点
            final Node p = node.predecessor();

            // #4 头结点,说明当前Node可以尝试获得锁
            if (p == head) {
                // #5 尝试获取获得共享锁。对于 CountDownLatch 来说,
                // 只要 state != 0,就返回 -1,当 state == 0,返回 1,就可以进入 if
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // #6 满足 state == 0,说明需要唤醒因 await() 方法阻塞的线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }

            // #6 state > 0,
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire

// java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
/**
 * 在这个方法其实在分析AQS源码时解析过,目的是更新前驱节点的状态
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

唤醒

CountDownLatch 唤醒操作.png

countDown()

countDown() 表示当前线程执行完毕,可以释放共享锁。

// java.util.concurrent.CountDownLatch#countDown
public void countDown() {
    sync.releaseShared(1);
}

releaseShared

// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
    // #1 tryReleaseShared 是由CountDownLatch#Sync重写的方法。
    // 只有当 state == 0 才会返回 true,才会进入 if
    if (tryReleaseShared(arg)) {
        // #2 唤醒阻塞在 await() 的所有线程
        doReleaseShared();

        // #3 返回 true
        return true;
    }
    return false;
}

tryReleaseShared

使用自旋方式将 state 减 1。如果本来 state = 0,返回 false。

// java.util.concurrent.CountDownLatch.Sync#tryReleaseShared
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

doReleaseShared

对 CountDownLatch 来说,调用这个方法的前提条件必须是 state == 0,表示之前持有共享锁的线程都已执行完毕,就可以唤醒所有阻塞在 await() 方法调用的线程。

// java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    // #1 死循环
    for (;;) {
        // #2 指向阻塞队列的头结点
        Node h = head;

        // #3 1. h == null 说明阻塞队列为空
        //    2. h == tail 说明头结点刚被初始化
        if (h != null && h != tail) {
            // #3 意味着没有其它线程被阻塞了,不需要唤醒后继节点
            int ws = h.waitStatus;

            // #4 ws = -1,CAS 设置为 0,这里有失败的可能
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // #5 CAS 设置成功,唤醒线程
                unparkSuccessor(h);
            } else if (ws == 0 && 
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果刚好有一个节点入队,所以可能会CAS失败
                continue;                // loop on failed CAS
        }

        // #6 h != head 说明前面已被唤醒的线程占领了 head 指针,不会退出 for(;;)
        // 如果 h == head 意味着 head 没有改变,则退出循环。
        if (h == head)                   // loop if head changed
            break;
    }
}

我们分析下最后一个 if 语句,然后才能解释第一个 CAS 为什么可能会失败:

  1. h == head:说明头节点还没有被刚刚用 unparkSuccessor 唤醒的线程(这里可以理解为 t4)占有,此时 break 退出循环。
  2. h != head:头节点被刚刚唤醒的线程(这里可以理解为 t4)占有,那么这里重新进入下一轮循环,唤醒下一个节点(这里是 t4 )。我们知道,等到 t4 被唤醒后,其实是会主动唤醒 t5、t6、t7…,那为什么这里要进行下一个循环来唤醒 t5 呢?我觉得是出于吞吐量的考虑。

    被唤醒的线程接下来处理

    回到 await() 方法的调用,线程是在 doAcquireSharedInterruptibly() 方法被阻塞的:

    // java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
    private void doAcquireSharedInterruptibly(int arg)
     throws InterruptedException {
     // #1 插入阻塞队列中
     final Node node = addWaiter(Node.SHARED);
     boolean failed = true;
     try {
         // #2 进入死循环
         for (;;) {
             // #3 获取当前节点的前驱节点
             final Node p = node.predecessor();
    
             // #4 头结点,说明当前Node可以尝试获得锁
             if (p == head) {
                 // #5 尝试获取获得共享锁。对于 CountDownLatch 来说,
                 // 只要 state != 0,就返回 -1,当 state == 0,返回 1,就可以进入 if
                 int r = tryAcquireShared(arg);
                 if (r >= 0) {
                     // #6 满足 state == 0,说明需要唤醒因 await() 方法阻塞的线程
                     setHeadAndPropagate(node, r);
                     p.next = null; // help GC
                     failed = false;
                     return;
                 }
             }
    
             // #6 state > 0,
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 throw new InterruptedException();
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
    }
    

    当线程被唤醒,就会通过 Thread.interrupted() 方法判断线程是否被中断,如果是则返回 true,就会抛出 InterruptedException()
    如果没有被中断,继续在 for(;;) 循环中执行。会进入到 setHeadAndPropagate() 方法:

    setHeadAndPropagate

    ```java // java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) { // #1 记录旧的头结点的引用 Node h = head;

// #2 将当前的Node设置为头结点(清除信息)
setHead(node);
/*
 * Try to signal next queued node if:
 *   Propagation was indicated by caller,
 *     or was recorded (as h.waitStatus either before
 *     or after setHead) by a previous operation
 *     (note: this uses sign-check of waitStatus because
 *      PROPAGATE status may transition to SIGNAL.)
 * and
 *   The next node is waiting in shared mode,
 *     or we don't know, because it appears null
 *
 * The conservatism in both of these checks may cause
 * unnecessary wake-ups, but only when there are multiple
 * racing acquires/releases, so most need signals now or soon
 * anyway.
 */
// #3 唤醒当前node之后的节点,类似传播,后面的阻塞线程都会被唤醒
// 回到前面的 doReaseShared() 方法
if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
        doReleaseShared();
}

} ```