• #">1.CountDownLatch的使用#
  • #">2.CountDownLatch的内部实现#
    • #">2.1 await方法#
  • #">3. 小结#

    1.CountDownLatch的使用#

      假设现在,我们要起一个3块钱的集资项目,并且限定每个人一次只能捐1块钱当募集到3块钱的时候立马就把这笔钱捐给我自己,如果凑齐之后你还想捐,那么我会跟你说,项目已经完成了,你这一块钱我不受理,自己去买雪糕吃吧;如果没凑齐,那么我这个募集箱就一直挂在这里。这个场景用CountDownLatch可以很契合的模拟出来。

    1. public static void main(String[] args) throws InterruptedException {
    2. // 集资项目==========>启动,目标3块钱
    3. CountDownLatch countDownLatch = new CountDownLatch(3);
    4. ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    5. executor.execute(() -> {
    6. try {
    7. TimeUnit.MILLISECONDS.sleep(100);
    8. System.err.println("张1准备捐一块钱");
    9. countDownLatch.countDown();
    10. System.err.println("张1捐了一块钱");
    11. } catch (InterruptedException e) {
    12. e.printStackTrace();
    13. }
    14. });
    15. executor.execute(() -> {
    16. try {
    17. TimeUnit.MILLISECONDS.sleep(100);
    18. System.err.println("张2准备捐一块钱");
    19. countDownLatch.countDown();
    20. System.err.println("张2捐了一块钱");
    21. } catch (InterruptedException e) {
    22. e.printStackTrace();
    23. }
    24. });
    25. executor.execute(() -> {
    26. try {
    27. TimeUnit.MILLISECONDS.sleep(100);
    28. System.err.println("张3准备捐一块钱");
    29. countDownLatch.countDown();
    30. System.err.println("张3捐了一块钱");
    31. } catch (InterruptedException e) {
    32. e.printStackTrace();
    33. }
    34. });
    35. System.err.println("我项目启动后,就在这里等人捐钱,不够3块我不走了");
    36. countDownLatch.await();
    37. System.err.println("3块钱到手,直接跑路");
    38. executor.shutdown();
    39. }

    多线程之CountDownLatch - 图1
    这个结果,em,可以看到countDownLatch使用的几个注意点:
    调用countDownLatch的await()方法的线程会阻塞,直到凑够3块钱为止
    跟CyclicBarrier不同,其计完数之后并不会阻塞,而是直接执行接下来的操作
    每次调用countDown()方法都会捐一块钱(计数一次),满了之后调用await()方法的线程不再阻塞
     另外,在上面的代码中,在countDown方法之后还打印信息是为了验证countDown方法不会阻塞当前线程,执行结果不一定如上图那样有顺序的,例如可能出现下方的结果:
    多线程之CountDownLatch - 图2因为最后一个countDown之后,await所在的线程不再阻塞了,又正好赶上JVM线程调度,所以就会出现上方的结果。

    2.CountDownLatch的内部实现#

      刚才已经讲了CountDownLatch的用法,用起来还是不难的。那来看下内部是怎么实现的,又是怎么做到计数之后不跟CyclicBarrier一样阻塞的呢?
      首先来看构造函数吧,CountDownLatch只有一个构造函数,如下

    1. public CountDownLatch(int count) {
    2. if (count < 0) throw new IllegalArgumentException("count < 0");
    3. this.sync = new Sync(count);
    4. }

    所做的事情也就只有初始化内部对象sync一件事情(校验总不能算一件事吧?),那来看下初始化了个啥玩意

    1. // 变量sync,是不是看起来很眼熟?
    2. private final Sync sync;
    3. // 内部类Sync,又是一个AQS的产物
    4. private static final class Sync extends AbstractQueuedSynchronizer {
    5. private static final long serialVersionUID = 4982264981922014374L;
    6. // 构造方法,就是设置了AQS的state值
    7. Sync(int count) {
    8. setState(count);
    9. }
    10. int getCount() {
    11. return getState();
    12. }
    13. /*
    14. * 可以知道countDownLatch使用的是AQS的共享模式
    15. * 获取资源方法,正数表示成功,负数表示失败
    16. */
    17. protected int tryAcquireShared(int acquires) {
    18. return (getState() == 0) ? 1 : -1;
    19. }
    20. // 释放方法
    21. protected boolean tryReleaseShared(int releases) {
    22. for (;;) {
    23. // state的状态,在countDownLatch中表示剩余计数量,如果为0则表示可以被获取,即await方法不再阻塞
    24. int c = getState();
    25. if (c == 0)
    26. return false;
    27. // 本次计数后还剩余的及数量
    28. int nextc = c-1;
    29. // CAS设置剩余计数量
    30. if (compareAndSetState(c, nextc))
    31. // ==0表示锁释放了,之后state的值将一直是0,意思就是之后的await方法都不再阻塞
    32. return nextc == 0;
    33. }
    34. }

    1.CountDownLatch构造函数count的参数作用就是设置其内部的AQS的状态state,假设count3,那么每次进行countDownAQSstate就减1,减到0的时候await方法就不再阻塞,注意这时候await方法就不再阻塞了,无论你调多少次。
    2.CountDownLatch里边的Sync实现的AQS的共享模式(从tryReleaseShared方法可以看出)
     到这里对其CountDownLatch的内部有个差不多印象了,接下来看下其最重要的awaitcountDown方法。

    2.1 await方法#

    1. public void await() throws InterruptedException {
    2. sync.acquireSharedInterruptibly(1);
    3. }

    直接调用了AQS的顶级方法,再进去就是AQS的模块了

    1. public final void acquireSharedInterruptibly(int arg)
    2. throws InterruptedException {
    3. if (Thread.interrupted())
    4. throw new InterruptedException();
    5. // 获取资源,成功直接返回,失败执行下方方法(进入同步队列)
    6. if (tryAcquireShared(arg) < 0)
    7. doAcquireSharedInterruptibly(arg);
    8. }

    关键的方法还是在资源的控制上——tryReleaseShared,代码如下(上方也有):

    1. protected boolean tryReleaseShared(int releases) {
    2. for (;;) {
    3. /*
    4. * state的状态,在countDownLatch中表示剩余计数量
    5. * 如果为0则表示可以被获取,即await方法不再阻塞
    6. */
    7. int c = getState();
    8. // 这里的意思是如果资源已经释放的情况下,就不能再次释放了,释放成功的代码在最后一行
    9. if (c == 0)
    10. return false;
    11. // 本次计数后还剩余的及数量
    12. int nextc = c-1;
    13. // CAS设置剩余计数量
    14. if (compareAndSetState(c, nextc))
    15. // ==0表示锁释放了,之后state的值将一直是0,意思就是之后的await方法都不再阻塞
    16. return nextc == 0;
    17. }
    18. }

    到这里countDown方法的迷雾也看清了,每一次调用countDown方法就相当于调用tryReleaseShared方法,如果当前资源还没释放的话,将state-1,判断是否为0如果为0的话表示资源释放唤醒await方法的线程,否则的话只是更新state的值。
     整理一下整个CountDownLatch的流程。

    1.创建一个CountDownLatch,并赋予一个数值,这个值表示需要计数的次数,每次countDown算一次 2.在主线程调用await方法,表示需要计数器完成之前都不能动await方法的内部实现依赖于内部的AQS,调用await方法的时候会尝试去获取资源,成功条件是state=0,也就是说除非countDowncount(构造函数赋予)次之后,才能成功,失败的话当前线程进行休眠。 3.在子线程调用countDown方法,每次调用都会使内部的state-1state0的时候资源释放await方法不再阻塞(即使再次调用也是)

    3. 小结#

      如果理解AQS的话,不止CountDownLatch,其他衍生物例如ReentrantLock都能轻易的看懂。如果不了解的话也没关系,这篇文章应该能让你对CountDownLatch的内部实现有了大概的轮廓。
      简单总结一下,CountDownLatch就三个点:构造函数的值、awaitcountDown。构造函数的值表示计数的次数,每次countDown都会使计数减一,减到0的时候await方法所在的线程就不再阻塞。