什么是CountDownLatch?
CountDownLatch,它是 JDK 提供的并发流程控制的工具类,它是在 java.util.concurrent 包下,在 JDK1.5 以后加入的。
CountDownLatch
是多线程控制的一种工具,它被称为 门阀
、 计数器
或者 闭锁
。这个工具经常用来用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)
CountDownLatch 能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。它相当于是一个计数器,这个计数器的初始值就是线程的数量,每当一个任务完成后,计数器的值就会减一,当计数器的值为 0 时,表示所有的线程都已经任务了,然后在 CountDownLatch 上等待的线程就可以恢复执行接下来的任务。它相当于一个同步辅助器,允许一个或多个线程一直等待,直到一组在其他线程执行的操作全部完成
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. A CountDownLatch is initialized with a given count .The await methods block until the current count reaches zero due to invocations of the countDown method, after which all waiting threads are released and any subsequent invocations of await return immediately.
CountDownLatch 的API
CountDownLatch 的底层是由 AbstractQueuedSynchronizer
支持,而 AQS 的数据结构的核心就是两个队列,一个是 同步队列(sync queue)
,一个是条件队列(condition queue)
。
Sync
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
/**
* getCount() 方法的返回值是 getState() 方法,它是 AbstractQueuedSynchronizer 中的方法,
* 这个方法会返回当前线程计数,具有 volatile 读取的内存语义
*/
int getCount() {
return getState();
}
/**
* tryAcquireShared() 方法用于获取·共享状态下对象的状态,判断对象是否为 0
* 如果为 0 返回 1 ,表示能够尝试获取,如果不为 0,那么返回 -1,表示无法获取.
*
* 这个 共享状态 属于 AQS 中的概念,在 AQS 中分为两种模式,一种是 独占模式,一种是 共享模式。
* tryAcquire 独占模式,尝试获取资源,成功则返回 true,失败则返回 false。
* tryAcquireShared 共享方式,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;
* 正数表示成功,且有剩余资源
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
/**
* 共享模式下的释放
*
* 这个方法是一个无限循环,获取线程状态,如果线程状态是 0 则表示没有被线程占有,
* 没有占有的话那么直接返回 false ,表示已经释放;然后下一个状态进行 - 1 ,
* 使用 compareAndSetState CAS 方法进行和内存值的比较,如果内存值也是 1 的话,
* 就会更新内存值为 0 ,判断 nextc 是否为 0 ,如果 CAS 比较不成功的话,会再次进行循环判断。
*/
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
构造方法CountDownLatch(int count)
- CountDownLatch 提供了一个构造方法,该参数 count 是需要倒数的数值,你必须指定其初始值
初始化的时候必须指定计数器的数量,如果数量为负会直接抛出异常
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
await
调用 await() 方法的线程开始等待,直到倒数结束,也就是 count 值为 0 的时候才会继续执行。
- 当count == 0,开始执行
- acquireSharedInterruptibly 是 AQS 中的方法,以共享模式进行中断。
- acquireSharedInterruptibly 方法的内部会首先判断线程是否
中断
,如果线程中断,则直接抛出线程中断异常。如果没有中断,那么会以共享的方式获取。如果能够在共享的方式下不能获取锁,那么就会以共享的方式断开链接。 ```java public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 构造一个共享模式的 Node 入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 使用无限循环判断新构造 node 的前驱节点,如果 node 节点的前驱节点是头节点,那么就会判断线程的状态 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 首先会设置头节点,然后进行一系列的判断,获取节点的获取节点的后继,以共享模式进行释放 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //如果获取前驱节点不是头节点,就会进行 park 断开操作,判断此时是否能够断开 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head;
// 首先会设置头节点,然后进行一系列的判断,获取节点的获取节点的后继,以共享模式进行释放
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
- 以无限循环的方式首先判断头节点是否等于尾节点,如果头节点等于尾节点的话,就会直接退出。
- 如果头节点不等于尾节点,会判断状态是否为 SIGNAL,不是的话就继续循环 compareAndSetWaitStatus,
- 然后断开后继节点。如果状态不是 SIGNAL,也会调用 compareAndSetWaitStatus 设置状态为 PROPAGATE,
- 状态为 0 并且不成功,就会继续循环。
- /
private void doReleaseShared() {
for (;;) {
} }Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
// 断开操作 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
<a name="pA4k7"></a>
#### shouldParkAfterFailedAcquire()
这个方法会判断 Node p 的前驱节点的`结点状态(waitStatus)`,节点状态一共有五种,分别是
- `CANCELLED(1)`:表示当前结点已取消调度。当超时或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
- `SIGNAL(-1)`:表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL。
- `CONDITION(-2)`:表示结点等待在 Condition 上,当其他线程调用了 Condition 的 signal() 方法后,CONDITION状态的结点将**从等待队列转移到同步队列中**,等待获取同步锁。
- `PROPAGATE(-3)`:共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点
- `0`:新结点入队时的默认状态。<br />
<a name="4YHUa"></a>
### await_(_long timeout, TimeUnit unit_)_
- await() 重载的方法,里面会传入超时参数,这个方法的作用和 await() 类似,但是这里可以设置超时时间,如果超时就不再等待了
- 当acount == 0,等待timeout之后开始执行
```java
/**
* timeout: 等待时间
* unit: 时间单位
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
countDown()
- 把数值倒数 1,也就是将 count 值减 1,直到减为 0 时,之前等待的线程会被唤起。
- 这个方法的作用主要用来减小计数器的值,当计数器变为 0 时,在 CountDownLatch 上
await
的线程就会被唤醒,继续执行其他任务。当然也可以延迟唤醒,给 CountDownLatch 加一个延迟时间就可以实现。 countDown 是和 await 同等重要的方法,countDown 用于减少计数器的数量,如果计数减为 0 的话,就会释放所有的线程。
public void countDown() {
sync.releaseShared(1);
}
getCount()
返回值是
getState()
方法,它是 AbstractQueuedSynchronizer 中的方法,这个方法会返回当前线程计数,具有 volatile 读取的内存语义public long getCount() {
return sync.getCount();
}
CountDownLatch的使用:
场景1:让单个线程等待:多个线程(任务)完成后,进行汇总合并
假如10个人在线抽签,只有10个人都抽完了签才能公布结果,没有抽完不能公布结果,怎么做呢?
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
int num = i + 1;
Runnable runnable = () -> {
try {
Thread.sleep(100);
System.out.println(num + "号完成抽签。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
};
service.submit(runnable);
}
System.out.println("等待所有人都完成抽签。。。。");
latch.await();
System.out.println("所有人都完成了抽签");
service.shutdown();
}
执行结果:
场景2:让多个线程等待:模拟并发,让并发线程一起执行
- 每年高考结束,都有大量的莘莘学子等待放榜,看自己究竟考了多少分,但是没有发布成绩之前,所有人都只能等着,等成绩公布出来,才能陆续去查成绩,通过代码如何实现呢?
执行结果:public static void main(String[] args) throws InterruptedException {
System.out.println("还有两小时成绩发榜");
CountDownLatch latch = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
int num = i + 1;
Runnable runnable = () -> {
System.out.println("学号00"+num+"在等待查询成绩");
try {
latch.await();
System.out.println("学号00"+ num + "完成绩查询。");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
service.submit(runnable);
}
Thread.sleep(5000);
System.out.println("成绩已公布");
latch.countDown();
service.shutdown();
}
还有两小时成绩发榜 学号001在等待查询成绩 学号002在等待查询成绩 学号003在等待查询成绩
学号004在等待查询成绩
学号005在等待查询成绩
学号006在等待查询成绩
学号007在等待查询成绩
学号008在等待查询成绩
学号009在等待查询成绩
学号0010在等待查询成绩
成绩已公布
学号001完成绩查询。
学号003完成绩查询。
学号005完成绩查询。
学号002完成绩查询。
学号009完成绩查询。
学号008完成绩查询。
学号007完成绩查询。
学号006完成绩查询。
学号004完成绩查询。
学号0010完成绩查询。
总结:
本文是 CountDownLatch 的基本使用和源码分析,CountDownLatch 就是一个基于 AQS 的计数器,它内部的方法都是围绕 AQS 框架来谈的,除此之外还有其他比如 ReentrantLock、Semaphore 等都是 AQS 的实现,所以要研究并发的话,离不开对 AQS 的探讨。CountDownLatch 的源码看起来很少,比较简单,但是其内部比如 await 方法的调用链路却很长,也值得花费时间深入研究。
CountDownLatch 类在创建实例的时候,需要在构造函数中传入倒数次数,然后由需要等待的线程去调用 await 方法开始等待,而每一次其他线程调用了 countDown 方法之后,计数便会减 1,直到减为 0 时,之前等待的线程便会继续运行。
当线程调用 CountDownLatch 的 await 方法时,便会尝试获取“共享锁”,不过一开始通常获取不到锁,于是线程被阻塞。“共享锁”可获取到的条件是“锁计数器”的值为 0,而“锁计数器”的初始值为 count,当每次调用 CountDownLatch 对象的 countDown 方法时,也可以把“锁计数器” -1。通过这种方式,调用 count 次 countDown 方法之后,“锁计数器”就为 0 了,于是之前等待的线程就会继续运行了,并且此时如果再有线程想调用 await 方法时也会被立刻放行,不会再去做任何阻塞操作了。