案例介绍

在日常开发中经常会遇到需要在主线程中开启多个线程去并行执行任务,并且主线程需要等待所有子线程执行完毕后再进行汇总的场景。在 CountDownLatch 出现之前一般都使用线程的 join()方法来实现这一点,但是 join 方法不够灵活,不能够满足不同场景的需要,所以 JDK 开发组提供了 CountDownLatch 这个类,我们前面介绍的例子使用 CountDownLatch 会更优雅。使用 CountDownLatch 的代码如下:

  1. public class JoinCountDownLatch {
  2. // 创建一个 CountDownLatch 实例
  3. private static volatile CountDownLatch countDownLatch = new CountDownLatch(2);
  4. public static void mainString[] args throws InterruptedException {
  5. Thread threadOne = new Thread(new Runnable() {
  6. @Override
  7. public void run() {
  8. try {
  9. Thread.sleep(1000);
  10. } catch InterruptedException e {
  11. // TODO Auto-generated catch block
  12. e.printStackTrace();
  13. }finally {
  14. countDownLatch.countDown();
  15. }
  16. System.out.println(「child threadOne over 」);
  17. }
  18. });
  19. Thread threadTwo = new Thread(new Runnable() {
  20. @Override
  21. public void run() {
  22. try {
  23. Thread.sleep(1000);
  24. } catch InterruptedException e {
  25. // TODO Auto-generated catch block
  26. e.printStackTrace();
  27. }finally {
  28. countDownLatch.countDown();
  29. }
  30. System.out.println(「child threadTwo over 」);
  31. }
  32. });
  33. // 启动子线程
  34. threadOne.start();
  35. threadTwo.start();
  36. System.out.println(「wait all child thread over 」);
  37. // 等待子线程执行完毕,返回
  38. countDownLatch.await();
  39. System.out.println(「all child thread over 」);
  40. }
  41. }

输出结果如下。

CountDownLatch 原理剖析 - 图1

在如上代码中,创建了一个 CountDownLatch 实例,因为有两个子线程所以构造函数的传参为 2。主线程调用 countDownLatch.await()方法后会被阻塞。子线程执行完毕后调用 countDownLatch.countDown()方法让 countDownLatch 内部的计数器减 1,所有子线程执行完毕并调用 countDown()方法后计数器会变为 0,这时候主线程的 await()方法才会返回。

其实上面的代码还不够优雅,在项目实践中一般都避免直接操作线程,而是使用 ExecutorService 线程池来管理。使用 ExecutorService 时传递的参数是 Runable 或者 Callable 对象,这时候你没有办法直接调用这些线程的 join()方法,这就需要选择使用 CountDownLatch 了。将上面代码修改为如下:

  1. public class JoinCountDownLatch2 {
  2. // 创建一个 CountDownLatch 实例
  3. private static CountDownLatch countDownLatch = new CountDownLatch(2);
  4. public static void mainString[] args throws InterruptedException {
  5. ExecutorService executorService = Executors.newFixedThreadPool(2);
  6. // 将线程 A 添加到线程池
  7. executorService.submit(new Runnable() {
  8. public void run() {
  9. try {
  10. Thread.sleep(1000);
  11. } catch InterruptedException e {
  12. // TODO Auto-generated catch block
  13. e.printStackTrace();
  14. } finally {
  15. countDownLatch.countDown();
  16. }
  17. System.out.println(「child threadOne over 」);
  18. }
  19. });
  20. // 将线程 B 添加到线程池
  21. executorService.submit(new Runnable() {
  22. public void run() {
  23. try {
  24. Thread.sleep(1000);
  25. } catch InterruptedException e {
  26. // TODO Auto-generated catch block
  27. e.printStackTrace();
  28. }finally {
  29. countDownLatch.countDown();
  30. }
  31. System.out.println(「child threadTwo over 」);
  32. }
  33. });
  34. System.out.println(「wait all child thread over 」);
  35. // 等待子线程执行完毕,返回
  36. countDownLatch.await();
  37. System.out.println(「all child thread over 」);
  38. executorService.shutdown();
  39. }
  40. }

