1、简介

看下CountDownLatch类图
image.png
看到CountDownLatch类有一个静态内部类Sync ,他是继承了AQS类,使用AQS的共享模式,那么它就可以调用AQS的相关方法。
对于CountdownLatch,简而言之就是,该类能实现让一个线程等待其他线程做完一些操作之后,再继续执行;也可以让一组线程去等待某一个线程执行完毕,然后再让一组线程继续执行。

2、 构造函数

CountDownLatch只提供了一个构造函数,传递一个int类型的初始值,该值可以称之为计数器,或者倒计时数,通过调用CountDownLatch.countDown()方法,能使之计数器或者倒计时数 减1,等到倒计时数==0时候,代表资源已经全部释放,需要唤醒阻塞队列里等待的线程

  1. public CountDownLatch(int count) {
  2. //边界值判断
  3. if (count < 0) throw new IllegalArgumentException("count < 0");
  4. //实例化Sync类
  5. this.sync = new Sync(count);
  6. }
  1. Sync(int count) {
  2. //调用AQS的setState方法
  3. setState(count);
  4. }

这里看到是最终给AQS的state值赋值,我们在ReentrantLock中知道,state值对应的是锁资源

  1. protected final void setState(int newState) {
  2. state = newState;
  3. }

3、方法

  • await() 线程调用该方法,如果锁资源未全部释放,会将线程以AQS的共享模式封装为node节点,添加到阻塞队列里,然后自旋被阻塞掉
  • countDown() 线程调用该方法,会让倒计时数减1,等待减一后 state ==0的时候,代表锁资源完全释放,会先唤醒阻塞队列的head.next节点,然后让head.next节点去唤醒后继阻塞的节点

    4、await

    是当前线程添加到阻塞队列挂起,直到锁资源完全释放被唤醒或者 该线程阻塞期间被中断唤醒

    1. public void await() throws InterruptedException {
    2. //调用sync方法
    3. sync.acquireSharedInterruptibly(1);
    4. }

    5、AQS.acquireSharedInterruptibly

    先判断是否被中断过,如果有直接抛出中断异常
    调用 tryAcquireShared(arg) 方法判断锁资源state是否等于0,没有说明锁资源未被释放,将线程入阻塞队列阻塞,等待唤醒

    1. public final void acquireSharedInterruptibly(int arg)
    2. throws InterruptedException {
    3. //条件成立: 说明调用await的线程已经是 中断状态了 直接抛出异常
    4. if (Thread.interrupted())
    5. throw new InterruptedException();
    6. //条件成立 AQS.state >0 此时将线程入队,然后阻塞等待被唤醒
    7. //条件不成立 AQS.state == 0 对应业务CountDownLatch已经countdown完了
    8. if (tryAcquireShared(arg) < 0)
    9. //将当前线程添加到共享锁阻塞队列
    10. doAcquireSharedInterruptibly(arg);
    11. }

    6、CountDownLatch.tryAcquireShared

    判断锁资源是否==0 ? 是返回1 ,不是返回 -1

    1. protected int tryAcquireShared(int acquires) {
    2. return (getState() == 0) ? 1 : -1;
    3. }

    7.AQS.doAcquireSharedInterruptibly

    将线程添加到阻塞队列里挂起逻辑

  • 线程封装为node,设置为共享模式

  • 判断是否是head.next节点,是的话尝试获取锁,成功之后调用 setHeadAndPropagate 方法,cas修改node状态为0,并唤醒阻塞队列的后继节点
  • 不是head.next节点,则给自己找一个好爸爸,节点修改 -1 (signal),然后阻塞挂起线程,挂起过程中被中断唤醒,修改node节点为取消状态,走移除node节点逻辑

    1. //AQS的doAcquireSharedInterruptibly方法
    2. private void doAcquireSharedInterruptibly(int arg)
    3. throws InterruptedException {
    4. //将调用await方法的线程封装为node 添加到AQS的阻塞队列中
    5. final Node node = addWaiter(Node.SHARED);
    6. boolean failed = true;
    7. try {
    8. for (;;) {
    9. //获取当前node的前驱节点
    10. final Node p = node.predecessor();
    11. //条件成立 说明当前node节点是head.next节点,有权利获取 共享锁
    12. if (p == head) {
    13. int r = tryAcquireShared(arg);
    14. if (r >= 0) {
    15. setHeadAndPropagate(node, r);
    16. p.next = null; // help GC
    17. failed = false;
    18. return;
    19. }
    20. }
    21. //给当前线程找到好爸爸,将好爸爸的状态设置为singal -1 ,返回true
    22. //parkAndCheckInterrupt 挂起node对应的线程
    23. if (shouldParkAfterFailedAcquire(p, node) &&
    24. parkAndCheckInterrupt())
    25. throw new InterruptedException();
    26. }
    27. } finally {
    28. //阻塞过程中被中断唤醒之后,会抛出中断异常,然后将node节点设置为取消状态,走清除节点逻辑
    29. if (failed)
    30. cancelAcquire(node);
    31. }
    32. }

    假设初始化了CountDownLatch(1),state==1,有三个线程(线程A,线程B,线程C)调用了await方法,那么这三个线程都会被添加到AQS的阻塞队列,然后被阻塞。那么此时阻塞队列的情况如下图所示:
    image.png

    8、countDown

    递减倒计时数,待倒计时数==0时候,释放所有阻塞的线程

    1. public void countDown() {
    2. //调用AQS的方法
    3. sync.releaseShared(1);
    4. }

    9、AQS.releaseShared

  • 调用tryReleaseShared 去让state 减1,state减为0时候返回 true,否则正常让state = state-1

  • state锁资源释放完之后,需要唤醒那些调用await方法而被阻塞挂起的线程
    1. //AQS的releaseShared
    2. public final boolean releaseShared(int arg) {
    3. //条件成立 说明调用latch.countdown()方法的线程是让 state-1 ==0的线程,需要做 唤醒await状态的线程逻辑
    4. if (tryReleaseShared(arg)) {
    5. //调用countdown方法的线程 只有一个会进入到这个里面,去做唤醒 阻塞状态的线程逻辑
    6. doReleaseShared();
    7. return true;
    8. }
    9. return false;
    10. }

    10、CountDownLatch.tryReleaseShared

    每调用一次该方法让AQS的state 减一 逻辑。
    自旋让state减一,待state 减为0 返回true,否则返回false
    1. //countdownlatch内部类sync的tryReleaseShared方法
    2. //更新AQS的state值,每调用一次,state减1,当state==0时 返回true
    3. protected boolean tryReleaseShared(int releases) {
    4. for (;;) {
    5. int c = getState();
    6. //条件成立 说明前面已经有线程 触发 唤醒操作了, 返回false
    7. if (c == 0)
    8. return false;
    9. //执行到这里,state>0
    10. int nextc = c-1;
    11. //cas 修改state
    12. if (compareAndSetState(c, nextc))
    13. //nextc == 0 说明当前调用countdown的线程是 需要触发 唤醒操作的线程
    14. return nextc == 0;
    15. }
    16. }

    11、AQS.doReleaseShared

    state锁释放完之后,唤醒 调用await方法而被阻塞的线程

