一、 CyclicBarrier 的介绍
1.1 介绍
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(common barrier point)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
1.2 CountDownLatch 与 CyclicBarrier 的区别?
- CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
- CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
- CountDownLatch 内部自行采用 AQS实现的共享锁 ;而 CyclicBarrier内部采用 可重入锁 ReentrantLock 和Condition
1.3 CyclicBarrier 的API
//构造方法//创建一个新的CyclicBarrier, 当给定数量parties的线程全部到达Barrier时//Barrier(栏栅)将会被绊倒(放行全部线程执行), 而预定义执行线程为空public CyclicBarrier(int parties);//创建一个新的CyclicBarrier, 当给定数量parties的线程全部到达Barrier时//Barrier(栏栅)将会被绊倒(放行全部线程执行),//而当/Barrier(栏栅)将会被绊倒后, 预定义执行线程barrierAction将会执行public CyclicBarrier(int parties, Runnable barrierAction);//返回要求启动此 barrier 的参与者数目。public int getParties();//在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。public int await();//在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。public int await(long timeout, TimeUnit unit);//查询此屏障是否处于损坏状态。public boolean isBroken() ;public void reset();//返回当前在屏障处等待的参与者数目。public int getNumberWaiting();
二、 CyclicBarrier 的内部结构

CyclicBarrier是包含了”ReentrantLock对象lock(不公平锁)”和”Condition对象trip”,它是通过独占锁实现的。
三、CyclicBarrier 源码解析
3.1 成员变量
//屏障 可重入锁private final ReentrantLock lock = new ReentrantLock();//条件等待,直到屏障被绊倒private final Condition trip = lock.newCondition();//启动屏障的数量private final int parties;//屏障启动时执行的线程private final Runnable barrierCommand;//当前一代//屏障每次被绊倒,都会改变generationprivate Generation generation = new Generation();//返回当前在屏障处等待的参与者数目。private int count;
3.2 构造方法
public CyclicBarrier(int parties) {this(parties, null);}//parties:在屏障处等待的参与者数目//barrierAction: 在屏障被绊倒时,执行的预定义操作public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;//返回当前在屏障处等待的参与者数目。this.count = parties;this.barrierCommand = barrierAction;}
3.3 await 方法
await 方法图示
源码解析
//await()是通过dowait()实现的。private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;//获取独占锁lock.lock();try {//保存当前generationfinal Generation g = generation;//判断当前栏栅是否 ”被损坏“if (g.broken)throw new BrokenBarrierException();//判断当前线程是否被打断if (Thread.interrupted()) {// 则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。breakBarrier();throw new InterruptedException();}//当前在屏障处等待的参与者数目 -1int index = --count;//如果 当前在屏障处等待的参与者数目等于 0if (index == 0) { // trippedboolean ranAction = false;try {final Runnable command = barrierCommand;//预定义操作不为空,则执行if (command != null)command.run();ranAction = true;// 唤醒所有等待线程,并更新generation。nextGeneration();return 0;} finally {if (!ranAction)breakBarrier();}}// loop until tripped, broken, interrupted, or timed out// 当前线程一直阻塞,直到以下情况//1. “有parties个线程到达barrier”//2. “当前线程 或等待线程 被中断”//3. “超时等待”//4. CyclicBarrier 被重置// 当前线程才继续执行。for (;;) {try {// 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// 如果等待过程中,线程被中断,则执行下面的函数。if (g == generation && ! g.broken) {//breakBarrier 主要设置当前CyclicBarrier为 broken//唤醒所有等待线程breakBarrier();throw ie;} else {// We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}// 如果“当前generation已经损坏”,则抛出异常。if (g.broken)throw new BrokenBarrierException();// 如果“generation已经换代”,则返回index。(CyclicBarrier被重置)if (g != generation)return index;// 如果是“超时等待”,并且时间已到,//则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {//释放锁lock.unlock();}}
四、CyclicBarrier 的使用示例
4.1 用于多线程计算数据,最后合并计算结果的场景
//结果ConcurrentHashMap<String, Integer> result = new ConcurrentHashMap<>();//屏障CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {@Overridepublic void run() {int total = 0;//撤除屏障,汇总结果for (Map.Entry<String, Integer> entry : result.entrySet()) {total += entry.getValue();}System.out.println("总结果==>" + total);}});//创建4个线程计算for (int i = 0; i < 4; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(new Random().nextInt(5));//计算result.put(Thread.currentThread().getName(), 1);System.out.println(Thread.currentThread().getName() + "计算完成, 等待");//计算完成,插入屏障cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}).start();}
- CountDownLatch 版
```java
ConcurrentHashMap
result = new ConcurrentHashMap<>();
CountDownLatch countDownLatch = new CountDownLatch(4); //创建4个线程计算 for (int i = 0; i < 4; i++) { new Thread(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(new Random().nextInt(5)); //计算 result.put(Thread.currentThread().getName(), 1); System.out.println(Thread.currentThread().getName() + “计算完成, 等待”);
countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}}).start();
} new Thread(new Runnable() { @Override public void run() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); }
int total = 0;//撤除屏障,汇总结果for (Map.Entry<String, Integer> entry : result.entrySet()) {total += entry.getValue();}System.out.println("总结果==>" + total);}
参考
- https://www.cnblogs.com/skywang12345/p/3533995.html
- 《Java 并发编程的艺术》
