什么是CyclicBarrier?

栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

CyclicBarrier字面意思是“可重复使用的栅栏”,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。

CyclicBarrier 和 CountDownLatch 确实有一定的相似性,它们都能阻塞一个或者一组线程,直到某种预定的条件达到之后,这些之前在等待的线程才会统一出发,继续向下执行。正因为它们有这个相似点,你可能会认为它们的作用是完全一样的,其实并不是。

CyclicBarrier 可以构造出一个集结点,当某一个线程执行 await() 的时候,它就会到这个集结点开始等待,等待这个栅栏被撤销。直到预定数量的线程都到了这个集结点之后,这个栅栏就会被撤销,之前等待的线程就在此刻统一出发,继续去执行剩下的任务。

在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。这就是实现一组线程相互等待的原理,下面我们先看看CyclicBarrier有哪些成员变量。

官方释义:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released __

  • > @since > _1.5 _> *

API解读:

image.png

  1. //防护栅栏入口的锁,同步操作锁
  2. private final ReentrantLock lock = new ReentrantLock();
  3. //等待直到跳闸的条件,线程拦截器
  4. private final Condition trip = lock.newCondition();
  5. //所有线程数量,每次拦截的线程数
  6. private final int parties;
  7. //换闸换代前执行的任务
  8. private final Runnable barrierCommand;
  9. //表示栅栏的当前代
  10. private Generation generation = new Generation();
  11. //计数器
  12. private int count;
  13. //静态内部类Generation
  14. /**
  15. * 屏障的每次使用都表示为一个生成实例。每当隔离栅被触发或重置时,生成都会更改。
  16. * 使用屏障可以与线程相关联的世代很多-由于非确定性方式可以将锁分配给等待的线程-但一次只能激活其中之一
  17. *({@code count}适用于那一代,当前计数) ),
  18. * 其余的全部损坏或绊倒。如果有中断但没有后续的重置,则不需要活跃的生成。
  19. */
  20. private static class Generation {
  21. boolean broken = false;
  22. }

