什么是CyclicBarrier?
栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。
CyclicBarrier 和 CountDownLatch 确实有一定的相似性,它们都能阻塞一个或者一组线程,直到某种预定的条件达到之后,这些之前在等待的线程才会统一出发,继续向下执行。正因为它们有这个相似点,你可能会认为它们的作用是完全一样的,其实并不是。
CyclicBarrier 可以构造出一个集结点,当某一个线程执行 await() 的时候,它就会到这个集结点开始等待,等待这个栅栏被撤销。直到预定数量的线程都到了这个集结点之后,这个栅栏就会被撤销,之前等待的线程就在此刻统一出发,继续去执行剩下的任务。
在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,下面我们先看看CyclicBarrier有哪些成员变量。
官方释义:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released __
- > @since > _1.5 _> *
API解读:
//防护栅栏入口的锁,同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//等待直到跳闸的条件,线程拦截器
private final Condition trip = lock.newCondition();
//所有线程数量,每次拦截的线程数
private final int parties;
//换闸换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;
//静态内部类Generation
/**
* 屏障的每次使用都表示为一个生成实例。每当隔离栅被触发或重置时,生成都会更改。
* 使用屏障可以与线程相关联的世代很多-由于非确定性方式可以将锁分配给等待的线程-但一次只能激活其中之一
*({@code count}适用于那一代,当前计数) ),
* 其余的全部损坏或绊倒。如果有中断但没有后续的重置,则不需要活跃的生成。
*/
private static class Generation {
boolean broken = false;
}
方法
int await() 等待所有 parties已经在这个障碍上调用了 await 。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
int await(long timeout, TimeUnit unit) 等待所有 parties已经在此屏障上调用 await ,或指定的等待时间过去。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
int getNumberWaiting() 返回目前正在等待障碍的各方的数量。
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
int getParties() 返回旅行这个障碍所需的parties数量。
public int getParties() {
return parties;
}
boolean isBroken() 查询这个障碍是否处于破碎状态。
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
void reset() 将屏障重置为初始状态。
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
CyclicBarrier(int parties) 构造方法 指的是需要几个线程一起到达,才可以使所有线程取消等待
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier(int parties, Runnable barrierAction) 构造方法 额外指定了一个参数,用于在所有线程达到屏障时,优先执行 barrierAction。 ```java /**
- 创建一个新的 CyclicBarrier ,它将在给定数量的参与者(线程)等待它时将跳闸,
- 并在屏障被跳闸时执行给定的屏障动作,由最后一个进入屏障的线程执行
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
}if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
- dowait()
```java
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 当前代
final Generation g = generation;
// 如果这代损坏了,抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 如果线程中断了,抛出异常
if (Thread.interrupted()) {
// 将损坏状态设置为true
// 并通知其他阻塞在此栅栏上的线程
breakBarrier();
throw new InterruptedException();
}
// 获取下标
int index = --count;
// 如果是 0,说明最后一个线程调用了该方法
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 执行栅栏任务
if (command != null)
command.run();
ranAction = true;
// 更新一代,将count重置,将generation重置
// 唤醒之前等待的线程
nextGeneration();
return 0;
} finally {
// 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 如果没有时间限制,则直接等待,直到被唤醒
if (!timed)
trip.await();
// 如果有时间限制,则等待指定时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 当前代没有损坏
if (g == generation && ! g.broken) {
// 让栅栏失效
breakBarrier();
throw ie;
} else {
// 上面条件不满足,说明这个线程不是这代的
// 就不会影响当前这代栅栏的执行,所以,就打个中断标记
Thread.currentThread().interrupt();
}
}
// 当有任何一个线程中断了,就会调用breakBarrier方法
// 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
if (g.broken)
throw new BrokenBarrierException();
// g != generation表示正常换代了,返回当前线程所在栅栏的下标
// 如果 g == generation,说明还没有换代,那为什么会醒了?
// 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
// 正是因为这个原因,才需要generation来保证正确。
if (g != generation)
return index;
// 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放独占锁
lock.unlock();
}
}
breakBarrier()将当前的障碍生成设置为已破坏并唤醒所有人。仅在保持锁定状态下调用。
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
nextGeneration()更新障碍旅行的状态并唤醒所有人。仅在锁定时调用
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
代码示例:
场景模拟:众所周知,我们去做过山车或者大摆锤等一些娱乐游戏,都要等到一定的人数,设备才会启动,我们假设必须要5个人才会启动设备,即玩一次最少要5个人起步,代码如下:
public static void main(String[] args) {
CyclicBarrier cyclic = new CyclicBarrier(5);
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
int num = i + 1;
Runnable runnable = () -> {
try{
Thread.sleep((long)(Math.random() * 10000));
System.out.println("第"+num+"个人前来坐过山车,开始等待其他人");
cyclic.await();
System.out.println("第"+num+"开始坐过山车");
}catch (Exception e) {
e.printStackTrace();
}
};
service.submit(runnable);
}
service.shutdown();
}
执行结果:
第9个人前来坐过山车,开始等待其他人 第8个人前来坐过山车,开始等待其他人 第5个人前来坐过山车,开始等待其他人 第3个人前来坐过山车,开始等待其他人
第4个人前来坐过山车,开始等待其他人
第4开始坐过山车
第9开始坐过山车
第5开始坐过山车
第8开始坐过山车
第3开始坐过山车
第1个人前来坐过山车,开始等待其他人
第2个人前来坐过山车,开始等待其他人
第7个人前来坐过山车,开始等待其他人
第10个人前来坐过山车,开始等待其他人
第6个人前来坐过山车,开始等待其他人
第6开始坐过山车
第1开始坐过山车
第7开始坐过山车
第2开始坐过山车
第10开始坐过山车
此时到了这里,相信有的杠精就开始抬杠了,假如人多的时候,需要排队,不可能来一个进一个,其他人等待吧,应该是排队的时候,一次进去几个,进行通知一下,每次进去多少人,其让人有序排队;在这我们就可以使用另一个构造函数了,代码如下:
public static void main(String[] args) {
CyclicBarrier cyclic = new CyclicBarrier(5, () -> {
try {
System.out.println("一波完了,下一波进来吧!");
} catch (Exception e) {
e.printStackTrace();
}
});
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
int num = i + 1;
Runnable runnable = () -> {
try {
Thread.sleep((long)(Math.random() * 10000));
System.out.println("第"+num+"个人排队坐过山车,等待放行。。。。。。");
cyclic.await();
System.out.println("第"+num+"开始坐过山车");
}catch (Exception e) {
e.printStackTrace();
}
};
service.submit(runnable);
}
service.shutdown();
}
执行结果:
第3个人排队坐过山车,等待放行。。。。。。
第7个人排队坐过山车,等待放行。。。。。。
第4个人排队坐过山车,等待放行。。。。。。
第8个人排队坐过山车,等待放行。。。。。。
第9个人排队坐过山车,等待放行。。。。。。
一波完了,下一波进来吧!
第9开始坐过山车
第7开始坐过山车
第3开始坐过山车
第8开始坐过山车
第4开始坐过山车
第1个人排队坐过山车,等待放行。。。。。。
第10个人排队坐过山车,等待放行。。。。。。
第5个人排队坐过山车,等待放行。。。。。。
第6个人排队坐过山车,等待放行。。。。。。
第2个人排队坐过山车,等待放行。。。。。。
一波完了,下一波进来吧!
第2开始坐过山车
第1开始坐过山车
第5开始坐过山车
第10开始坐过山车
第6开始坐过山车
总结 CyclicBarrier 和 CountDownLatch 的异同:
- 相同点:都能阻塞一个或一组线程,直到某个预设的条件达成发生,再统一出发。
- 但是它们也有很多不同点,具体如下。
- 作用对象不同:CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待数字倒数到 0,也就是说 CountDownLatch 作用于事件,但 CyclicBarrier 作用于线程;CountDownLatch 是在调用了 countDown 方法之后把数字倒数减 1,而 CyclicBarrier 是在某线程开始等待后把计数减 1。
- 可重用性不同:CountDownLatch 在倒数到 0 并且触发门闩打开后,就不能再次使用了,除非新建一个新的实例;而 CyclicBarrier 可以重复使用。CyclicBarrier 还可以随时调用 reset 方法进行重置,如果重置时有线程已经调用了 await 方法并开始等待,那么这些线程则会抛出 BrokenBarrierException 异常。
- 执行动作不同:CyclicBarrier 有执行动作 barrierAction,而 CountDownLatch 没这个功能。