输出结果如下。

CountDownLatch 原理剖析 - 图2

这里总结下 CountDownLatch 与 join 方法的区别。一个区别是,调用一个子线程的 join()方法后,该线程会一直被阻塞直到子线程运行完毕,而 CountDownLatch 则使用计数器来允许子线程运行完毕或者在运行中递减计数,也就是 CountDownLatch 可以在子线程运行的任何时候让 await 方法返回而不一定必须等到线程结束。另外,使用线程池来管理线程时一般都是直接添加 Runable 到线程池,这时候就没有办法再调用线程的 join 方法了,就是说 countDownLatch 相比 join 方法让我们对线程同步有更灵活的控制。

实现原理探究

从 CountDownLatch 的名字就可以猜测其内部应该有个计数器,并且这个计数器是递减的。下面就通过源码看看 JDK 开发组在何时初始化计数器,在何时递减计数器,当计数器变为 0 时做了什么操作,多个线程是如何通过计时器值实现同步的。为了一览 CountDownLatch 的内部结构,我们先看它的类图(如图 10-1 所示)。

CountDownLatch 原理剖析 - 图3

图 10-1

从类图可以看出,CountDownLatch 是使用 AQS 实现的。通过下面的构造函数,你会发现,实际上是把计数器的值赋给了 AQS 的状态变量 state,也就是这里使用 AQS 的状态值来表示计数器值。

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }
  5. Sync(int count) {
  6. setState(count);
  7. }

下面我们来研究 CountDownLatch 中的几个重要的方法,看它们是如何调用 AQS 来实现功能的。

1.void await()方法

当线程调用 CountDownLatch 对象的 await 方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:当所有线程都调用了 CountDownLatch 对象的 countDown 方法后,也就是计数器的值为 0 时;其他线程调用了当前线程的 interrupt()方法中断了当前线程,当前线程就会抛出 InterruptedException 异常,然后返回。

下面看下在 await()方法内部是如何调用 AQS 的方法的。

  1. //CountDownLatch 的 await()方法
  2. public void await() throws InterruptedException {
  3. sync.acquireSharedInterruptibly(1);
  4. }

从以上代码可以看到,await()方法委托 sync 调用了 AQS 的 acquireSharedInterruptibly 方法,后者的代码如下:

  1. //AQS 获取共享资源时可被中断的方法
  2. public final void acquireSharedInterruptiblyint arg
  3. throws InterruptedException {
  4. //如果线程被中断则抛出异常
  5. if Thread.interrupted())
  6. throw new InterruptedException();
  7. //查看当前计数器值是否为 0,为 0 则直接返回,否则进入 AQS 的队列等待
  8. if tryAcquireShared(arg) < 0
  9. doAcquireSharedInterruptiblyarg);
  10. }
  11. //sync 类实现的 AQS 的接口
  12. protected int tryAcquireSharedint acquires {
  13. return getState() == 0 1 : -1
  14. }

由如上代码可知,该方法的特点是线程获取资源时可以被中断,并且获取的资源是共享资源。acquireSharedInterruptibly 首先判断当前线程是否已被中断,若是则抛出异常,否则调用 sync 实现的 tryAcquireShared 方法查看当前状态值(计数器值)是否为 0,是则当前线程的 await()方法直接返回,否则调用 AQS 的 doAcquireSharedInterruptibly 方法让当前线程阻塞。另外可以看到,这里 tryAcquireShared 传递的 arg 参数没有被用到,调用 tryAcquireShared 的方法仅仅是为了检查当前状态值是不是为 0,并没有调用 CAS 让当前状态值减 1。

2.boolean await(long timeout, TimeUnit unit)方法

