Java 高并发 CyclicBarrier

CyclicBarrier

CyclicBarrier是什么?把它拆开来翻译就是循环(Cycle)和屏障(Barrier)。它的主要作用其实和CountDownLanch差不多,都是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障会被打开,所有被屏障阻塞的线程才会继续执行,不过它是可以循环执行的,这是它与CountDownLanch最大的不同。CountDownLanch是只有当最后一个线程把计数器置为0的时候,其他阻塞的线程才会继续执行。
首先先来看下关于使用CyclicBarrier的一个demo:比如游戏中有个关卡的时候,每次进入下一关的时候都需要进行加载一些地图、特效背景音乐什么的只有全部加载完了才能够进行游戏:

  1. public class CyclicBarrierExample {
  2. static class PreTaskThread implements Runnable {
  3. private String task;
  4. private CyclicBarrier cyclicBarrier;
  5. public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
  6. this.task = task;
  7. this.cyclicBarrier = cyclicBarrier;
  8. }
  9. @Override
  10. public void run() {
  11. for (int i = 0; i < 4; i++) {
  12. Random random = new Random();
  13. try {
  14. Thread.sleep(random.nextInt(1000));
  15. System.out.println(String.format("关卡 %d 的任务 %s 完成", i, task));
  16. cyclicBarrier.await();
  17. } catch (InterruptedException | BrokenBarrierException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. public static void main(String[] args) {
  23. CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
  24. System.out.println("本关卡所有的前置任务完成,开始游戏... ...");
  25. });
  26. new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
  27. new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
  28. new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
  29. }
  30. }
  31. }

输出结果如下:
Java高并发编程基础三大利器之CyclicBarrier - 图1
可以看到每次游戏开始都会等当前关卡把游戏的人物模型,地图数据、背景音乐加载完成后才会开始进行游戏。并且还是可以循环控制的。

源码分析

结构组成

  1. /** The lock for guarding barrier entry */
  2. private final ReentrantLock lock = new ReentrantLock();
  3. /** Condition to wait on until tripped */
  4. private final Condition trip = lock.newCondition();
  5. /** The number of parties */
  6. private final int parties;
  7. /* The command to run when tripped */
  8. private final Runnable barrierCommand;
  9. /** The current generation */
  10. private Generation generation = new Generation();
  • lock:用于保护屏障入口的锁
  • trip:达到屏障并且不能放行的线程在trip条件变量上等待
  • parties:栅栏开启需要的到达线程总数
  • barrierCommand:最后一个线程到达屏障后执行的回调任务
  • generation:这是一个内部类,通过它实现CyclicBarrier重复利用,每当await达到最大次数的时候,就会重新new 一个,表示进入了下一个轮回。里面只有一个boolean型属性,用来表示当前轮回是否有线程中断。

    主要方法

    await方法

    1. public int await() throws InterruptedException, BrokenBarrierException {
    2. try {
    3. return dowait(false, 0L);
    4. } catch (TimeoutException toe) {
    5. throw new Error(toe); // cannot happen
    6. }
    7. }
    8. /**
    9. * Main barrier code, covering the various policies.
    10. */
    11. private int dowait(boolean timed, long nanos)
    12. throws InterruptedException, BrokenBarrierException, TimeoutException {
    13. final ReentrantLock lock = this.lock;
    14. lock.lock();
    15. try {
    16. //获取barrier当前的 “代”也就是当前循环
    17. final Generation g = generation;
    18. if (g.broken)
    19. throw new BrokenBarrierException();
    20. if (Thread.interrupted()) {
    21. breakBarrier();
    22. throw new InterruptedException();
    23. }
    24. // 每来一个线程调用await方法都会进行减1
    25. int index = --count;
    26. if (index == 0) { // tripped
    27. boolean ranAction = false;
    28. try {
    29. final Runnable command = barrierCommand;
    30. // new CyclicBarrier 传入 的barrierCommand, command.run()这个方法是同步的,如果耗时比较多的话,是否执行的时候需要考虑下是否异步来执行。
    31. if (command != null)
    32. command.run();
    33. ranAction = true;
    34. // 这个方法1. 唤醒所有阻塞的线程,2. 重置下count(count 每来一个线程都会进行减1)和generation,以便于下次循环。
    35. nextGeneration();
    36. return 0;
    37. } finally {
    38. if (!ranAction)
    39. breakBarrier();
    40. }
    41. }
    42. // loop until tripped, broken, interrupted, or timed out
    43. for (;;) {
    44. try {
    45. // 进入if条件,说明是不带超时的await
    46. if (!timed)
    47. // 当前线程会释放掉lock,然后进入到trip条件队列的尾部,然后挂起自己,等待被唤醒。
    48. trip.await();
    49. else if (nanos > 0L)
    50. //说明当前线程调用await方法时 是指定了 超时时间的!
    51. nanos = trip.awaitNanos(nanos);
    52. } catch (InterruptedException ie) {
    53. //Node节点在 条件队列内 时 收到中断信号时 会抛出中断异常!
    54. //g == generation 成立,说明当前代并没有变化。
    55. //! g.broken 当前代如果没有被打破,那么当前线程就去打破,并且抛出异常..
    56. if (g == generation && ! g.broken) {
    57. breakBarrier();
    58. throw ie;
    59. } else {
    60. // We're about to finish waiting even if we had not
    61. // been interrupted, so this interrupt is deemed to
    62. // "belong" to subsequent execution.
    63. //执行到else有几种情况?
    64. //1.代发生了变化,这个时候就不需要抛出中断异常了,因为 代已经更新了,这里唤醒后就走正常逻辑了..只不过设置下 中断标记。
    65. //2.代没有发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候会抛出 brokenBarrier异常。也记录下中断标记位。
    66. Thread.currentThread().interrupt();
    67. }
    68. }
    69. //唤醒后,执行到这里,有几种情况?
    70. //1.正常情况,当前barrier开启了新的一代(trip.signalAll())
    71. //2.当前Generation被打破,此时也会唤醒所有在trip上挂起的线程
    72. //3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
    73. if (g.broken)
    74. throw new BrokenBarrierException();
    75. //唤醒后,执行到这里,有几种情况?
    76. //1.正常情况,当前barrier开启了新的一代(trip.signalAll())
    77. //2.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
    78. if (g != generation)
    79. return index;
    80. //唤醒后,执行到这里,有几种情况?
    81. //.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。
    82. if (timed && nanos <= 0L) {
    83. breakBarrier();
    84. throw new TimeoutException();
    85. }
    86. }
    87. } finally {
    88. lock.unlock();
    89. }
    90. }

    小结

    到了这里可以知道为啥CyclicBarrier可以进行循环计数?

  • CyclicBarrier采用一个内部类Generation来维护当前循环,每一个await方法都会存储当前的generation,获取到相同generation对象的属于同一组,每当count的次数耗尽就会重新new一个Generation并且重新设置count的值为parties,表示进入下一次新的循环。

从这个await方法可以知道只要有一个线程被中断了,当代的 generationbroken 就会被设置为true,所以会导致其他的线程也会被抛出BrokenBarrierException。相当于一个失败其他也必须失败,感觉有“强一致性“的味道。

总结

  • CountDownLanch是为计数器是设置一个值,当多次执行countdown后,计数器减为0的时候所有线程被唤醒,然后CountDownLanch失效,只能够使用一次。
  • CyclicBarrier是当count0时同样唤醒全部线程,同时会重新设置countparties,重新new一个generation来实现重复利用。