CountDownLcatch 允许一个线程或者多个线程等待其他线程操作完成。
假设我们有这样的需求,需要解析文件中所有的文件,然后汇总文件信息,此时可以考虑使用多线程,等所有的文件解析完成之后,在进行汇总操作。在这个需求中需要实现主线程等待其他所有线程完成解析操作!
简单的方式可以使用join() 方法实现,比如下面的代码中展示了使用join() 实现的逻辑,join() 用于等待线程操作结束。其实现的原理是不断地检查join() 线程是否存活,如果存在则让线程不停地等待,线程终止后,线程的this.notifyAll() 方法会被调用。
public class ThreadJoinExample {static File file = new File("/tmp");public static void main(String[] args) {File[] files = file.listFiles();List<Thread> readFileThread = new ArrayList<>(files.length);for (File file : files) {ReadThread thread = new ReadThread(file);thread.start();readFileThread.add(thread);}try {for (Thread thread : readFileThread) {thread.join();}} catch (Exception ignore) {}}static class ReadThread extends Thread {private final File file;public ReadThread(File file) {this.file = file;}@Overridepublic void run() {System.out.println(file.getAbsolutePath());}}}
JDK 1.5 开始提供的 CountDownLatch ,它能够更加简单清楚的实现 join 的功能,并且比 join 功能更多。
public class CountDownLatchExample {
static File file = new File("/tmp");
public static void main(String[] args) throws InterruptedException {
File[] files = file.listFiles();
int count = files.length;
CountDownLatch latch = new CountDownLatch(count);
for (File file : files) {
ReadThread thread = new ReadThread(file, latch);
thread.start();
}
// 等待,直到计数器的值变为 0
latch.await();
}
static class ReadThread extends Thread {
private final File file;
private final CountDownLatch latch;
public ReadThread(File file, CountDownLatch latch) {
this.file = file;
this.latch = latch;
}
@Override
public void run() {
System.out.println(file.getAbsolutePath());
// 执行完成之后计数器减 1
latch.countDown();
}
}
}
CountDownLatch 的构造方法接收一个整数作为计数器,如果需要等待 N 个线程执行完成,这里就传入N。当线程调用 countDown() 的时候,N的值就会减少 1 ,CountDownLatch 的 await() 方法会阻塞当前线程,直到N的值变为 0。
从 countDown() 方法的源码中可以看到,其本质还是一个共享锁,countDown() 实际上是释放锁,await() 实际上是尝试获取共享锁。
// 构造一个同步器 state的值设置为 count
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 每次释放一个锁
public void countDown() {
sync.releaseShared(1);
}
// 如果线程获取getState == 0 释放完成
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
// 如果不大于 0 ,则说明还有值未释放,直接进入等待队列
// 如果等于1 也即是 getState() == 0, 立即返回
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
当前CountDownLcatch 也提供了超时等待模型,用于超时等待线程执行完成,`await(long time,TimeUnit unit) 。 值得注意的是,这里N必须大于等于0,当N等于0 的时候,执行await 并不会阻塞,另外CountDownLatch不能重新初始化计数器N或者修改内部的计数器值。
需要说明的是,上文中提到 countDown() 实际上是对锁的解锁,await() 实际上是尝试获取共享锁锁,那么根据 happens-before 原则,我们可以知: 一个线程调用 countDown() 方法是 happens-before 另外一个线程调用 await() 方法的