当线程调用了 CountDownLatch 对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:当所有线程都调用了 CountDownLatch 对象的 countDown 方法后,也就是计数器值为 0 时,这时候会返回 true;设置的 timeout 时间到了,因为超时而返回 false;其他线程调用了当前线程的 interrupt()方法中断了当前线程,当前线程会抛出 InterruptedException 异常,然后返回。

  1. public boolean await(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  4. }

3.void countDown() 方法

线程调用该方法后,计数器的值递减,递减后如果计数器值为 0 则唤醒所有因调用 await 方法而被阻塞的线程,否则什么都不做。下面看下 countDown()方法是如何调用 AQS 的方法的。

  1. //CountDownLatch 的 countDown()方法
  2. public void countDown() {
  3. //委托 sync 调用 AQS 的方法
  4. sync.releaseShared(1);
  5. }

由如上代码可知,CountDownLatch 的 countDown()方法委托 sync 调用了 AQS 的 releaseShared 方法,后者的代码如下。

  1. //AQS 的方法
  2. public final boolean releaseSharedint arg {
  3. //调用 sync 实现的 tryReleaseShared
  4. if tryReleaseShared(arg)) {
  5. //AQS 的释放资源方法
  6. doReleaseShared();
  7. return true
  8. }
  9. return false
  10. }

在如上代码中,releaseShared 首先调用了 sync 实现的 AQS 的 tryReleaseShared 方法,其代码如下。

  1. //sync 的方法
  2. protected boolean tryReleaseSharedint releases {
  3. //循环进行 CAS,直到当前线程成功完成 CAS 使计数器值(状态值 state)减 1 并更新到 state
  4. for (; ; {
  5. int c = getState();
  6. //如果当前状态值为 0 则直接返回(1)
  7. if c == 0
  8. return false
  9. //使用 CAS 让计数器值减 1(2)
  10. int nextc = c-1;
  11. if (compareAndSetState(c, nextc))
  12. return nextc == 0;
  13. }
  14. }

如上代码首先获取当前状态值(计数器值)。代码(1)判断如果当前状态值为 0 则直接返回 false,从而 countDown()方法直接返回;否则执行代码(2)使用 CAS 将计数器值减 1,CAS 失败则循环重试,否则如果当前计数器值为 0 则返回 true,返回 true 说明是最后一个线程调用的 countdown 方法,那么该线程除了让计数器值减 1 外,还需要唤醒因调用 CountDownLatch 的 await 方法而被阻塞的线程,具体是调用 AQS 的 doReleaseShared 方法来激活阻塞的线程。这里代码(1)貌似是多余的,其实不然,之所以添加代码(1)是为了防止当计数器值为 0 后,其他线程又调用了 countDown 方法,如果没有代码(1),状态值就可能会变成负数。

4.long getCount() 方法

获取当前计数器的值,也就是 AQS 的 state 的值,一般在测试时使用该方法。下面看下代码。

  1. public long getCount() {
  2. return sync.getCount();
  3. }
  4. int getCount() {
  5. return getState();
  6. }

由如上代码可知,在其内部还是调用了 AQS 的 getState 方法来获取 state 的值(计数器当前值)。

小结

本节首先介绍了 CountDownLatch 的使用,相比使用 join 方法来实现线程间同步,前者更具有灵活性和方便性。另外还介绍了 CountDownLatch 的原理,CountDownLatch 是使用 AQS 实现的。使用 AQS 的状态变量来存放计数器的值。首先在初始化 CountDownLatch 时设置状态值(计数器值),当多个线程调用 countdown 方法时实际是原子性递减 AQS 的状态值。当线程调用 await 方法后当前线程会被放入 AQS 的阻塞队列等待计数器为 0 再返回。其他线程调用 countdown 方法让计数器值递减 1,当计数器值变为 0 时,当前线程还要调用 AQS 的 doReleaseShared 方法来激活由于调用 await()方法而被阻塞的线程。