一、简介

1.1 CountDownLatch 是什么?

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

1.2 CountDownLatch 与 CyclicBarrier 的区别?

  • CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
  • CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
  • CountDownLatch 内部自行采用 AQS实现的共享锁 ;而 CyclicBarrier内部采用 可重入锁 ReentrantLock 和Condition

    1.3 CountDownLatch的API

    ```java //给定一个count的计数器 初始化CountDownLatch public CountDownLatch(int count);

//阻塞当前线程,直到CountDownLatch 的计数器归零,除非线程被打断(Thread#interrupt) public void await() throws InterruptedException

//阻塞当前线程,直到CountDownLatch 的计数器归零,或者等待超时,除非线程被打断(Thread#interrupt) public boolean await(long timeout, TimeUnit unit)

//递减锁存器的计数,如果计数到达零,则释放所有等待的线程。 public void countDown();

//返回当前计数。 public long getCount();

  1. <a name="4N8CS"></a>
  2. ### 二、数据结构及图示
  3. <a name="s7rSm"></a>
  4. #### 2.1 CountDownLatch的UML类图
  5. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/438760/1587548175049-39c19b58-7424-45f0-b157-08a05006afa5.png#crop=0&crop=0&crop=1&crop=1&height=557&id=X1w6o&margin=%5Bobject%20Object%5D&name=image.png&originHeight=557&originWidth=865&originalType=binary&ratio=1&rotation=0&showTitle=false&size=28497&status=done&style=none&title=&width=865)
  6. <a name="c9WWt"></a>
  7. #### 2.2 内部成员
  8. ```java
  9. #共享锁
  10. private final Sync sync;

CountDownLatch 内部采用“共享锁” 实现,内部的对象Sync 继承与AbstractQueuedSynchronizer。

三、源码分析

3.1 构造方法

  1. //传入计数值
  2. public CountDownLatch(int count) {
  3. if (count < 0) throw new IllegalArgumentException("count < 0");
  4. //实例化“共享锁”
  5. this.sync = new Sync(count);
  6. }

3.2 核心 Sync 内部类

  1. //继承AQS 实现的共享锁
  2. private static final class Sync extends AbstractQueuedSynchronizer {
  3. private static final long serialVersionUID = 4982264981922014374L;
  4. Sync(int count) {
  5. setState(count);
  6. }
  7. int getCount() {
  8. return getState();
  9. }
  10. //尝试获取共享锁
  11. protected int tryAcquireShared(int acquires) {
  12. return (getState() == 0) ? 1 : -1;
  13. }
  14. //释放共享锁
  15. protected boolean tryReleaseShared(int releases) {
  16. // Decrement count; signal when transition to zero
  17. for (;;) {
  18. //获取AQS 同步状态的值
  19. int c = getState();
  20. //判断锁是否已经释放
  21. if (c == 0)
  22. return false;
  23. int nextc = c-1;
  24. //CAS 修改同步状态值
  25. if (compareAndSetState(c, nextc))
  26. return nextc == 0;
  27. }
  28. }
  29. }

3.3 await 方法

  1. public void await() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }
  • 以上其实是调用AQS 中的 acquireSharedInterruptibly (可打断获取共享锁)

    1. public final void acquireSharedInterruptibly(int arg)
    2. throws InterruptedException {
    3. //判断线程是否被打断
    4. if (Thread.interrupted())
    5. throw new InterruptedException();
    6. //尝试获取共享锁
    7. if (tryAcquireShared(arg) < 0)
    8. doAcquireSharedInterruptibly(arg);
    9. }
  • 打断模式获取共享锁

    1. //doAcquireSharedInterruptibly()会使当前线程一直等待,
    2. //直到当前线程获取到共享锁(或被中断)才返回。
    3. private void doAcquireSharedInterruptibly(int arg)
    4. throws InterruptedException {
    5. //以共享模式,创建节点加入等待队列
    6. final Node node = addWaiter(Node.SHARED);
    7. boolean failed = true;
    8. try {
    9. for (;;) {
    10. //获取节点的前一个节点
    11. final Node p = node.predecessor();
    12. //如果是CLH队列的头节点,则可以尝试获取共享锁
    13. if (p == head) {
    14. int r = tryAcquireShared(arg);
    15. if (r >= 0) {
    16. setHeadAndPropagate(node, r);
    17. p.next = null; // help GC
    18. failed = false;
    19. return;
    20. }
    21. }
    22. //如果不是表头,则自旋,直到获取到共享锁。
    23. if (shouldParkAfterFailedAcquire(p, node) &&
    24. parkAndCheckInterrupt())
    25. throw new InterruptedException();
    26. }
    27. } finally {
    28. if (failed)
    29. cancelAcquire(node);
    30. }
    31. }
  • 图示

3.4 countDown 方法

  1. public void countDown() {
  2. sync.releaseShared(1);
  3. }
  • 以上实际上是调用AQS 中的releaseShared方法 ```java public final boolean releaseShared(int arg) { //tryReleaseShared 尝试释放锁 if (tryReleaseShared(arg)) {
    1. //尝试失败,则通过doReleaseShared()去释放共享锁
    2. doReleaseShared();
    3. return true;
    } return false; }
  1. - CountDownLatch Sync 实现的tryReleaseShared 方法
  2. ```java
  3. //释放共享锁
  4. protected boolean tryReleaseShared(int releases) {
  5. // Decrement count; signal when transition to zero
  6. for (;;) {
  7. //获取AQS 同步状态的值
  8. int c = getState();
  9. //判断锁是否已经释放
  10. if (c == 0)
  11. return false;
  12. //计数器 -1
  13. int nextc = c-1;
  14. //CAS 修改同步状态值
  15. if (compareAndSetState(c, nextc))
  16. return nextc == 0;
  17. }
  18. }

3.5 总结

CountDownLatch 内部是 通过继承AQS 的共享锁 Sync 实现的同步辅助类。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态(实际赋值给AQS 的同步状态值 state ),表示该“共享锁”最多能被count 个线程同时获取。而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行!

四、示例(Java 8)

4.1 实现最大并行(默认用户同时请求)

  1. //实例化 CountDownLatch
  2. CountDownLatch countDownLatch = new CountDownLatch(1);
  3. List<Thread> threads = Stream.generate(() -> new Thread(() -> {
  4. try {
  5. System.out.println(Thread.currentThread().getName() + " 等待");
  6. countDownLatch.await();
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println(Thread.currentThread().getName() + " 获取用户信息API ");
  11. }))
  12. .limit(100)
  13. .collect(Collectors.toList());
  14. //启动线程
  15. threads.forEach(Thread::start);
  16. //开始同时访问
  17. TimeUnit.SECONDS.sleep(5);
  18. countDownLatch.countDown();
  19. System.out.println("完成请求===> ");

4.2 “主线程”等待”5个子线程”全部都完成”指定的工作”之后,再继续运行。

  1. //实例化 CountDownLatch
  2. CountDownLatch countDownLatch = new CountDownLatch(5);
  3. List<Thread> threads = Stream.generate(() -> new Thread(() -> {
  4. System.out.println(Thread.currentThread().getName() + "处理任务");
  5. try {
  6. //模拟耗时
  7. TimeUnit.SECONDS.sleep(new Random().nextInt(3));
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. countDownLatch.countDown();
  12. System.out.println(Thread.currentThread().getName() + "完成任务");
  13. }))
  14. .limit(5)
  15. .collect(Collectors.toList());
  16. //启动线程
  17. threads.forEach(Thread::start);
  18. System.out.println("主线程等待");
  19. countDownLatch.await();
  20. System.out.println("主线程开始汇总任务");

参考