概述

Cyclic 意为循环的,Barrier 意为栅栏。CyclicBarrier 则意为周期性的栅栏。与 CountDownLatch 相比,对象是可以重复使用的。
底层通过组合 ReentrantLock 和 Condition 以实现周期性栅栏功能。
cyclicbarrier-2.png
CountDownLatch 基于 AQS 的共享模式实现,而 CyclicBarrier 基于 Condition 实现。基本流程如下:
cyclicbarrier 基本流程示意图.png

源码分析

内部类 Generation

  1. // java.util.concurrent.CyclicBarrier.Generation
  2. /**
  3. * CyclicBarrier 是可以重复使用的,我们把每次从开始使用到所有线程"穿过"栅栏看成一代,或一个周期
  4. *
  5. */
  6. private static class Generation {
  7. boolean broken = false;
  8. }

重要的变量

  1. // 栅栏入口的需要从这里获得锁
  2. private final ReentrantLock lock = new ReentrantLock();
  3. // 这是 CyclicBarrier 的条件,只有当所有线程到达栅栏这一条件满足后,才会打开栅栏,
  4. // 所有线程继续执行
  5. private final Condition trip = lock.newCondition();
  6. // 参与的线程数,由构造函数设置
  7. private final int parties;
  8. // 当打开栅栏后,由最后一个到达的线程执行里面的代码
  9. // 比如可以合并之前线程的工作等等
  10. private final Runnable barrierCommand;
  11. // 当前线程所处的"代"或周期
  12. private Generation generation = new Generation();
  13. // 还没有到达栅栏的线程数。即还需要等待多少个线程到达栅栏。
  14. private int count;

开启新的周期:nextGeneration

当最后一个线程到达栅栏时,调用这个方法唤醒其它线程,同时创建并初始化一个新的 Generation 对象,表示进入下一个新的周期。

  1. // java.util.concurrent.CyclicBarrier#nextGeneration
  2. private void nextGeneration() {
  3. // #1 唤醒所有阻塞在 await() 方法的线程
  4. trip.signalAll();
  5. // #2 创建一个新的周期
  6. count = parties;
  7. generation = new Generation();
  8. }

