什么是CyclicBarrier?
栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。
CyclicBarrier 和 CountDownLatch 确实有一定的相似性,它们都能阻塞一个或者一组线程,直到某种预定的条件达到之后,这些之前在等待的线程才会统一出发,继续向下执行。正因为它们有这个相似点,你可能会认为它们的作用是完全一样的,其实并不是。
CyclicBarrier 可以构造出一个集结点,当某一个线程执行 await() 的时候,它就会到这个集结点开始等待,等待这个栅栏被撤销。直到预定数量的线程都到了这个集结点之后,这个栅栏就会被撤销,之前等待的线程就在此刻统一出发,继续去执行剩下的任务。
在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,下面我们先看看CyclicBarrier有哪些成员变量。
官方释义:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released __
- > @since > _1.5 _> *
API解读:

//防护栅栏入口的锁,同步操作锁private final ReentrantLock lock = new ReentrantLock();//等待直到跳闸的条件,线程拦截器private final Condition trip = lock.newCondition();//所有线程数量,每次拦截的线程数private final int parties;//换闸换代前执行的任务private final Runnable barrierCommand;//表示栅栏的当前代private Generation generation = new Generation();//计数器private int count;//静态内部类Generation/*** 屏障的每次使用都表示为一个生成实例。每当隔离栅被触发或重置时,生成都会更改。* 使用屏障可以与线程相关联的世代很多-由于非确定性方式可以将锁分配给等待的线程-但一次只能激活其中之一*({@code count}适用于那一代,当前计数) ),* 其余的全部损坏或绊倒。如果有中断但没有后续的重置,则不需要活跃的生成。*/private static class Generation {boolean broken = false;}
方法
int await() 等待所有 parties已经在这个障碍上调用了 await 。
public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}
int await(long timeout, TimeUnit unit) 等待所有 parties已经在此屏障上调用 await ,或指定的等待时间过去。
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));}
int getNumberWaiting() 返回目前正在等待障碍的各方的数量。
public int getNumberWaiting() {final ReentrantLock lock = this.lock;lock.lock();try {return parties - count;} finally {lock.unlock();}}
int getParties() 返回旅行这个障碍所需的parties数量。
public int getParties() {return parties;}
boolean isBroken() 查询这个障碍是否处于破碎状态。
public boolean isBroken() {final ReentrantLock lock = this.lock;lock.lock();try {return generation.broken;} finally {lock.unlock();}}
void reset() 将屏障重置为初始状态。
public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {breakBarrier(); // break the current generationnextGeneration(); // start a new generation} finally {lock.unlock();}}
CyclicBarrier(int parties) 构造方法 指的是需要几个线程一起到达,才可以使所有线程取消等待
public CyclicBarrier(int parties) {this(parties, null);}
CyclicBarrier(int parties, Runnable barrierAction) 构造方法 额外指定了一个参数,用于在所有线程达到屏障时,优先执行 barrierAction。 ```java /**
- 创建一个新的 CyclicBarrier ,它将在给定数量的参与者(线程)等待它时将跳闸,
- 并在屏障被跳闸时执行给定的屏障动作,由最后一个进入屏障的线程执行
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
}if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
- dowait()```javaprivate int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {// 获取独占锁final ReentrantLock lock = this.lock;lock.lock();try {// 当前代final Generation g = generation;// 如果这代损坏了,抛出异常if (g.broken)throw new BrokenBarrierException();// 如果线程中断了,抛出异常if (Thread.interrupted()) {// 将损坏状态设置为true// 并通知其他阻塞在此栅栏上的线程breakBarrier();throw new InterruptedException();}// 获取下标int index = --count;// 如果是 0,说明最后一个线程调用了该方法if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;// 执行栅栏任务if (command != null)command.run();ranAction = true;// 更新一代,将count重置,将generation重置// 唤醒之前等待的线程nextGeneration();return 0;} finally {// 如果执行栅栏任务的时候失败了,就将损坏状态设置为trueif (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {// 如果没有时间限制,则直接等待,直到被唤醒if (!timed)trip.await();// 如果有时间限制,则等待指定时间else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// 当前代没有损坏if (g == generation && ! g.broken) {// 让栅栏失效breakBarrier();throw ie;} else {// 上面条件不满足,说明这个线程不是这代的// 就不会影响当前这代栅栏的执行,所以,就打个中断标记Thread.currentThread().interrupt();}}// 当有任何一个线程中断了,就会调用breakBarrier方法// 就会唤醒其他的线程,其他线程醒来后,也要抛出异常if (g.broken)throw new BrokenBarrierException();// g != generation表示正常换代了,返回当前线程所在栅栏的下标// 如果 g == generation,说明还没有换代,那为什么会醒了?// 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。// 正是因为这个原因,才需要generation来保证正确。if (g != generation)return index;// 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {// 释放独占锁lock.unlock();}}
breakBarrier()将当前的障碍生成设置为已破坏并唤醒所有人。仅在保持锁定状态下调用。
private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();}
nextGeneration()更新障碍旅行的状态并唤醒所有人。仅在锁定时调用
private void nextGeneration() {// signal completion of last generationtrip.signalAll();// set up next generationcount = parties;generation = new Generation();}
代码示例:
场景模拟:众所周知,我们去做过山车或者大摆锤等一些娱乐游戏,都要等到一定的人数,设备才会启动,我们假设必须要5个人才会启动设备,即玩一次最少要5个人起步,代码如下:
public static void main(String[] args) {CyclicBarrier cyclic = new CyclicBarrier(5);ExecutorService service = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {int num = i + 1;Runnable runnable = () -> {try{Thread.sleep((long)(Math.random() * 10000));System.out.println("第"+num+"个人前来坐过山车,开始等待其他人");cyclic.await();System.out.println("第"+num+"开始坐过山车");}catch (Exception e) {e.printStackTrace();}};service.submit(runnable);}service.shutdown();}
执行结果:
第9个人前来坐过山车,开始等待其他人 第8个人前来坐过山车,开始等待其他人 第5个人前来坐过山车,开始等待其他人 第3个人前来坐过山车,开始等待其他人
第4个人前来坐过山车,开始等待其他人
第4开始坐过山车
第9开始坐过山车
第5开始坐过山车
第8开始坐过山车
第3开始坐过山车
第1个人前来坐过山车,开始等待其他人
第2个人前来坐过山车,开始等待其他人
第7个人前来坐过山车,开始等待其他人
第10个人前来坐过山车,开始等待其他人
第6个人前来坐过山车,开始等待其他人
第6开始坐过山车
第1开始坐过山车
第7开始坐过山车
第2开始坐过山车
第10开始坐过山车


此时到了这里,相信有的杠精就开始抬杠了,假如人多的时候,需要排队,不可能来一个进一个,其他人等待吧,应该是排队的时候,一次进去几个,进行通知一下,每次进去多少人,其让人有序排队;在这我们就可以使用另一个构造函数了,代码如下:
public static void main(String[] args) {CyclicBarrier cyclic = new CyclicBarrier(5, () -> {try {System.out.println("一波完了,下一波进来吧!");} catch (Exception e) {e.printStackTrace();}});ExecutorService service = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {int num = i + 1;Runnable runnable = () -> {try {Thread.sleep((long)(Math.random() * 10000));System.out.println("第"+num+"个人排队坐过山车,等待放行。。。。。。");cyclic.await();System.out.println("第"+num+"开始坐过山车");}catch (Exception e) {e.printStackTrace();}};service.submit(runnable);}service.shutdown();}
执行结果:
第3个人排队坐过山车,等待放行。。。。。。
第7个人排队坐过山车,等待放行。。。。。。
第4个人排队坐过山车,等待放行。。。。。。
第8个人排队坐过山车,等待放行。。。。。。
第9个人排队坐过山车,等待放行。。。。。。
一波完了,下一波进来吧!
第9开始坐过山车
第7开始坐过山车
第3开始坐过山车
第8开始坐过山车
第4开始坐过山车
第1个人排队坐过山车,等待放行。。。。。。
第10个人排队坐过山车,等待放行。。。。。。
第5个人排队坐过山车,等待放行。。。。。。
第6个人排队坐过山车,等待放行。。。。。。
第2个人排队坐过山车,等待放行。。。。。。
一波完了,下一波进来吧!
第2开始坐过山车
第1开始坐过山车
第5开始坐过山车
第10开始坐过山车
第6开始坐过山车
总结 CyclicBarrier 和 CountDownLatch 的异同:
- 相同点:都能阻塞一个或一组线程,直到某个预设的条件达成发生,再统一出发。
- 但是它们也有很多不同点,具体如下。
- 作用对象不同:CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待数字倒数到 0,也就是说 CountDownLatch 作用于事件,但 CyclicBarrier 作用于线程;CountDownLatch 是在调用了 countDown 方法之后把数字倒数减 1,而 CyclicBarrier 是在某线程开始等待后把计数减 1。
- 可重用性不同:CountDownLatch 在倒数到 0 并且触发门闩打开后,就不能再次使用了,除非新建一个新的实例;而 CyclicBarrier 可以重复使用。CyclicBarrier 还可以随时调用 reset 方法进行重置,如果重置时有线程已经调用了 await 方法并开始等待,那么这些线程则会抛出 BrokenBarrierException 异常。
- 执行动作不同:CyclicBarrier 有执行动作 barrierAction,而 CountDownLatch 没这个功能。
