一、 CyclicBarrier 的介绍
1.1 介绍
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(common barrier point)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
1.2 CountDownLatch 与 CyclicBarrier 的区别?
- CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
- CountDownLatch 内部自行采用 AQS实现的共享锁 ;而 CyclicBarrier内部采用 可重入锁 ReentrantLock 和Condition
1.3 CyclicBarrier 的API
//构造方法
//创建一个新的CyclicBarrier, 当给定数量parties的线程全部到达Barrier时
//Barrier(栏栅)将会被绊倒(放行全部线程执行), 而预定义执行线程为空
public CyclicBarrier(int parties);
//创建一个新的CyclicBarrier, 当给定数量parties的线程全部到达Barrier时
//Barrier(栏栅)将会被绊倒(放行全部线程执行),
//而当/Barrier(栏栅)将会被绊倒后, 预定义执行线程barrierAction将会执行
public CyclicBarrier(int parties, Runnable barrierAction);
//返回要求启动此 barrier 的参与者数目。
public int getParties();
//在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
public int await();
//在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
public int await(long timeout, TimeUnit unit);
//查询此屏障是否处于损坏状态。
public boolean isBroken() ;
public void reset();
//返回当前在屏障处等待的参与者数目。
public int getNumberWaiting();
二、 CyclicBarrier 的内部结构
CyclicBarrier是包含了”ReentrantLock对象lock(不公平锁)”和”Condition对象trip”,它是通过独占锁实现的。
三、CyclicBarrier 源码解析
3.1 成员变量
//屏障 可重入锁
private final ReentrantLock lock = new ReentrantLock();
//条件等待,直到屏障被绊倒
private final Condition trip = lock.newCondition();
//启动屏障的数量
private final int parties;
//屏障启动时执行的线程
private final Runnable barrierCommand;
//当前一代
//屏障每次被绊倒,都会改变generation
private Generation generation = new Generation();
//返回当前在屏障处等待的参与者数目。
private int count;
3.2 构造方法
public CyclicBarrier(int parties) {
this(parties, null);
}
//parties:在屏障处等待的参与者数目
//barrierAction: 在屏障被绊倒时,执行的预定义操作
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
//返回当前在屏障处等待的参与者数目。
this.count = parties;
this.barrierCommand = barrierAction;
}
3.3 await 方法
await 方法图示
源码解析
//await()是通过dowait()实现的。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//获取独占锁
lock.lock();
try {
//保存当前generation
final Generation g = generation;
//判断当前栏栅是否 ”被损坏“
if (g.broken)
throw new BrokenBarrierException();
//判断当前线程是否被打断
if (Thread.interrupted()) {
// 则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
breakBarrier();
throw new InterruptedException();
}
//当前在屏障处等待的参与者数目 -1
int index = --count;
//如果 当前在屏障处等待的参与者数目等于 0
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//预定义操作不为空,则执行
if (command != null)
command.run();
ranAction = true;
// 唤醒所有等待线程,并更新generation。
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 当前线程一直阻塞,直到以下情况
//1. “有parties个线程到达barrier”
//2. “当前线程 或等待线程 被中断”
//3. “超时等待”
//4. CyclicBarrier 被重置
// 当前线程才继续执行。
for (;;) {
try {
// 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果等待过程中,线程被中断,则执行下面的函数。
if (g == generation && ! g.broken) {
//breakBarrier 主要设置当前CyclicBarrier为 broken
//唤醒所有等待线程
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 如果“当前generation已经损坏”,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();
// 如果“generation已经换代”,则返回index。(CyclicBarrier被重置)
if (g != generation)
return index;
// 如果是“超时等待”,并且时间已到,
//则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
}
四、CyclicBarrier 的使用示例
4.1 用于多线程计算数据,最后合并计算结果的场景
//结果
ConcurrentHashMap<String, Integer> result = new ConcurrentHashMap<>();
//屏障
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
int total = 0;
//撤除屏障,汇总结果
for (Map.Entry<String, Integer> entry : result.entrySet()) {
total += entry.getValue();
}
System.out.println("总结果==>" + total);
}
});
//创建4个线程计算
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
//计算
result.put(Thread.currentThread().getName(), 1);
System.out.println(Thread.currentThread().getName() + "计算完成, 等待");
//计算完成,插入屏障
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
- CountDownLatch 版
```java
ConcurrentHashMap
result = new ConcurrentHashMap<>();
CountDownLatch countDownLatch = new CountDownLatch(4); //创建4个线程计算 for (int i = 0; i < 4; i++) { new Thread(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(new Random().nextInt(5)); //计算 result.put(Thread.currentThread().getName(), 1); System.out.println(Thread.currentThread().getName() + “计算完成, 等待”);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
} new Thread(new Runnable() { @Override public void run() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); }
int total = 0;
//撤除屏障,汇总结果
for (Map.Entry<String, Integer> entry : result.entrySet()) {
total += entry.getValue();
}
System.out.println("总结果==>" + total);
}
参考
- https://www.cnblogs.com/skywang12345/p/3533995.html
- 《Java 并发编程的艺术》