概述
Cyclic
意为循环的,Barrier
意为栅栏。CyclicBarrier 则意为周期性的栅栏。与 CountDownLatch 相比,对象是可以重复使用的。
底层通过组合 ReentrantLock 和 Condition 以实现周期性栅栏功能。CountDownLatch
基于 AQS 的共享模式实现,而 CyclicBarrier
基于 Condition
实现。基本流程如下:
源码分析
内部类 Generation
// java.util.concurrent.CyclicBarrier.Generation
/**
* CyclicBarrier 是可以重复使用的,我们把每次从开始使用到所有线程"穿过"栅栏看成一代,或一个周期
*
*/
private static class Generation {
boolean broken = false;
}
重要的变量
// 栅栏入口的需要从这里获得锁
private final ReentrantLock lock = new ReentrantLock();
// 这是 CyclicBarrier 的条件,只有当所有线程到达栅栏这一条件满足后,才会打开栅栏,
// 所有线程继续执行
private final Condition trip = lock.newCondition();
// 参与的线程数,由构造函数设置
private final int parties;
// 当打开栅栏后,由最后一个到达的线程执行里面的代码
// 比如可以合并之前线程的工作等等
private final Runnable barrierCommand;
// 当前线程所处的"代"或周期
private Generation generation = new Generation();
// 还没有到达栅栏的线程数。即还需要等待多少个线程到达栅栏。
private int count;
开启新的周期:nextGeneration
当最后一个线程到达栅栏时,调用这个方法唤醒其它线程,同时创建并初始化一个新的 Generation 对象,表示进入下一个新的周期。
// java.util.concurrent.CyclicBarrier#nextGeneration
private void nextGeneration() {
// #1 唤醒所有阻塞在 await() 方法的线程
trip.signalAll();
// #2 创建一个新的周期
count = parties;
generation = new Generation();
}
打破栅栏:breakBarrier
- 仅修改当前 Generation 对象的 broken 变量为 true,并不会创建一个新的 Generation 对象。
- 重置 count。
唤醒所有阻塞在
await()
方法调用的线程private void breakBarrier() { // #1 将当前周期的Generation对象设置为true generation.broken = true; // #2 重置 count 为初始值 parties count = parties; // #3 唤醒所有阻塞在 await() 方法调用的线程 trip.signalAll(); }
线程等待:await
// java.util.concurrent.CyclicBarrier#await() /** *不带超时机制的等待,实际上是调用 dowait() 方法 */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
dowait
// java.util.concurrent.CyclicBarrier#dowait private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // #1 首先获取锁 lock.lock(); try { final Generation g = generation; // #2 检查栅栏是否被打破,如果被打破,抛出异常 if (g.broken) throw new BrokenBarrierException(); // #3 检查当前线程中断标志位,如果线程被中断,打破栅栏,并抛出异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // #4 index 是这个方法的返回值,它的值是 count-- 得到的 int index = --count; // #5 如果等于0,说明所有线程已到达栅栏,可以打开栅栏 if (index == 0) { boolean ranAction = false; try { // #6 这里会执行构造参数传入的command,由最后一个线程先执行 // 因为这个线程没有被阻塞,所以可以快速执行完,免去唤醒动作 final Runnable command = barrierCommand; if (command != null) command.run(); // #7 command 可能出现异常,如果 ranAction 为 false 说明发生了异常 ranAction = true; // #8 唤醒所有线程,并创建一个新的 Generation 对象 nextGeneration(); // #9 返回0 return 0; } finally { // #8 如果执行 command 过程中出现异常,那么需要打破栅栏 if (!ranAction) breakBarrier(); } } // #10 死循环,线程执行到这里,意味着不是当前线程并非最后一个线程到达 // 所以要么阻塞等待,要么被中断、要么超时退出 for (;;) { try { // #11 如果有超时机制,调用带超时的Condition的await方法进行等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // #12 在调用 await() 期间被中断了 // Generation并没有重新创建且参数broken为false if (g == generation && ! g.broken) { // #13 撕破栅栏 breakBarrier(); // #14 抛出异常给上层调用的方法 throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. // #15 到这里,g != generation,说明已经生成了一个新的周期,即最后一个 // 线程调用了nextGeneration()方法。此刻就没必要抛出异常,记录下来这个中断信息 // 执行下一轮时会再判断该线程中断标志位,并做合理处理 Thread.currentThread().interrupt(); } } // #16 线程被唤醒,检查栅栏是否被打破 // 主要是在执行 command 过程中出现异常,它会调用 breakBarrier() // 方法打破栅栏:它不会创建一个新的Generation对象,仅重置count和修改旧的Generation#broken 为true // 然后唤醒所有线程。线程被唤醒后,执行到这里,然后抛出异常 if (g.broken) // #17 如果被打破,抛出异常 throw new BrokenBarrierException(); // #17 这里属于正常退出逻辑。 // 在 for(;;) 循环里被唤醒的线程,除了被中断或超时唤醒外,正常情况是最后一个线程 // 唤醒所有阻塞在 await() 方法调用的线程,并创建一个新的周期Generation对象 // 所以正常情况下,这里是不相等的。 if (g != generation) return index; // #18 这个 if 语句是对超时唤醒的。 // 当线程由于超时而被唤醒后,执行到这里,打破栅栏,抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // #19 释放锁 lock.unlock(); } }
获取多少线程已到达栅栏
首先获得锁。
再用
parties - count
就得到线程到达数量。public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
重置栅栏
栅栏是可以手动重置的:
先打破栅栏:① 所有阻塞于
await()
方法调用的线程都会被唤醒。② 抛出BrokerBarrierException
并返回。- 再生成一个新的 Generation 对象:① 重置 count 和 Generation 对象。
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }