await()
等待 state
变为 0 时被唤醒。这个方法可以响应中断。
一旦 state == 0
,这个方法会立即返回。如果 state > 0
,那么该线程就会阻塞,直到:
- state == 0。
线程被中断。
// java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly
```java // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())
throw new InterruptedException();
// #1 当 state != 0 时,才会进入 doAcquireShared… 方法 if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
<a name="OQWC4"></a>
## tryAcquireShared
只有当 `state == 0` 时,返回 `1`。
```java
// java.util.concurrent.CountDownLatch.Sync#tryAcquireShared
/**
* 如果 state == 0,返回 1
* 如果 state != 0,返回 -1
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly
- 获取共享锁。
- 方法是可中断的。
如果当前 Node 位于阻塞队列队头,那么就去尝试获取共享锁。对于 CountDownLatch 来说,如果 state > 0
,那么相当于获取失败。接着执行 #6
处的代码。#6
会把前驱节点的 waitStatus
修改为 -1
,然后调用 parkAndCheckInterrupt()
方法将当前线程挂起。
当新的线程 t4
到达时,会把它的前驱节点的 waitStatus
修改为 -1
,自己也会被挂起。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// #1 插入阻塞队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// #2 进入死循环
for (;;) {
// #3 获取当前节点的前驱节点
final Node p = node.predecessor();
// #4 头结点,说明当前Node可以尝试获得锁
if (p == head) {
// #5 尝试获取获得共享锁。对于 CountDownLatch 来说,
// 只要 state != 0,就返回 -1,当 state == 0,返回 1,就可以进入 if
int r = tryAcquireShared(arg);
if (r >= 0) {
// #6 满足 state == 0,说明需要唤醒因 await() 方法阻塞的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// #6 state > 0,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
// java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
/**
* 在这个方法其实在分析AQS源码时解析过,目的是更新前驱节点的状态
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
唤醒
countDown()
countDown() 表示当前线程执行完毕,可以释放共享锁。
// java.util.concurrent.CountDownLatch#countDown
public void countDown() {
sync.releaseShared(1);
}
releaseShared
// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {
// #1 tryReleaseShared 是由CountDownLatch#Sync重写的方法。
// 只有当 state == 0 才会返回 true,才会进入 if
if (tryReleaseShared(arg)) {
// #2 唤醒阻塞在 await() 的所有线程
doReleaseShared();
// #3 返回 true
return true;
}
return false;
}
tryReleaseShared
使用自旋方式将 state 减 1。如果本来 state = 0,返回 false。
// java.util.concurrent.CountDownLatch.Sync#tryReleaseShared
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
doReleaseShared
对 CountDownLatch 来说,调用这个方法的前提条件必须是 state == 0
,表示之前持有共享锁的线程都已执行完毕,就可以唤醒所有阻塞在 await()
方法调用的线程。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// #1 死循环
for (;;) {
// #2 指向阻塞队列的头结点
Node h = head;
// #3 1. h == null 说明阻塞队列为空
// 2. h == tail 说明头结点刚被初始化
if (h != null && h != tail) {
// #3 意味着没有其它线程被阻塞了,不需要唤醒后继节点
int ws = h.waitStatus;
// #4 ws = -1,CAS 设置为 0,这里有失败的可能
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// #5 CAS 设置成功,唤醒线程
unparkSuccessor(h);
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果刚好有一个节点入队,所以可能会CAS失败
continue; // loop on failed CAS
}
// #6 h != head 说明前面已被唤醒的线程占领了 head 指针,不会退出 for(;;)
// 如果 h == head 意味着 head 没有改变,则退出循环。
if (h == head) // loop if head changed
break;
}
}
我们分析下最后一个 if 语句,然后才能解释第一个 CAS 为什么可能会失败:
- h == head:说明头节点还没有被刚刚用 unparkSuccessor 唤醒的线程(这里可以理解为 t4)占有,此时 break 退出循环。
h != head:头节点被刚刚唤醒的线程(这里可以理解为 t4)占有,那么这里重新进入下一轮循环,唤醒下一个节点(这里是 t4 )。我们知道,等到 t4 被唤醒后,其实是会主动唤醒 t5、t6、t7…,那为什么这里要进行下一个循环来唤醒 t5 呢?我觉得是出于吞吐量的考虑。
被唤醒的线程接下来处理
回到
await()
方法的调用,线程是在doAcquireSharedInterruptibly()
方法被阻塞的:// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // #1 插入阻塞队列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // #2 进入死循环 for (;;) { // #3 获取当前节点的前驱节点 final Node p = node.predecessor(); // #4 头结点,说明当前Node可以尝试获得锁 if (p == head) { // #5 尝试获取获得共享锁。对于 CountDownLatch 来说, // 只要 state != 0,就返回 -1,当 state == 0,返回 1,就可以进入 if int r = tryAcquireShared(arg); if (r >= 0) { // #6 满足 state == 0,说明需要唤醒因 await() 方法阻塞的线程 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // #6 state > 0, if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
当线程被唤醒,就会通过
Thread.interrupted()
方法判断线程是否被中断,如果是则返回true
,就会抛出InterruptedException()
。
如果没有被中断,继续在for(;;)
循环中执行。会进入到setHeadAndPropagate()
方法:setHeadAndPropagate
```java // java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) { // #1 记录旧的头结点的引用 Node h = head;
// #2 将当前的Node设置为头结点(清除信息)
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// #3 唤醒当前node之后的节点,类似传播,后面的阻塞线程都会被唤醒
// 回到前面的 doReaseShared() 方法
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
} ```