共享锁唤醒逻辑 跟独占锁唤醒一样,都需要先将头节点状态 cas方式从 signal 修改为0,然后使用unpark方式 唤醒头节点的下一个节点

  1. //AQS的doReleaseShared方法
  2. //有哪几种情况会调用当前方法?
  3. //1.latch.countdown让AQS.state==0 然后调用doReleaseShared方法去唤醒head.next对应的线程
  4. //2.被唤醒的线程在doAcquireSharedInterruptibly方法中调用setHeadAndPropagate方法,然后会调用doReleaseShared该方法
  5. private void doReleaseShared() {
  6. for (;;) {
  7. //获取当前AQS的head节点
  8. Node h = head;
  9. //条件一 h != null 成立 说明阻塞队列不为空
  10. //条件二 h != tail 成立说明当前阻塞队列不只有head一个节点
  11. //h ==tail 表示head和tail指向同一个node节点,什么时候会出现?
  12. //1.正常唤醒情况,唤醒最后一个node节点时候,head是等于tail的
  13. //2.第一个调用await的线程在准备addWaiter入队时给head节点擦屁股后还没有把自己放进阻塞队列时候 这时候与countdown方法发生并发了
  14. if (h != null && h != tail) {
  15. //执行到这里 说明当前head一定有后继节点
  16. int ws = h.waitStatus;
  17. if (ws == Node.SIGNAL) {
  18. //将当前node状态改为0
  19. //为什么用cas 多个线程唤醒head.next节点时候, 可能会失败
  20. //案例:t3线程在if (h == head) 返回false时 t3不会退出循环,会继续自旋 参与到唤醒下一个head.next逻辑
  21. //t3此时执行cas 成功..t4(head节点线程)在t3修改成功之前,也进入到这里代码块,t4会compareAndSetWaitStatus 修改失败,因为t3改过了
  22. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  23. continue;
  24. //唤醒head的后继节点
  25. unparkSuccessor(h);
  26. }
  27. else if (ws == 0 &&
  28. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  29. continue;
  30. }
  31. //条件成立
  32. //1. 说明刚被unaprk唤醒的后继节点还没有执行到setHeadAndPropagate方法里面的 设置当前后继node为head节点逻辑
  33. //2. h==null
  34. //3. h==tail head==tail指向一个node对象
  35. //条件不成立
  36. //被唤醒的节点 很积极 直接将自己设置为head节点, 此时 唤醒它的节点(前驱节点) 执行h==head不成立
  37. //此时head节点的前驱节点 不会跳出doReleaseShared方法,会继续唤醒 新head节点的后继节点..
  38. if (h == head) // loop if head changed
  39. break;
  40. }
  41. }

12、AQS.doAcquireSharedInterruptibly

线程A被unpark唤醒之后,线程A又会回到 doAcquireSharedInterruptibly 方法里面,会自旋判断当前节点是否是head.next节点,如果是并且满足state==0的条件,会调用setHeadAndPropagate方法

  1. //AQS的doAcquireSharedInterruptibly方法
  2. private void doAcquireSharedInterruptibly(int arg)
  3. throws InterruptedException {
  4. //将调用await方法的线程封装为node 添加到AQS的阻塞队列中
  5. final Node node = addWaiter(Node.SHARED);
  6. boolean failed = true;
  7. try {
  8. for (;;) {
  9. //获取当前node的前驱节点
  10. final Node p = node.predecessor();
  11. //条件成立 说明当前node节点是head.next节点,有权利获取 共享锁
  12. if (p == head) {
  13. //尝试去获取锁,共享锁获取锁是判断state==0是否成立
  14. int r = tryAcquireShared(arg);
  15. //条件成立,说明锁资源已释放完毕,
  16. if (r >= 0) {
  17. //让当前节点成为头节点,并让当前节点继续向后唤醒下一个节点
  18. setHeadAndPropagate(node, r);
  19. p.next = null; // help GC
  20. failed = false;
  21. return;
  22. }
  23. }
  24. //给当前线程找到好爸爸,将好爸爸的状态设置为singal -1 ,返回true
  25. //parkAndCheckInterrupt 挂起node对应的线程
  26. if (shouldParkAfterFailedAcquire(p, node) &&
  27. parkAndCheckInterrupt())
  28. throw new InterruptedException();
  29. }
  30. } finally {
  31. //阻塞过程中被中断唤醒之后,会抛出中断异常,然后将node节点设置为取消状态,走清除节点逻辑
  32. if (failed)
  33. cancelAcquire(node);
  34. }
  35. }

13、AQS.setHeadAndPropagate

该方法逻辑是将线程A设置为头节点,并唤醒线程A的下一个节点;一旦线程A被唤醒,并且被设置为头节点之后,就会继续唤醒线程A的后一个节点线程B。

  1. //AQS的方法 设置当前node为 head节点,并向后传播(依次唤醒!)
  2. private void setHeadAndPropagate(Node node, int propagate) {
  3. Node h = head;
  4. //将当前节点设置为head节点
  5. setHead(node);
  6. //propagate 是1 一定成立
  7. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  8. (h = head) == null || h.waitStatus < 0) {
  9. //获取当前节点的后继节点
  10. Node s = node.next;
  11. //条件一:s == null 成立 当前node节点已经是tail节点了,条件一成立 doReleaseShared会处理这种情况
  12. //条件二:前置条件 s!=null 那么要求s节点必须是 共享模式
  13. if (s == null || s.isShared())
  14. //基本上所有情况都会执行到这里
  15. doReleaseShared();
  16. }
  17. }

14、使用

  1. public static void main(String[] args) {
  2. CountDownLatch latch = new CountDownLatch(5);
  3. ExecutorService executorService = Executors.newFixedThreadPool(5);
  4. for (int i = 0; i < 5; i++) {
  5. executorService.submit(new Worker(String.format("线程%s", i), latch));
  6. }
  7. try {
  8. long startTime = System.currentTimeMillis();
  9. System.out.println("主线程开始等待其他组件执行....");
  10. latch.await();
  11. long endTime = System.currentTimeMillis();
  12. System.out.println("主线程开始执行后续逻辑....,耗时" + (endTime - startTime));
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. static class Worker implements Runnable {
  18. private String threadName;
  19. private CountDownLatch latch;
  20. public Worker(String threadName, CountDownLatch latch) {
  21. this.threadName = threadName;
  22. this.latch = latch;
  23. }
  24. @Override
  25. public void run() {
  26. try {
  27. System.out.println(this.threadName + "开始执行逻辑...");
  28. Thread.sleep(5000);
  29. System.out.println(this.threadName + "执行完毕逻辑...");
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. } finally {
  33. latch.countDown();
  34. }
  35. }