一、简介
1.1 CountDownLatch 是什么?
CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
1.2 CountDownLatch 与 CyclicBarrier 的区别?
- CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
- CountDownLatch 内部自行采用 AQS实现的共享锁 ;而 CyclicBarrier内部采用 可重入锁 ReentrantLock 和Condition
1.3 CountDownLatch的API
```java //给定一个count的计数器 初始化CountDownLatch public CountDownLatch(int count);
//阻塞当前线程,直到CountDownLatch 的计数器归零,除非线程被打断(Thread#interrupt) public void await() throws InterruptedException
//阻塞当前线程,直到CountDownLatch 的计数器归零,或者等待超时,除非线程被打断(Thread#interrupt) public boolean await(long timeout, TimeUnit unit)
//递减锁存器的计数,如果计数到达零,则释放所有等待的线程。 public void countDown();
//返回当前计数。 public long getCount();
<a name="4N8CS"></a>
### 二、数据结构及图示
<a name="s7rSm"></a>
#### 2.1 CountDownLatch的UML类图
![image.png](https://cdn.nlark.com/yuque/0/2020/png/438760/1587548175049-39c19b58-7424-45f0-b157-08a05006afa5.png#crop=0&crop=0&crop=1&crop=1&height=557&id=X1w6o&margin=%5Bobject%20Object%5D&name=image.png&originHeight=557&originWidth=865&originalType=binary&ratio=1&rotation=0&showTitle=false&size=28497&status=done&style=none&title=&width=865)
<a name="c9WWt"></a>
#### 2.2 内部成员
```java
#共享锁
private final Sync sync;
CountDownLatch 内部采用“共享锁” 实现,内部的对象Sync 继承与AbstractQueuedSynchronizer。
三、源码分析
3.1 构造方法
//传入计数值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
//实例化“共享锁”
this.sync = new Sync(count);
}
3.2 核心 Sync 内部类
//继承AQS 实现的共享锁
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//尝试获取共享锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//释放共享锁
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//获取AQS 同步状态的值
int c = getState();
//判断锁是否已经释放
if (c == 0)
return false;
int nextc = c-1;
//CAS 修改同步状态值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
3.3 await 方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
以上其实是调用AQS 中的 acquireSharedInterruptibly (可打断获取共享锁)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//判断线程是否被打断
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取共享锁
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
打断模式获取共享锁
//doAcquireSharedInterruptibly()会使当前线程一直等待,
//直到当前线程获取到共享锁(或被中断)才返回。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//以共享模式,创建节点加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取节点的前一个节点
final Node p = node.predecessor();
//如果是CLH队列的头节点,则可以尝试获取共享锁
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果不是表头,则自旋,直到获取到共享锁。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
图示
3.4 countDown 方法
public void countDown() {
sync.releaseShared(1);
}
- 以上实际上是调用AQS 中的releaseShared方法
```java
public final boolean releaseShared(int arg) {
//tryReleaseShared 尝试释放锁
if (tryReleaseShared(arg)) {
} return false; }//尝试失败,则通过doReleaseShared()去释放共享锁
doReleaseShared();
return true;
- CountDownLatch Sync 实现的tryReleaseShared 方法
```java
//释放共享锁
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//获取AQS 同步状态的值
int c = getState();
//判断锁是否已经释放
if (c == 0)
return false;
//计数器 -1
int nextc = c-1;
//CAS 修改同步状态值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
3.5 总结
CountDownLatch 内部是 通过继承AQS 的共享锁 Sync 实现的同步辅助类。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态(实际赋值给AQS 的同步状态值 state ),表示该“共享锁”最多能被count 个线程同时获取。而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行!
四、示例(Java 8)
4.1 实现最大并行(默认用户同时请求)
//实例化 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(1);
List<Thread> threads = Stream.generate(() -> new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 等待");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 获取用户信息API ");
}))
.limit(100)
.collect(Collectors.toList());
//启动线程
threads.forEach(Thread::start);
//开始同时访问
TimeUnit.SECONDS.sleep(5);
countDownLatch.countDown();
System.out.println("完成请求===> ");
4.2 “主线程”等待”5个子线程”全部都完成”指定的工作”之后,再继续运行。
//实例化 CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> threads = Stream.generate(() -> new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "处理任务");
try {
//模拟耗时
TimeUnit.SECONDS.sleep(new Random().nextInt(3));
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + "完成任务");
}))
.limit(5)
.collect(Collectors.toList());
//启动线程
threads.forEach(Thread::start);
System.out.println("主线程等待");
countDownLatch.await();
System.out.println("主线程开始汇总任务");