引言

CountDownLatch是一个很经典的多线程工具。它提供的能力很好理解,就是让一个或者多个线程在其他线程完成一些操作前处于等待状态。它的内部实现中用到了AQS。

一个例子

先来看一个使用CountDownLatch的例子:

  1. public class CountDownLatchTest {
  2. public static void main(String[] args) {
  3. CountDownLatch cdl = new CountDownLatch(10);
  4. for(int i=0;i<10;i++){
  5. int finalI = i;
  6. Thread thread = new Thread(() -> {
  7. System.out.println("线程"+Thread.currentThread().getName()+"开始执行");
  8. try {
  9. Thread.sleep(finalI *1000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. cdl.countDown();
  14. },"thread_"+i);
  15. thread.start();
  16. }
  17. long awaitStartTime = System.currentTimeMillis();
  18. try {
  19. cdl.await();
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. System.out.println(System.currentTimeMillis()-awaitStartTime+"秒之后"+"main方法开始执行");
  24. }
  25. }

在这个例子中,主线程调用CountDownLatch的await方法来等待,它等待的是10个线程完成他们的睡眠(睡眠用来替代实际业务中的执行逻辑)。十个线程中的每一个完成睡眠操作之后,都需要调用CountDownLatch的countDown方法。
因为最后一个线程需要睡眠9秒钟,所以大概九秒钟之后main线程会被唤醒继续执行。

重要方法与实现逻辑

从上面的例子可以看到CountDownLatch三个很重要的方法,第一个是构造方法,第二个是await方法,第三个是countDown方法。我们一一分析这三个方法:

构造方法

构造方法的逻辑如下:

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. this.sync = new Sync(count);
  4. }

传入的参数count就代表在执行await方法的线程返回前,需要执行countDown方法的次数。
我们看它实际上是调用了内部的同步器的构造方法:

  1. Sync(int count) {
  2. setState(count);
  3. }

这个同步器的构造方法中直接将同步器的状态设置为前面给的参数,相当于锁被持有的数量,要想await方法返回,必须解锁相应的次数。

await方法

  1. public void await() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }

await方法实际上是调用的同步器的acquireSharedInterruptibly()方法,也就是说,await是一个请求锁的过程。我们来请求锁的逻辑:

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. if (tryAcquireShared(arg) < 0)
  6. doAcquireSharedInterruptibly(arg);
  7. }

tryAcquireShared方法在Sync给出了实现:

  1. protected int tryAcquireShared(int acquires) {
  2. return (getState() == 0) ? 1 : -1;
  3. }

很简单,就是判断state是否等于0。因为我们初始化的时候,state已经被设置成了给定的参数,不为零,所以会调用doAcquireSharedInterruptibly方法,这个方法我们已经讲过,它会将当前线程构造成一个Node放入到等待队列中,然后当前线程会通过调用LockSupport.park方法处于等待状态。

countDown方法

既然await方法是请求锁的操作,那么countDown就是释放锁的操作了:

  1. public void countDown() {
  2. sync.releaseShared(1);
  3. }

它调用的是同步器的releaseShared方法:

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

tryReleaseShared方法在Sync给出了实现:

  1. protected boolean tryReleaseShared(int releases) {
  2. // Decrement count; signal when transition to zero
  3. for (;;) {
  4. int c = getState();
  5. if (c == 0)
  6. return false;
  7. int nextc = c-1;
  8. if (compareAndSetState(c, nextc))
  9. return nextc == 0;
  10. }
  11. }
  12. }

逻辑很简单,就是判断state是否等于0;

需要注意的点

执行await的多个线程会一起唤醒

从上面我们看到,不管是加锁还是获取锁,都是跟shared相关的,这意味着执行请求锁操作也就是执行await方法的线程,他们之间是不会相互阻塞的,只要countDown方法执行的次数达到数量,这些线程都会被唤醒,看下面的例子:

  1. public class CountDownLatchTest {
  2. public static void main(String[] args) {
  3. CountDownLatch cdl = new CountDownLatch(10);
  4. for(int i=0;i<10;i++){
  5. int finalI = i;
  6. Thread thread = new Thread(() -> {
  7. System.out.println("线程"+Thread.currentThread().getName()+"开始执行");
  8. try {
  9. Thread.sleep(finalI *1000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. cdl.countDown();
  14. },"thread_"+i);
  15. thread.start();
  16. }
  17. Thread threadCopyMain = new Thread(new Runnable() {
  18. @Override
  19. public void run() {
  20. long l = System.currentTimeMillis();
  21. try {
  22. cdl.await();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. System.out.println(System.currentTimeMillis()-l+"毫秒后"+"copyMain方法开始执行");
  27. }
  28. },"copyMain");
  29. threadCopyMain.start();
  30. long awaitStartTime = System.currentTimeMillis();
  31. try {
  32. cdl.await();
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. System.out.println(System.currentTimeMillis()-awaitStartTime+"毫秒后"+"main方法开始执行");
  37. }
  38. }

主线程和threadCopyMain这个线程都执行了await方法,在10个线程执行了countDown之后,这两个线程都会继续执行,输出如下:

  1. 线程thread_0开始执行
  2. 线程thread_2开始执行
  3. 线程thread_1开始执行
  4. 线程thread_4开始执行
  5. 线程thread_3开始执行
  6. 线程thread_5开始执行
  7. 线程thread_6开始执行
  8. 线程thread_7开始执行
  9. 线程thread_8开始执行
  10. 线程thread_9开始执行
  11. 9001毫秒后main方法开始执行
  12. 9000毫秒后copyMain方法开始执行

可以看出,两者差不多同时执行。因为releaseShared方法会唤醒后续的处于共享状态节点对应的线程。

countDown方法执行的数量不一定是线程的数量

当countDown方法执行到构造方法中给定的数量,等待的线程就会被唤醒,所以对于上面的例子,如果是一个线程执行countDown方法十次,也会有同样的效果,不一定要十个线程每个执行一次countDown方法,看下面的例子:

  1. public class CountDownLatchTest {
  2. public static void main(String[] args) {
  3. CountDownLatch cdl = new CountDownLatch(10);
  4. for(int i=0;i<5;i++){
  5. int finalI = i;
  6. Thread thread = new Thread(() -> {
  7. System.out.println("线程"+Thread.currentThread().getName()+"开始执行");
  8. try {
  9. Thread.sleep(finalI *1000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. cdl.countDown();
  14. cdl.countDown();
  15. },"thread_"+i);
  16. thread.start();
  17. }
  18. long awaitStartTime = System.currentTimeMillis();
  19. try {
  20. cdl.await();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println(System.currentTimeMillis()-awaitStartTime+"毫秒后"+"main方法开始执行");
  25. }
  26. }

CountDownLatch构造方法中的数量仍然是10,但是不再是十个线程分别调用countDown,而是五个线程每个线程调用两次,输出如下:

  1. 线程thread_0开始执行
  2. 线程thread_1开始执行
  3. 线程thread_2开始执行
  4. 线程thread_3开始执行
  5. 线程thread_4开始执行
  6. 4000毫秒后main方法开始执行

在实际使用中,这种逻辑可能并不存在,一般都是每个线程做一定的工作然后执行countDown。

小结

CountDownLatch在很多场景中都会用到,例如主线程等待多个线程,每个线程完成自己的部分之后,主线程再执行。CountDownLatch构造方法中的参数,也就是countDown需要调用的次数在初始化之后就不能再修改。下一篇文章,我们来看CyclicBarrier。