Latch翻译为中文有门闩的意思,也就是门锁,当锁没有打开时,相当于一个屏障把人挡在门外,CountDownLatch的作用是当没有放行时,可以让一组线程不能继续向下执行,等到某个节点时可以让线程“”抱团”继续执行。

CountDownLatch也可以称之为闭锁,是Java中线程的一种辅助工具类,其内部维护了一个不可为零值的初始化计数器,每当执行一次countDown()方法,计数器就会减一,当计数器为零值时,就会开始进行”抱团”开始的操作。

CountDownLatch在JDK1.5由Doug Lea引入。

适用场景

实际生活中也有很多类似的场景,比如一场运动会,等所有运动员到场比赛开始;一场会议,等所有参会人员到场会议开始。

总结起来可以概括为,等待一组线程在某个时机进行开就行后续行为。

实际开发中,比如需要对复杂数据进行组装,这个最终结果由多部分组成又互不依赖,为了节省时间就可以开启多线程进行异步加载,最终等所有数据加载完成,对数据进行封装返回。

使用案例

使用代码模拟运动会等待所有运动员准备完成,开始进行比赛的场景。

  1. package juc.countdown;
  2. import java.util.concurrent.CountDownLatch;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * CountDownLatch 测试类
  6. *
  7. * @author starsray
  8. * @date 2021/12/14
  9. */
  10. public class Referee {
  11. public static void main(String[] args) {
  12. Referee referee = new Referee();
  13. try {
  14. referee.test();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. public void test() throws InterruptedException {
  20. CountDownLatch start = new CountDownLatch(1);
  21. int n = 10;
  22. CountDownLatch done = new CountDownLatch(n);
  23. for (int i = 0; i < n; i++) {
  24. new Thread(new Runner(start, done)).start();
  25. }
  26. Thread.sleep(1000);
  27. start.countDown();
  28. done.await();
  29. System.out.println("all runner complete!");
  30. }
  31. /**
  32. * 工作线程 执行单元
  33. */
  34. public static class Runner implements Runnable {
  35. private final CountDownLatch start;
  36. private final CountDownLatch done;
  37. public Runner(CountDownLatch start, CountDownLatch done) {
  38. this.start = start;
  39. this.done = done;
  40. }
  41. @Override
  42. public void run() {
  43. try {
  44. System.out.println(Thread.currentThread().getName() + " : prepare complete!");
  45. start.await();
  46. running();
  47. done.countDown();
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. private void running() throws InterruptedException {
  53. System.out.println(Thread.currentThread().getName() + " : runner time : " + time());
  54. }
  55. public int time() {
  56. int time = (int) (1 + Math.random() * (10 - 1 + 1));
  57. try {
  58. TimeUnit.SECONDS.sleep(time);
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. return time;
  63. }
  64. }
  65. }

输出结果:

  1. Thread-1 : prepare complete!
  2. Thread-3 : prepare complete!
  3. Thread-2 : prepare complete!
  4. Thread-0 : prepare complete!
  5. Thread-4 : prepare complete!
  6. Thread-5 : prepare complete!
  7. Thread-6 : prepare complete!
  8. Thread-7 : prepare complete!
  9. Thread-8 : prepare complete!
  10. Thread-9 : prepare complete!
  11. Thread-1 : run time : 2
  12. Thread-9 : run time : 2
  13. Thread-0 : run time : 3
  14. Thread-2 : run time : 3
  15. Thread-7 : run time : 4
  16. Thread-6 : run time : 4
  17. Thread-3 : run time : 7
  18. Thread-5 : run time : 9
  19. Thread-8 : run time : 9
  20. Thread-4 : run time : 10
  21. all runner complete!

源码简析

对CountDownLatch的源码进行简单分析,查看构造方法

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }

前面说到不为零值,如果传入0会抛出IllegalArgumentException异常。而且里面维护了一个静态内部类Sync,继承了AbstractQueuedSynchronizer,调用CountDownLatch的构造方法其实也就是调用Sync的构造方法,然后设置了AQS的state的值。

CountDownLatch中关键的两个方法就是await()和countDown(),其中await阻塞调用的主线程继续执行,countDown使计数器的值每次通过CAS原子操作减一。

  • await方法通过Sync调用了AQS的acquireSharedInterruptibly方法,acquireSharedInterruptibly方法会根据Sync中子类实现tryAcquireShared方法的结果进行判断,当AQS中state的值为0时tryAcquireShared返回1,抛出InterruptedException异常,当返回不为0时调用doAcquireSharedInterruptibly方法。
  1. // CountDownLatch await
  2. public void await() throws InterruptedException {
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. // CountDownLatch 中Sync AQS方法tryAcquireShared实现
  6. protected int tryAcquireShared(int acquires) {
  7. return (getState() == 0) ? 1 : -1;
  8. }
  9. // 调用AQS中doAcquireSharedInterruptibly方法
  10. public final void acquireSharedInterruptibly(int arg)
  11. throws InterruptedException {
  12. if (Thread.interrupted())
  13. throw new InterruptedException();
  14. if (tryAcquireShared(arg) < 0)
  15. doAcquireSharedInterruptibly(arg);
  16. }
  • countDown方法通过Sync调用了AQS的releaseShared方法,releaseShared方法会根据子类实现Sync的tryReleaseShared执行结果判断是否继续调用AQS的doReleaseShared方法,进而返回true/false结果。
    • tryReleaseShared方法可以看作是一个自选操作,每次获取AQS中当前state的值,如果为0,直接返回flase,否则就进行减一的操作,如果失败就继续重试,当最后state的值为0时,就要释放所有阻塞线程,调用AQS中的doReleaseShared方法。
  1. // CountDownLatch countDown
  2. public void countDown() {
  3. sync.releaseShared(1);
  4. }
  5. // CountDownLatch 中Sync AQS方法tryReleaseShared实现
  6. protected boolean tryReleaseShared(int releases) {
  7. // Decrement count; signal when transition to zero
  8. for (;;) {
  9. int c = getState();
  10. if (c == 0)
  11. return false;
  12. int nextc = c-1;
  13. if (compareAndSetState(c, nextc))
  14. return nextc == 0;
  15. }
  16. }
  17. public final boolean releaseShared(int arg) {
  18. if (tryReleaseShared(arg)) {
  19. doReleaseShared();
  20. return true;
  21. }
  22. return false;
  23. }

如果对于阻塞等待时间有要求可以使用await的重载方法,如果超时就会抛出InterruptedException异常。

  1. public boolean await(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  4. }

CountDownLatch在实际开发中是使用率比较高的线程辅助工具类,灵活可用性强,源码实现基于AQS的基本功能,源码本身比较简单,需要深入理解AQS相关的内容。