方法

  • int await() 等待所有 parties已经在这个障碍上调用了 await 。

    1. public int await() throws InterruptedException, BrokenBarrierException {
    2. try {
    3. return dowait(false, 0L);
    4. } catch (TimeoutException toe) {
    5. throw new Error(toe); // cannot happen
    6. }
    7. }
  • int await(long timeout, TimeUnit unit) 等待所有 parties已经在此屏障上调用 await ,或指定的等待时间过去。

    1. public int await(long timeout, TimeUnit unit)
    2. throws InterruptedException,
    3. BrokenBarrierException,
    4. TimeoutException {
    5. return dowait(true, unit.toNanos(timeout));
    6. }
  • int getNumberWaiting() 返回目前正在等待障碍的各方的数量。

    1. public int getNumberWaiting() {
    2. final ReentrantLock lock = this.lock;
    3. lock.lock();
    4. try {
    5. return parties - count;
    6. } finally {
    7. lock.unlock();
    8. }
    9. }
  • int getParties() 返回旅行这个障碍所需的parties数量。

    1. public int getParties() {
    2. return parties;
    3. }
  • boolean isBroken() 查询这个障碍是否处于破碎状态。

    1. public boolean isBroken() {
    2. final ReentrantLock lock = this.lock;
    3. lock.lock();
    4. try {
    5. return generation.broken;
    6. } finally {
    7. lock.unlock();
    8. }
    9. }
  • void reset() 将屏障重置为初始状态。

    1. public void reset() {
    2. final ReentrantLock lock = this.lock;
    3. lock.lock();
    4. try {
    5. breakBarrier(); // break the current generation
    6. nextGeneration(); // start a new generation
    7. } finally {
    8. lock.unlock();
    9. }
    10. }
  • CyclicBarrier(int parties) 构造方法 指的是需要几个线程一起到达,才可以使所有线程取消等待

    1. public CyclicBarrier(int parties) {
    2. this(parties, null);
    3. }
  • CyclicBarrier(int parties, Runnable barrierAction) 构造方法 额外指定了一个参数,用于在所有线程达到屏障时,优先执行 barrierAction。 ```java /**

    • 创建一个新的 CyclicBarrier ,它将在给定数量的参与者(线程)等待它时将跳闸,
    • 并在屏障被跳闸时执行给定的屏障动作,由最后一个进入屏障的线程执行 */ public CyclicBarrier(int parties, Runnable barrierAction) {
      1. if (parties <= 0) throw new IllegalArgumentException();
      2. this.parties = parties;
      3. this.count = parties;
      4. this.barrierCommand = barrierAction;
      }
  1. - dowait()
  2. ```java
  3. private int dowait(boolean timed, long nanos)
  4. throws InterruptedException, BrokenBarrierException,
  5. TimeoutException {
  6. // 获取独占锁
  7. final ReentrantLock lock = this.lock;
  8. lock.lock();
  9. try {
  10. // 当前代
  11. final Generation g = generation;
  12. // 如果这代损坏了,抛出异常
  13. if (g.broken)
  14. throw new BrokenBarrierException();
  15. // 如果线程中断了,抛出异常
  16. if (Thread.interrupted()) {
  17. // 将损坏状态设置为true
  18. // 并通知其他阻塞在此栅栏上的线程
  19. breakBarrier();
  20. throw new InterruptedException();
  21. }
  22. // 获取下标
  23. int index = --count;
  24. // 如果是 0,说明最后一个线程调用了该方法
  25. if (index == 0) { // tripped
  26. boolean ranAction = false;
  27. try {
  28. final Runnable command = barrierCommand;
  29. // 执行栅栏任务
  30. if (command != null)
  31. command.run();
  32. ranAction = true;
  33. // 更新一代,将count重置,将generation重置
  34. // 唤醒之前等待的线程
  35. nextGeneration();
  36. return 0;
  37. } finally {
  38. // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
  39. if (!ranAction)
  40. breakBarrier();
  41. }
  42. }
  43. // loop until tripped, broken, interrupted, or timed out
  44. for (;;) {
  45. try {
  46. // 如果没有时间限制,则直接等待,直到被唤醒
  47. if (!timed)
  48. trip.await();
  49. // 如果有时间限制,则等待指定时间
  50. else if (nanos > 0L)
  51. nanos = trip.awaitNanos(nanos);
  52. } catch (InterruptedException ie) {
  53. // 当前代没有损坏
  54. if (g == generation && ! g.broken) {
  55. // 让栅栏失效
  56. breakBarrier();
  57. throw ie;
  58. } else {
  59. // 上面条件不满足,说明这个线程不是这代的
  60. // 就不会影响当前这代栅栏的执行,所以,就打个中断标记
  61. Thread.currentThread().interrupt();
  62. }
  63. }
  64. // 当有任何一个线程中断了,就会调用breakBarrier方法
  65. // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
  66. if (g.broken)
  67. throw new BrokenBarrierException();
  68. // g != generation表示正常换代了,返回当前线程所在栅栏的下标
  69. // 如果 g == generation,说明还没有换代,那为什么会醒了?
  70. // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
  71. // 正是因为这个原因,才需要generation来保证正确。
  72. if (g != generation)
  73. return index;
  74. // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
  75. if (timed && nanos <= 0L) {
  76. breakBarrier();
  77. throw new TimeoutException();
  78. }
  79. }
  80. } finally {
  81. // 释放独占锁
  82. lock.unlock();
  83. }
  84. }
  • breakBarrier()将当前的障碍生成设置为已破坏并唤醒所有人。仅在保持锁定状态下调用。

    1. private void breakBarrier() {
    2. generation.broken = true;
    3. count = parties;
    4. trip.signalAll();
    5. }
  • nextGeneration()更新障碍旅行的状态并唤醒所有人。仅在锁定时调用

    1. private void nextGeneration() {
    2. // signal completion of last generation
    3. trip.signalAll();
    4. // set up next generation
    5. count = parties;
    6. generation = new Generation();
    7. }

代码示例:

场景模拟:众所周知,我们去做过山车或者大摆锤等一些娱乐游戏,都要等到一定的人数,设备才会启动,我们假设必须要5个人才会启动设备,即玩一次最少要5个人起步,代码如下:

  1. public static void main(String[] args) {
  2. CyclicBarrier cyclic = new CyclicBarrier(5);
  3. ExecutorService service = Executors.newFixedThreadPool(10);
  4. for (int i = 0; i < 10; i++) {
  5. int num = i + 1;
  6. Runnable runnable = () -> {
  7. try{
  8. Thread.sleep((long)(Math.random() * 10000));
  9. System.out.println("第"+num+"个人前来坐过山车,开始等待其他人");
  10. cyclic.await();
  11. System.out.println("第"+num+"开始坐过山车");
  12. }catch (Exception e) {
  13. e.printStackTrace();
  14. }
  15. };
  16. service.submit(runnable);
  17. }
  18. service.shutdown();
  19. }

