1 概述
1.1 定义
从字面意思理解,CyclicBarrier 是回环屏障的意思 ,它可以让一组线程全部达到一个状态后再全部同时执行,这里之所以叫作回环是因为当所 等待线程执行完毕,并重置 CyclicBarrier 的状态后它可以被重用。之所以 叫作屏障是因为线程调用 await 方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了 await 方法后,线程 就会冲破屏障,继续 下运行。
1.2 示例
1.2.1 简要示例
public class CyclicBarrierTest {
private static CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
//所有子线程执行完毕后再执行的任务
System.out.println("主线程任务...");
}
});
public static void main(String[] args) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
ExecutorService service = new ThreadPoolExecutor(4, 5, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
service.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("task1阻塞前:" + barrier.getNumberWaiting());
Thread.sleep(3000);
barrier.await();
System.out.println("task1阻塞后:" + barrier.getNumberWaiting());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
service.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("task2阻塞前:" + barrier.getNumberWaiting());
barrier.await();
System.out.println("task2阻塞后:" + barrier.getNumberWaiting());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
service.shutdown();
}
}
输出:
其第一 数为计数器初始值,第二个参数Runable是当计数器值为0时需要执行的任务。在main 函数里面首先创建了 1个大小 为2的线程池,然后添加两个子任 程池, 任务在执行 自己的逻辑后会调 用await 方法。一开始计数器值为 当第一个线程调用 awai 方法时,计数器值会递减 为1。由于此时计数器值不为0 ,所以 前线程就到了屏障点而被阻塞 然后第2个线程调用 await时,会进入 障,计数器值也会递减,现在计数器值为0这时就会去执行CyclicBanier 造函数中的任务,执行完毕后退出屏障点,并且唤醒被阻塞的第2个线程,这时候第1个线程也会退出屏障点继续向下运行。
1.2.2 复杂示例
假设一个任务由阶段一、阶段二和阶段三组成,每个线程要串行地执行阶段一、阶段二和阶段三,当多个线程执行该任务时,必须要保证所有线程的阶段一全部完成后才能进入阶段二执行, 当所有线程的阶段三全部完成后才能进入阶段3执行 下面使用
CyclicBanier 成这个需求
public class CyclicBarrierTest2 {
private static CyclicBarrier barrier = new CyclicBarrier(2);
public static void main(String[] args) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
ExecutorService service =
new ThreadPoolExecutor(5, 8, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 3; i++) {
int finalI = i;
service.submit(new Runnable() {
@SneakyThrows
@Override
public void run() {
System.out.println("线程池" + finalI + " begin:>>>" + Thread.currentThread().getName() + " number:" + barrier.getNumberWaiting() + " ;Parties:" + barrier.getParties());
barrier.await();
System.out.println("线程池" + finalI + " wait:>>>" + Thread.currentThread().getName() + " number:" + barrier.getNumberWaiting() + " ;Parties:" + barrier.getParties());
barrier.await();
System.out.println("线程池" + finalI + " after:>>>" + Thread.currentThread().getName() + " number:" + barrier.getNumberWaiting() + " ;Parties:" + barrier.getParties());
}
});
}
service.shutdown();
}
}
输出:
线程池0 begin:>>>demo-pool-0 number:0 ;Parties:2
线程池1 begin:>>>demo-pool-1 number:1 ;Parties:2
线程池1 wait:>>>demo-pool-1 number:0 ;Parties:2
线程池2 begin:>>>demo-pool-2 number:1 ;Parties:2
线程池0 wait:>>>demo-pool-0 number:1 ;Parties:2
线程池1 after:>>>demo-pool-1 number:1 ;Parties:2
线程池2 wait:>>>demo-pool-2 number:1 ;Parties:2
线程池2 after:>>>demo-pool-2 number:0 ;Parties:2
线程池0 after:>>>demo-pool-0 number:0 ;Parties:2