Latch翻译为中文有门闩的意思,也就是门锁,当锁没有打开时,相当于一个屏障把人挡在门外,CountDownLatch的作用是当没有放行时,可以让一组线程不能继续向下执行,等到某个节点时可以让线程“”抱团”继续执行。
CountDownLatch也可以称之为闭锁,是Java中线程的一种辅助工具类,其内部维护了一个不可为零值的初始化计数器,每当执行一次countDown()方法,计数器就会减一,当计数器为零值时,就会开始进行”抱团”开始的操作。
CountDownLatch在JDK1.5由Doug Lea引入。
适用场景
实际生活中也有很多类似的场景,比如一场运动会,等所有运动员到场比赛开始;一场会议,等所有参会人员到场会议开始。
总结起来可以概括为,等待一组线程在某个时机进行开就行后续行为。
实际开发中,比如需要对复杂数据进行组装,这个最终结果由多部分组成又互不依赖,为了节省时间就可以开启多线程进行异步加载,最终等所有数据加载完成,对数据进行封装返回。
使用案例
使用代码模拟运动会等待所有运动员准备完成,开始进行比赛的场景。
package juc.countdown;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* CountDownLatch 测试类
*
* @author starsray
* @date 2021/12/14
*/
public class Referee {
public static void main(String[] args) {
Referee referee = new Referee();
try {
referee.test();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void test() throws InterruptedException {
CountDownLatch start = new CountDownLatch(1);
int n = 10;
CountDownLatch done = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
new Thread(new Runner(start, done)).start();
}
Thread.sleep(1000);
start.countDown();
done.await();
System.out.println("all runner complete!");
}
/**
* 工作线程 执行单元
*/
public static class Runner implements Runnable {
private final CountDownLatch start;
private final CountDownLatch done;
public Runner(CountDownLatch start, CountDownLatch done) {
this.start = start;
this.done = done;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " : prepare complete!");
start.await();
running();
done.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void running() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " : runner time : " + time());
}
public int time() {
int time = (int) (1 + Math.random() * (10 - 1 + 1));
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
return time;
}
}
}
输出结果:
Thread-1 : prepare complete!
Thread-3 : prepare complete!
Thread-2 : prepare complete!
Thread-0 : prepare complete!
Thread-4 : prepare complete!
Thread-5 : prepare complete!
Thread-6 : prepare complete!
Thread-7 : prepare complete!
Thread-8 : prepare complete!
Thread-9 : prepare complete!
Thread-1 : run time : 2
Thread-9 : run time : 2
Thread-0 : run time : 3
Thread-2 : run time : 3
Thread-7 : run time : 4
Thread-6 : run time : 4
Thread-3 : run time : 7
Thread-5 : run time : 9
Thread-8 : run time : 9
Thread-4 : run time : 10
all runner complete!
源码简析
对CountDownLatch的源码进行简单分析,查看构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
前面说到不为零值,如果传入0会抛出IllegalArgumentException异常。而且里面维护了一个静态内部类Sync,继承了AbstractQueuedSynchronizer,调用CountDownLatch的构造方法其实也就是调用Sync的构造方法,然后设置了AQS的state的值。
CountDownLatch中关键的两个方法就是await()和countDown(),其中await阻塞调用的主线程继续执行,countDown使计数器的值每次通过CAS原子操作减一。
- await方法通过Sync调用了AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly方法会根据Sync中子类实现tryAcquireShared方法的结果进行判断,当AQS中state的值为0时tryAcquireShared返回1,抛出InterruptedException异常,当返回不为0时调用doAcquireSharedInterruptibly方法。
// CountDownLatch await
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// CountDownLatch 中Sync AQS方法tryAcquireShared实现
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 调用AQS中doAcquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- countDown方法通过Sync调用了AQS的releaseShared方法,releaseShared方法会根据子类实现Sync的tryReleaseShared执行结果判断是否继续调用AQS的doReleaseShared方法,进而返回true/false结果。
- tryReleaseShared方法可以看作是一个自选操作,每次获取AQS中当前state的值,如果为0,直接返回flase,否则就进行减一的操作,如果失败就继续重试,当最后state的值为0时,就要释放所有阻塞线程,调用AQS中的doReleaseShared方法。
// CountDownLatch countDown
public void countDown() {
sync.releaseShared(1);
}
// CountDownLatch 中Sync AQS方法tryReleaseShared实现
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
如果对于阻塞等待时间有要求可以使用await的重载方法,如果超时就会抛出InterruptedException异常。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
CountDownLatch在实际开发中是使用率比较高的线程辅助工具类,灵活可用性强,源码实现基于AQS的基本功能,源码本身比较简单,需要深入理解AQS相关的内容。