一、CyclicBarrier介绍

字面意思回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
CountDownLatch与CyclicBarrier的区别:
1)CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
2)CyclicBarrier还提供getNumberWaiting(可以获得CyclicBarrier阻塞的线程数量)、 isBroken(用来知道阻塞的线程是否被中断)等方法。
3)CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。
4)CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同。CountDownLatch一般用于一个或多个线程,等待其他线程执行完任务后,再执行。 CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
5)CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。
6)CyclicBarrier是通过ReentrantLock的”独占锁”和Conditon来实现一组线程的阻塞唤醒 的,而CountDownLatch则是通过AQS的“共享锁”实现。

二、CyclicBarrier的使用

CyclicBarrier的构造方法,参数parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经 到达了屏障,然后当前线程被阻塞。

  1. public CyclicBarrier(int parties) {
  2. this(parties, null);
  3. }

第二个参数,用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行) 。在这个构造方法中,指定的线程数量被赋给了两个变量,count 和 parties,其中parties 就是副本,用来重置count,来到达可以循环的使用屏障的目的,count就是真正的计数器。

  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. if (parties <= 0) throw new IllegalArgumentException();
  3. this.parties = parties;
  4. this.count = parties;
  5. this.barrierCommand = barrierAction;
  6. }

CyclicBarrier在初始化时会创建一个ReentrantLock,并生成一个条件队列,因为条件队列的await方法(入队阻塞)必须要在ReentrantLock.lock() 获取锁之后才可以执行,因为基于MESA模型,进入条件队列调用await方法是会释放锁的,所以需要先进行获取锁。

  1. private final ReentrantLock lock = new ReentrantLock();
  2. private final Condition trip = lock.newCondition();

1、基本使用

当CyclicBarrier实例化时传入一个数,执行到屏障(await方法)的线程达到这个数的时候,这一批线程才会继续执行后续逻辑,当没有达到这个数字时,到达屏障(await方法)的线程会在这里阻塞。

  1. public static void main(String[] args) {
  2. CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
  3. for (int i = 0; i < 5; i++) {
  4. new Thread(new Runnable() {
  5. @Override
  6. public void run() {
  7. try {
  8. System.out.println(Thread.currentThread().getName()
  9. + "开始等待其他线程");
  10. cyclicBarrier.await();
  11. System.out.println(Thread.currentThread().getName() + "开始执行");
  12. //TODO 模拟业务处理
  13. Thread.sleep(5000);
  14. System.out.println(Thread.currentThread().getName() + "执行完毕");
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }).start();
  20. }
  21. }

2、barrierAction参数

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。 在创建CyclicBarrier对象时,可以传入一个参数指定数量和一个任务,当有指定数量的线程调用await方法后,会执行传入的任务,这些线程会同时继续执行后续逻辑。

  1. //保存每个学生的平均成绩
  2. private ConcurrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>();
  3. private ExecutorService threadPool= Executors.newFixedThreadPool(3);
  4. private CyclicBarrier cb=new CyclicBarrier(3,()->{
  5. int result=0;
  6. Set<String> set = map.keySet();
  7. for(String s:set){
  8. result+=map.get(s);
  9. }
  10. System.out.println("三人平均成绩为:"+(result/3)+"分");
  11. });
  12. public void count(){
  13. for(int i=0;i<3;i++){
  14. threadPool.execute(new Runnable(){
  15. @Override
  16. public void run() {
  17. //获取学生平均成绩
  18. int score=(int)(Math.random()*40+60);
  19. map.put(Thread.currentThread().getName(), score);
  20. System.out.println(Thread.currentThread().getName()
  21. +"同学的平均成绩为:"+score);
  22. try {
  23. //执行完运行await(),等待所有学生平均成绩都计算完毕
  24. cb.await();
  25. } catch (InterruptedException | BrokenBarrierException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. });
  30. }
  31. }
  32. public static void main(String[] args) {
  33. CyclicBarrierTest2 cb=new CyclicBarrierTest2();
  34. cb.count();
  35. }

代码中,创建CyclicBarrier时传入了数字3和一个任务的表达式,代表需要有3个线程到达屏障才会执行任务,开启三个线程,每个线程生成一个成绩,然后保存到map中,调用CyclicBarrier的await方法,在没有三个线程调用await方法前,之前的线程会在这里阻塞,当三个线程全部执行到await方法后,CyclicBarrier会调用传入的任务,然后所有线程继续执行后续逻辑。
如果开启6个线程,那么每当有3个线程调用await方法时会调用一次任务,并使这3个线程一起后续执行,第4个线程达到await方法时,会阻塞再次等待满足3个线程到达的条件。

三、CyclicBarrier源码分析

await()方法
CyclicBarrier.png