一,使用

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 当前线程已经到达了屏障,然后当前线程被阻塞。

  1. public class CyclicBarrierTest01 {
  2. /**
  3. * 案例:
  4. * 模拟过气游戏 “王者荣耀” 游戏开始逻辑
  5. */
  6. public static void main(String[] args) {
  7. //第一步:定义玩家,定义5个
  8. String[] heros = {"安琪拉","亚瑟","马超","张飞", "刘备"};
  9. //第二步:创建固定线程数量的线程池,线程数量为5
  10. ExecutorService service = Executors.newFixedThreadPool(5);
  11. //第三步:创建barrier,parties 设置为5
  12. CyclicBarrier barrier = new CyclicBarrier(5);
  13. //第四步:通过for循环开启5任务,模拟开始游戏,传递给每个任务 英雄名称和barrier
  14. for(int i = 0; i < 5; i++) {
  15. service.execute(new Player(heros[i], barrier));
  16. }
  17. service.shutdown();
  18. }
  19. static class Player implements Runnable {
  20. private String hero;
  21. private CyclicBarrier barrier;
  22. public Player(String hero, CyclicBarrier barrier) {
  23. this.hero = hero;
  24. this.barrier = barrier;
  25. }
  26. @Override
  27. public void run() {
  28. try {
  29. //每个玩家加载进度不一样,这里使用随机数来模拟!
  30. TimeUnit.SECONDS.sleep(new Random().nextInt(10));
  31. System.out.println(hero + ":加载进度100%,等待其他玩家加载完成中...");
  32. barrier.await();
  33. System.out.println(hero + ":发现所有英雄加载完成,开始战斗吧!");
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. } catch (BrokenBarrierException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }
  41. }
  1. 对于指定计数值 parties,若由于某种原因,没有足够的线程调用 CyclicBarrier 的 await,则所有调用 await 的线程都会被阻塞;

  2. 同样的 CyclicBarrier 也可以调用 await(timeout, unit),设置超时时间,在设定时间内,如果没有足够线程到达,则解除阻塞状态,继续工作;

  3. 通过 reset 重置计数,会使得进入 await 的线程出现BrokenBarrierException;

  4. 如 果 采 用 是 CyclicBarrier(int parties, RunnablebarrierAction) 构造方法,执行 barrierAction 操作的是最后一个到达的线程。

二,源码

CyclicBarrier工作原理.png
CyclicBarrier工作原理02.png

2.1 成员变量 属性 内部类

  1. /**
  2. * 每个barrier都表示为一个generation实例。当barrier触发trip条件或重置时generation随之改变。
  3. * 使用barrier时有很多generation与线程关联-由于不确定性的方式,锁可能分配给等待的线程。
  4. * 但是在同一时间只有一个是活跃的generation(通过count变量确定),并且其余的要么被销毁,要么被trip条件等待。
  5. * 如果有一个中断,但没有随后的重置,就不需要有活跃的generation。
  6. */
  7. private static class Generation {
  8. boolean broken = false;
  9. }
  10. //因为barrier实现是依赖于Condition条件队列的,condition条件队列必须依赖lock才能使用。
  11. private final ReentrantLock lock = new ReentrantLock();
  12. //线程挂起实现使用的 condition 队列。条件:当前代所有线程到位,这个条件队列内的线程 才会被唤醒。
  13. private final Condition trip = lock.newCondition();
  14. //Barrier需要参与进来的线程数量
  15. private final int parties;
  16. //当前代 最后一个到位的线程 需要执行的事件
  17. private final Runnable barrierCommand;
  18. //表示barrier对象 当前 “代”
  19. private Generation generation = new Generation();
  20. //表示当前“代”还有多少个线程 未到位。
  21. //初始值为parties
  22. private int count;

2.2 构造器

  1. //构造函数,指定参与线程数,并在所有线程到达barrier之后执行给定的barrierAction逻辑
  2. public CyclicBarrier(int parties, Runnable barrierAction) {
  3. if (parties <= 0) throw new IllegalArgumentException();
  4. this.parties = parties;
  5. this.count = parties;
  6. this.barrierCommand = barrierAction;
  7. }
  8. //构造函数,指定参与线程数
  9. public CyclicBarrier(int parties) {
  10. this(parties, null);
  11. }

2.3 await

  1. //等待所有的参与者到达barrier
  2. public int await() throws InterruptedException, BrokenBarrierException {
  3. try {
  4. return dowait(false, 0L);
  5. } catch (TimeoutException toe) {
  6. throw new Error(toe); // cannot happen
  7. }
  8. }

2.4 dowait

  1. /**
  2. * Main barrier code, covering the various policies.
  3. * timed:表示当前调用await方法的线程是否指定了 超时时长,如果true 表示 线程是响应超时的
  4. * nanos:线程等待超时时长 纳秒,如果timed == false ,nanos == 0
  5. */
  6. private int dowait(boolean timed, long nanos)
  7. throws InterruptedException, BrokenBarrierException,
  8. TimeoutException {
  9. //获取barrier全局锁对象
  10. final ReentrantLock lock = this.lock;
  11. //加锁
  12. //为什么要加锁呢?
  13. //因为 barrier的挂起 和 唤醒 依赖的组件是 condition。
  14. lock.lock();
  15. try {
  16. //获取barrier当前的 “代”
  17. final Generation g = generation;
  18. //如果当前代是已经被打破状态,则当前调用await方法的线程,直接抛出Broken异常
  19. if (g.broken)
  20. throw new BrokenBarrierException();
  21. //如果当前线程的中断标记位 为 true,则打破当前代,然后当前线程抛出 中断异常
  22. if (Thread.interrupted()) {
  23. //1.设置当前代的状态为broken状态 2.唤醒在trip 条件队列内的线程
  24. breakBarrier();
  25. throw new InterruptedException();
  26. }
  27. //执行到这里,说明 当前线程中断状态是正常的 false, 当前代的broken为 false(未打破状态)
  28. //正常逻辑...
  29. //假设 parties 给的是 5,那么index对应的值为 4,3,2,1,0
  30. int index = --count;
  31. //条件成立:说明当前线程是最后一个到达barrier的线程,此时需要做什么呢?
  32. if (index == 0) { // tripped
  33. //标记:true表示 最后一个线程 执行cmd时未抛异常。 false,表示最后一个线程执行cmd时抛出异常了.
  34. //cmd就是创建 barrier对象时 指定的第二个 Runnable接口实现,这个可以为null
  35. boolean ranAction = false;
  36. try {
  37. final Runnable command = barrierCommand;
  38. //条件成立:说明创建barrier对象时 指定 Runnable接口了,这个时候最后一个到达的线程 就需要执行这个接口
  39. if (command != null)
  40. command.run();
  41. //command.run()未抛出异常的话,那么线程会执行到这里。
  42. ranAction = true;
  43. //开启新的一代
  44. //1.唤醒trip条件队列内挂起的线程,被唤醒的线程 会依次 获取到lock,然后依次退出await方法。
  45. //2.重置count 为 parties
  46. //3.创建一个新的generation对象,表示新的一代
  47. nextGeneration();
  48. //返回0,因为当前线程是此 代 最后一个到达的线程,所以Index == 0
  49. return 0;
  50. } finally {
  51. if (!ranAction)
  52. //如果command.run()执行抛出异常的话,会进入到这里。
  53. breakBarrier();
  54. }
  55. }
  56. //执行到这里,说明当前线程 并不是最后一个到达Barrier的线程..此时需要进入一个自旋中.
  57. // loop until tripped, broken, interrupted, or timed out
  58. //自旋,一直到 条件满足、当前代被打破、线程被中断,等待超时
  59. for (;;) {
  60. try {
  61. //条件成立:说明当前线程是不指定超时时间的
  62. if (!timed)
  63. //当前线程 会 释放掉lock,然后进入到trip条件队列的尾部,然后挂起自己,等待被唤醒。
  64. trip.await();
  65. else if (nanos > 0L)
  66. //说明当前线程调用await方法时 是指定了 超时时间的!
  67. nanos = trip.awaitNanos(nanos);
  68. } catch (InterruptedException ie) {
  69. //抛出中断异常,会进来这里。
  70. //什么时候会抛出InterruptedException异常呢?
  71. //Node节点在 条件队列内 时 收到中断信号时 会抛出中断异常!
  72. //条件一:g == generation 成立,说明当前代并没有变化。
  73. //条件二:! g.broken 当前代如果没有被打破,那么当前线程就去打破,并且抛出异常..
  74. if (g == generation && ! g.broken) {
  75. breakBarrier();
  76. throw ie;
  77. } else {
  78. //执行到else有几种情况?
  79. //1.代发生了变化,这个时候就不需要抛出中断异常了,因为 代已经更新了,这里唤醒后就走正常逻辑了..只不过设置下 中断标记。
  80. //2.代没有发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候会抛出 brokenBarrier异常。也记录下中断标记位。
  81. // We're about to finish waiting even if we had not
  82. // been interrupted, so this interrupt is deemed to
  83. // "belong" to subsequent execution.
  84. Thread.currentThread().interrupt();
  85. }
  86. }
  87. //唤醒后,执行到这里,有几种情况?
  88. //1.正常情况,当前barrier开启了新的一代(trip.signalAll())
  89. //2.当前Generation被打破,此时也会唤醒所有在trip上挂起的线程
  90. //3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
  91. //条件成立:当前代已经被打破
  92. if (g.broken)
  93. //线程唤醒后依次抛出BrokenBarrier异常。
  94. throw new BrokenBarrierException();
  95. //唤醒后,执行到这里,有几种情况?
  96. //1.正常情况,当前barrier开启了新的一代(trip.signalAll())
  97. //3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
  98. //条件成立:说明当前线程挂起期间,最后一个线程到位了,然后触发了开启新的一代的逻辑,此时唤醒trip条件队列内的线程。
  99. if (g != generation)
  100. //返回当前线程的index。
  101. return index;
  102. //唤醒后,执行到这里,有几种情况?
  103. //3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
  104. if (timed && nanos <= 0L) {
  105. //打破barrier
  106. breakBarrier();
  107. //抛出超时异常.
  108. throw new TimeoutException();
  109. }
  110. }
  111. } finally {
  112. lock.unlock();
  113. }
  114. }

2.5 breakBarrier

打破barrier屏障,再屏障内的线程 都会抛出异常。

  1. private void breakBarrier() {
  2. //将代中的broken设置为true,表示这一代是被打破了的,再来到这一代的线程,直接抛出异常.
  3. generation.broken = true;
  4. //重置count为parties
  5. count = parties;
  6. //将在trip条件队列内挂起的线程 全部唤醒,唤醒后的线程 会检查当前代 是否是打破的,
  7. //如果是打破的话,接下来的逻辑和 开启下一代 唤醒的逻辑不一样.
  8. trip.signalAll();
  9. }

2.6 nextGeneration

开启下一代,当这一代所有的线程到位后(假设barrierCommand不为空,还需要最后一个线程执行完事件),会调用nextGeneration开启新的一代。

  1. private void nextGeneration() {
  2. //将在trip条件队列内挂起的线程 全部唤醒
  3. // signal completion of last generation
  4. trip.signalAll();
  5. //重置count为parties
  6. // set up next generation
  7. count = parties;
  8. //开启新的一代..使用一个新的 generation对象,表示新的一代,新的一代和上一代 没有任何关系。
  9. generation = new Generation();
  10. }