打破栅栏:breakBarrier

  1. 仅修改当前 Generation 对象的 broken 变量为 true,并不会创建一个新的 Generation 对象。
  2. 重置 count。
  3. 唤醒所有阻塞在 await() 方法调用的线程

    private void breakBarrier() {
     // #1 将当前周期的Generation对象设置为true
     generation.broken = true;
    
     // #2 重置 count 为初始值 parties
     count = parties;
    
     // #3 唤醒所有阻塞在 await() 方法调用的线程
     trip.signalAll();
    }
    

    线程等待:await

    // java.util.concurrent.CyclicBarrier#await()
    /**
    *不带超时机制的等待,实际上是调用 dowait() 方法
    */
    public int await() throws InterruptedException, BrokenBarrierException {
     try {
         return dowait(false, 0L);
     } catch (TimeoutException toe) {
         throw new Error(toe); // cannot happen
     }
    }
    

    dowait

    // java.util.concurrent.CyclicBarrier#dowait
    private int dowait(boolean timed, long nanos)
     throws InterruptedException, BrokenBarrierException,
    TimeoutException {
     final ReentrantLock lock = this.lock;
     // #1 首先获取锁
     lock.lock();
     try {
         final Generation g = generation;
    
         // #2 检查栅栏是否被打破,如果被打破,抛出异常
         if (g.broken)
             throw new BrokenBarrierException();
    
         // #3 检查当前线程中断标志位,如果线程被中断,打破栅栏,并抛出异常
         if (Thread.interrupted()) {
             breakBarrier();
             throw new InterruptedException();
         }
    
         // #4 index 是这个方法的返回值,它的值是 count-- 得到的
         int index = --count;
    
         // #5 如果等于0,说明所有线程已到达栅栏,可以打开栅栏
         if (index == 0) {
             boolean ranAction = false;
             try {
                 // #6 这里会执行构造参数传入的command,由最后一个线程先执行
                 // 因为这个线程没有被阻塞,所以可以快速执行完,免去唤醒动作
                 final Runnable command = barrierCommand;
                 if (command != null)
                     command.run();
    
                 // #7 command 可能出现异常,如果 ranAction 为 false 说明发生了异常
                 ranAction = true;
    
                 // #8 唤醒所有线程,并创建一个新的 Generation 对象
                 nextGeneration();
    
                 // #9 返回0
                 return 0;
             } finally {
                 // #8 如果执行 command 过程中出现异常,那么需要打破栅栏
                 if (!ranAction)
                     breakBarrier();
             }
         }
    
         // #10 死循环,线程执行到这里,意味着不是当前线程并非最后一个线程到达
         // 所以要么阻塞等待,要么被中断、要么超时退出
         for (;;) {
             try {
                 // #11 如果有超时机制,调用带超时的Condition的await方法进行等待
                 if (!timed)
                     trip.await();
                 else if (nanos > 0L)
                     nanos = trip.awaitNanos(nanos);
             } catch (InterruptedException ie) {
                 // #12 在调用 await() 期间被中断了
                 // Generation并没有重新创建且参数broken为false
                 if (g == generation && ! g.broken) {
                     // #13 撕破栅栏
                     breakBarrier();
                     // #14 抛出异常给上层调用的方法
                     throw ie;
                 } else {
                     // We're about to finish waiting even if we had not
                     // been interrupted, so this interrupt is deemed to
                     // "belong" to subsequent execution.
                     // #15 到这里,g != generation,说明已经生成了一个新的周期,即最后一个
                     // 线程调用了nextGeneration()方法。此刻就没必要抛出异常,记录下来这个中断信息
                     // 执行下一轮时会再判断该线程中断标志位,并做合理处理
                     Thread.currentThread().interrupt();
                 }
             }
    
             // #16 线程被唤醒,检查栅栏是否被打破
             // 主要是在执行 command 过程中出现异常,它会调用 breakBarrier()
             // 方法打破栅栏:它不会创建一个新的Generation对象,仅重置count和修改旧的Generation#broken 为true
             // 然后唤醒所有线程。线程被唤醒后,执行到这里,然后抛出异常
             if (g.broken)
                 // #17 如果被打破,抛出异常
                 throw new BrokenBarrierException();
    
             // #17 这里属于正常退出逻辑。
             // 在 for(;;) 循环里被唤醒的线程,除了被中断或超时唤醒外,正常情况是最后一个线程
             // 唤醒所有阻塞在 await() 方法调用的线程,并创建一个新的周期Generation对象
             // 所以正常情况下,这里是不相等的。
             if (g != generation)
                 return index;
    
             // #18 这个 if 语句是对超时唤醒的。
             // 当线程由于超时而被唤醒后,执行到这里,打破栅栏,抛出异常
             if (timed && nanos <= 0L) {
                 breakBarrier();
                 throw new TimeoutException();
             }
         }
     } finally {
         // #19 释放锁
         lock.unlock();
     }
    }
    

    另外,别忘记从 await() 方法返回时需要重新获得锁。

    获取多少线程已到达栅栏

  4. 首先获得锁。

  5. 再用 parties - count 就得到线程到达数量。

    public int getNumberWaiting() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         return parties - count;
     } finally {
         lock.unlock();
     }
    }
    

    重置栅栏

    栅栏是可以手动重置的:

  6. 先打破栅栏:① 所有阻塞于 await() 方法调用的线程都会被唤醒。② 抛出 BrokerBarrierException 并返回。

  7. 再生成一个新的 Generation 对象:① 重置 count 和 Generation 对象。
    public void reset() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         breakBarrier();   // break the current generation
         nextGeneration(); // start a new generation
     } finally {
         lock.unlock();
     }
    }