CountDownLcatch 允许一个线程或者多个线程等待其他线程操作完成。

    假设我们有这样的需求,需要解析文件中所有的文件,然后汇总文件信息,此时可以考虑使用多线程,等所有的文件解析完成之后,在进行汇总操作。在这个需求中需要实现主线程等待其他所有线程完成解析操作!

    简单的方式可以使用join() 方法实现,比如下面的代码中展示了使用join() 实现的逻辑,join() 用于等待线程操作结束。其实现的原理是不断地检查join() 线程是否存活,如果存在则让线程不停地等待,线程终止后,线程的this.notifyAll() 方法会被调用。

    1. public class ThreadJoinExample {
    2. static File file = new File("/tmp");
    3. public static void main(String[] args) {
    4. File[] files = file.listFiles();
    5. List<Thread> readFileThread = new ArrayList<>(files.length);
    6. for (File file : files) {
    7. ReadThread thread = new ReadThread(file);
    8. thread.start();
    9. readFileThread.add(thread);
    10. }
    11. try {
    12. for (Thread thread : readFileThread) {
    13. thread.join();
    14. }
    15. } catch (Exception ignore) {
    16. }
    17. }
    18. static class ReadThread extends Thread {
    19. private final File file;
    20. public ReadThread(File file) {
    21. this.file = file;
    22. }
    23. @Override
    24. public void run() {
    25. System.out.println(file.getAbsolutePath());
    26. }
    27. }
    28. }

    JDK 1.5 开始提供的 CountDownLatch ,它能够更加简单清楚的实现 join 的功能,并且比 join 功能更多。

    public class CountDownLatchExample {
    
      static File file = new File("/tmp");
    
      public static void main(String[] args) throws InterruptedException {
        File[] files = file.listFiles();
        int count = files.length;
        CountDownLatch latch = new CountDownLatch(count);
        for (File file : files) {
          ReadThread thread = new ReadThread(file, latch);
          thread.start();
        }
    
        // 等待,直到计数器的值变为 0
        latch.await();
      }
    
      static class ReadThread extends Thread {
    
        private final File file;
    
        private final CountDownLatch latch;
    
        public ReadThread(File file, CountDownLatch latch) {
          this.file = file;
          this.latch = latch;
        }
    
        @Override
        public void run() {
          System.out.println(file.getAbsolutePath());
          // 执行完成之后计数器减 1
          latch.countDown();
        }
      }
    }
    

    CountDownLatch 的构造方法接收一个整数作为计数器,如果需要等待 N 个线程执行完成,这里就传入N。当线程调用 countDown() 的时候,N的值就会减少 1 ,CountDownLatch 的 await() 方法会阻塞当前线程,直到N的值变为 0。

    从 countDown() 方法的源码中可以看到,其本质还是一个共享锁,countDown() 实际上是释放锁,await() 实际上是尝试获取共享锁。

    // 构造一个同步器 state的值设置为 count
    public CountDownLatch(int count) {
         if (count < 0) throw new IllegalArgumentException("count < 0");
         this.sync = new Sync(count);
     }    
    
    // 每次释放一个锁
    public void countDown() {
         sync.releaseShared(1);
     }
    
    // 如果线程获取getState == 0 释放完成
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        // 如果不大于 0 ,则说明还有值未释放,直接进入等待队列
        // 如果等于1 也即是 getState() == 0, 立即返回
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    

    当前CountDownLcatch 也提供了超时等待模型,用于超时等待线程执行完成,`await(long time,TimeUnit unit) 。 值得注意的是,这里N必须大于等于0,当N等于0 的时候,执行await 并不会阻塞,另外CountDownLatch不能重新初始化计数器N或者修改内部的计数器值。

    需要说明的是,上文中提到 countDown() 实际上是对锁的解锁,await() 实际上是尝试获取共享锁锁,那么根据 happens-before 原则,我们可以知: 一个线程调用 countDown() 方法是 happens-before 另外一个线程调用 await() 方法的