执行结果:

第9个人前来坐过山车,开始等待其他人 第8个人前来坐过山车,开始等待其他人 第5个人前来坐过山车,开始等待其他人 第3个人前来坐过山车,开始等待其他人

第4个人前来坐过山车,开始等待其他人

第4开始坐过山车

第9开始坐过山车

第5开始坐过山车

第8开始坐过山车

第3开始坐过山车

第1个人前来坐过山车,开始等待其他人

第2个人前来坐过山车,开始等待其他人

第7个人前来坐过山车,开始等待其他人

第10个人前来坐过山车,开始等待其他人

第6个人前来坐过山车,开始等待其他人

第6开始坐过山车

第1开始坐过山车

第7开始坐过山车

第2开始坐过山车

第10开始坐过山车

image.png
image.png
此时到了这里,相信有的杠精就开始抬杠了,假如人多的时候,需要排队,不可能来一个进一个,其他人等待吧,应该是排队的时候,一次进去几个,进行通知一下,每次进去多少人,其让人有序排队;在这我们就可以使用另一个构造函数了,代码如下:

  1. public static void main(String[] args) {
  2. CyclicBarrier cyclic = new CyclicBarrier(5, () -> {
  3. try {
  4. System.out.println("一波完了,下一波进来吧!");
  5. } catch (Exception e) {
  6. e.printStackTrace();
  7. }
  8. });
  9. ExecutorService service = Executors.newFixedThreadPool(10);
  10. for (int i = 0; i < 10; i++) {
  11. int num = i + 1;
  12. Runnable runnable = () -> {
  13. try {
  14. Thread.sleep((long)(Math.random() * 10000));
  15. System.out.println("第"+num+"个人排队坐过山车,等待放行。。。。。。");
  16. cyclic.await();
  17. System.out.println("第"+num+"开始坐过山车");
  18. }catch (Exception e) {
  19. e.printStackTrace();
  20. }
  21. };
  22. service.submit(runnable);
  23. }
  24. service.shutdown();
  25. }

执行结果:

第3个人排队坐过山车,等待放行。。。。。。

第7个人排队坐过山车,等待放行。。。。。。

第4个人排队坐过山车,等待放行。。。。。。

第8个人排队坐过山车,等待放行。。。。。。

第9个人排队坐过山车,等待放行。。。。。。

一波完了,下一波进来吧!

第9开始坐过山车

第7开始坐过山车

第3开始坐过山车

第8开始坐过山车

第4开始坐过山车

第1个人排队坐过山车,等待放行。。。。。。

第10个人排队坐过山车,等待放行。。。。。。

第5个人排队坐过山车,等待放行。。。。。。

第6个人排队坐过山车,等待放行。。。。。。

第2个人排队坐过山车,等待放行。。。。。。

一波完了,下一波进来吧!

第2开始坐过山车

第1开始坐过山车

第5开始坐过山车

第10开始坐过山车

第6开始坐过山车

总结 CyclicBarrier 和 CountDownLatch 的异同:

  • 相同点:都能阻塞一个或一组线程,直到某个预设的条件达成发生,再统一出发。
  • 但是它们也有很多不同点,具体如下。
    • 作用对象不同:CyclicBarrier 要等固定数量的线程都到达了栅栏位置才能继续执行,而 CountDownLatch 只需等待数字倒数到 0,也就是说 CountDownLatch 作用于事件,但 CyclicBarrier 作用于线程;CountDownLatch 是在调用了 countDown 方法之后把数字倒数减 1,而 CyclicBarrier 是在某线程开始等待后把计数减 1。
    • 可重用性不同:CountDownLatch 在倒数到 0 并且触发门闩打开后,就不能再次使用了,除非新建一个新的实例;而 CyclicBarrier 可以重复使用。CyclicBarrier 还可以随时调用 reset 方法进行重置,如果重置时有线程已经调用了 await 方法并开始等待,那么这些线程则会抛出 BrokenBarrierException 异常。
    • 执行动作不同:CyclicBarrier 有执行动作 barrierAction,而 CountDownLatch 没这个功能。