CountDownLatch是什么
CountDownLatch是基于AQS的阻塞工具,阻塞一个或者多个线程,直到所有的线程都执行完成。
CountDownLatch解决了什么问题
当一个任务运算量比较大的时候,需要拆分为各种子任务,必须要所有子任务完成后才能汇总为总任务。
使用并发模拟的时候可以使用CountDownLatch.也可以设置超时等待时间,
CountDownLatch 用法
1.阻塞所有线程执行完成后再执行
@Slf4jpublic class CountDownLatchExample {//线程数量private static final int THREAD_NUM = 10;// CountdownLatch阻塞模拟public static void main(String[] args) throws InterruptedException {// 创建线程池 用于执行线程ExecutorService executorService = Executors.newCachedThreadPool();//创建countDownLatchfinal CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);long startTime = System.currentTimeMillis();//循环创建线程for (int i = 0; i < THREAD_NUM; i++) {final int a = i;executorService.execute(() -> {try {test(a);} catch (Exception e) {log.error("Exception", e);} finally {countDownLatch.countDown();}});}countDownLatch.await();long endTime = System.currentTimeMillis();log.info("执行完毕,{}-{}",startTime,endTime);executorService.shutdown();}private static void test(int num) throws InterruptedException {Thread.sleep(100);log.info("{}-{}", num,System.currentTimeMillis());Thread.sleep(100);}}
结果
10:56:02.544 [pool-1-thread-5] INFO AQSExample.CountDownLatchExampleTimeOutTest - 4-1559271362542
10:56:02.543 [pool-1-thread-2] INFO AQSExample.CountDownLatchExampleTimeOutTest - 1-1559271362541
10:56:02.548 [pool-1-thread-10] INFO AQSExample.CountDownLatchExampleTimeOutTest - 9-1559271362548
10:56:02.544 [pool-1-thread-7] INFO AQSExample.CountDownLatchExampleTimeOutTest - 6-1559271362543
10:56:02.543 [pool-1-thread-4] INFO AQSExample.CountDownLatchExampleTimeOutTest - 3-1559271362542
10:56:02.544 [pool-1-thread-3] INFO AQSExample.CountDownLatchExampleTimeOutTest - 2-1559271362541
10:56:02.544 [pool-1-thread-8] INFO AQSExample.CountDownLatchExampleTimeOutTest - 7-1559271362543
10:56:02.544 [pool-1-thread-6] INFO AQSExample.CountDownLatchExampleTimeOutTest - 5-1559271362543
10:56:02.543 [pool-1-thread-1] INFO AQSExample.CountDownLatchExampleTimeOutTest - 0-1559271362541
10:56:02.548 [pool-1-thread-9] INFO AQSExample.CountDownLatchExampleTimeOutTest - 8-1559271362548
10:56:02.548 [main] INFO AQSExample.CountDownLatchExampleTimeOutTest - 执行完毕,1559271362441-1559271362548
上述结果可以看到,所有的线程执行完毕后主线程才打印出“执行完毕”。
2.按照超时时间阻塞所有线程执行,到时间后直接释放。
如果我们设置超时时间之后
@Slf4jpublic class CountDownLatchExampleTimeOutTest {//线程数量private static final int THREAD_NUM = 10;// CountdownLatch阻塞模拟public static void main(String[] args) throws InterruptedException {// 创建线程池 用于执行线程ExecutorService executorService = Executors.newCachedThreadPool();//创建countDownLatchfinal CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);//循环创建线程long startTime = System.currentTimeMillis();for (int i = 0; i < THREAD_NUM; i++) {final int a = i;executorService.execute(() -> {try {test(a);} catch (Exception e) {log.error("Exception", e);} finally {countDownLatch.countDown();}});}countDownLatch.await(10,TimeUnit.MILLISECONDS);long endTime = System.currentTimeMillis();log.info("执行完毕,{}-{}",startTime,endTime);executorService.shutdown();}private static void test(int num) throws InterruptedException {Thread.sleep(50);log.info("{}-{}", num,System.currentTimeMillis());}}
由于每个线程延迟50毫秒之后再执行,count已经超时了所以优先打印出了执行完毕的结果。然后在继续执行线程中的内容。
结果
11:14:55.509 [main] INFO AQSExample.CountDownLatchExampleTimeOutTest - 执行完毕,1559272495373-1559272495506
11:14:55.542 [pool-1-thread-1] INFO AQSExample.CountDownLatchExampleTimeOutTest - 0-1559272495542
11:14:55.542 [pool-1-thread-2] INFO AQSExample.CountDownLatchExampleTimeOutTest - 1-1559272495542
11:14:55.543 [pool-1-thread-3] INFO AQSExample.CountDownLatchExampleTimeOutTest - 2-1559272495543
11:14:55.543 [pool-1-thread-4] INFO AQSExample.CountDownLatchExampleTimeOutTest - 3-1559272495543
11:14:55.543 [pool-1-thread-5] INFO AQSExample.CountDownLatchExampleTimeOutTest - 4-1559272495543
11:14:55.544 [pool-1-thread-6] INFO AQSExample.CountDownLatchExampleTimeOutTest - 5-1559272495544
11:14:55.544 [pool-1-thread-7] INFO AQSExample.CountDownLatchExampleTimeOutTest - 6-1559272495544
11:14:55.545 [pool-1-thread-9] INFO AQSExample.CountDownLatchExampleTimeOutTest - 8-1559272495545
11:14:55.545 [pool-1-thread-8] INFO AQSExample.CountDownLatchExampleTimeOutTest - 7-1559272495545
11:14:55.545 [pool-1-thread-10] INFO AQSExample.CountDownLatchExampleTimeOutTest - 9-1559272495545
CountDownLatch源码解析
CountDownLatch源码中的方法和属性并不多,下面我们来一一解析。
1.AQS框架以及构造方法
//当前对象中私有阻塞工具private final Sync sync;// 模板方法模式重写AQS工具private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;// 共享阻塞AQSSync(int count) {setState(count);}// 获取当前还剩多少资源可以使用int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}//构造方法创建一个锁对象public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}
2.countDown()方法解析
该方法用于线程执行完毕后减计统计数量,
// 该方法时释放一个共享锁。当所有锁都被释放完成后主线程就能继续执行了。public void countDown() {sync.releaseShared(1);}
3.await()方法解析
//拦截主线程的方法。主线程在这里等待条件达成后继续执行。public void await() throws InterruptedException {//在这里阻塞线程的执行sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//这里判断是否还有可以共享的资源// 如果有则返回-1 否则返回 1,重写AQS的方法参见(1.AQS框架以及构造方法)if (tryAcquireShared(arg) < 0)// 有资源则运行阻塞自旋等待所有线程执行完毕doAcquireSharedInterruptibly(arg);// 无资源可用就让线程继续执行}// 带延迟的减少数据拦截方法// 返回的结果是没有跑完全部线程就继续执行下一步了。public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {//线程如果被中断则抛出异常if (Thread.interrupted())throw new InterruptedException();// 表示如果线程被执行完了直接返回成功,如果没有执行完则看等待时间来决定是否要继续执行。return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}
CountDownLatch 总结
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。 在分散计算统一合成结果,按某个流程加载资源的方面有着非诚好用的效果。下一篇我们讲解像蓄水池一样功能的Semphore。
