引言

这篇文章,我们来看CyclicBarrier。CyclicBarrier可以让多个线程都运行到同一点,在所有的线程都到了该点之后再继续执行。CyclicBarrier内部的实现用到了Lock和Condition。

一个例子

  1. public class CyclicBarrierTest {
  2. public static void main(String[] args) throws InterruptedException {
  3. CyclicBarrier cyclicBarrier = new CyclicBarrier(8, new Runnable() {
  4. @Override
  5. public void run() {
  6. System.out.println("线程"+Thread.currentThread().getName()+"最后完成任务");
  7. }
  8. });
  9. for(int i=0;i<8;i++){
  10. Thread thread = new Thread(new Task(cyclicBarrier),"thread_"+i);
  11. thread.start();
  12. }
  13. }
  14. static class Task implements Runnable{
  15. public Task(CyclicBarrier cyclicBarrier) {
  16. this.cyclicBarrier = cyclicBarrier;
  17. }
  18. private CyclicBarrier cyclicBarrier;
  19. @Override
  20. public void run() {
  21. try {
  22. Thread.sleep(1000);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. System.out.println("线程"+Thread.currentThread().getName()+"到达栅栏A");
  27. try {
  28. cyclicBarrier.await();
  29. } catch (InterruptedException | BrokenBarrierException e) {
  30. e.printStackTrace();
  31. }
  32. System.out.println("线程"+Thread.currentThread().getName()+"离开栅栏A");
  33. try {
  34. Thread.sleep(1000);
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. System.out.println("线程"+Thread.currentThread().getName()+"到达栅栏B");
  39. try {
  40. cyclicBarrier.await();
  41. } catch (InterruptedException | BrokenBarrierException e) {
  42. e.printStackTrace();
  43. }
  44. System.out.println("线程"+Thread.currentThread().getName()+"离开栅栏B");
  45. }
  46. }
  47. }

在这个例子中,cyclicBarrier设置了两个栅栏,所有的线程先到达栅栏A,然后再到达栅栏B。运行结果如下:

  1. 线程thread_4到达栅栏A
  2. 线程thread_1到达栅栏A
  3. 线程thread_3到达栅栏A
  4. 线程thread_5到达栅栏A
  5. 线程thread_2到达栅栏A
  6. 线程thread_7到达栅栏A
  7. 线程thread_6到达栅栏A
  8. 线程thread_0到达栅栏A
  9. 线程thread_0最后完成任务
  10. 线程thread_0离开栅栏A
  11. 线程thread_4离开栅栏A
  12. 线程thread_3离开栅栏A
  13. 线程thread_1离开栅栏A
  14. 线程thread_5离开栅栏A
  15. 线程thread_7离开栅栏A
  16. 线程thread_6离开栅栏A
  17. 线程thread_2离开栅栏A
  18. 线程thread_5到达栅栏B
  19. 线程thread_4到达栅栏B
  20. 线程thread_7到达栅栏B
  21. 线程thread_0到达栅栏B
  22. 线程thread_3到达栅栏B
  23. 线程thread_6到达栅栏B
  24. 线程thread_2到达栅栏B
  25. 线程thread_1到达栅栏B
  26. 线程thread_1最后完成任务
  27. 线程thread_1离开栅栏B
  28. 线程thread_5离开栅栏B
  29. 线程thread_0离开栅栏B
  30. 线程thread_7离开栅栏B
  31. 线程thread_4离开栅栏B
  32. 线程thread_2离开栅栏B
  33. 线程thread_6离开栅栏B
  34. 线程thread_3离开栅栏B

实现分析

构造方法

  1. public CyclicBarrier(int parties, Runnable barrierAction) {
  2. if (parties <= 0) throw new IllegalArgumentException();
  3. this.parties = parties;
  4. this.count = parties;
  5. this.barrierCommand = barrierAction;
  6. }

这个是我们上面例子中用到的构造方法,它初始化了CyclicBarrier中几个重要的字段,这里我先列出来CyclicBarrier所有的字段:

  1. /** The lock for guarding barrier entry */
  2. private final ReentrantLock lock = new ReentrantLock();
  3. /** Condition to wait on until tripped */
  4. private final Condition trip = lock.newCondition();
  5. /** The number of parties */
  6. private final int parties;
  7. /* The command to run when tripped */
  8. private final Runnable barrierCommand;
  9. /** The current generation */
  10. private Generation generation = new Generation();
  11. /**
  12. * Number of parties still waiting. Counts down from parties to 0
  13. * on each generation. It is reset to parties on each new
  14. * generation or when broken.
  15. */
  16. private int count;

前两个是重入锁和Condition,它俩用来实现线程等待和唤醒的逻辑,这个我们稍后会看到。parties可以认为是等待的线程继续执行前需要执行await方法的次数,对于同一个CyclicBarrier,它设置的每个屏障的parties都是一样的。count是在一个屏障中,仍然处于等待状态的数量,所以初始化CyclicBarrier时,count=parties,但是之后count会发生变化而parties不会。barrierCommand用来指定最后到达屏障的线程要执行的命令。generation用来分代,因为一个CyclicBarrier可以重复使用,每次使用都认为是新的一代。

await方法

await方法的实现如下:

  1. public int await() throws InterruptedException, BrokenBarrierException {
  2. try {
  3. return dowait(false, 0L);
  4. } catch (TimeoutException toe) {
  5. throw new Error(toe); // cannot happen
  6. }
  7. }

看dowait:

  1. private int dowait(boolean timed, long nanos)
  2. throws InterruptedException, BrokenBarrierException,
  3. TimeoutException {
  4. final ReentrantLock lock = this.lock;
  5. //加锁
  6. lock.lock();
  7. try {
  8. final Generation g = generation;
  9. if (g.broken)
  10. throw new BrokenBarrierException();
  11. if (Thread.interrupted()) {
  12. breakBarrier();
  13. throw new InterruptedException();
  14. }
  15. int index = --count;
  16. //index=0说明最后一个线程执行await方法
  17. if (index == 0) { // tripped
  18. boolean ranAction = false;
  19. try {
  20. final Runnable command = barrierCommand;
  21. //如果command不为空 执行
  22. if (command != null)
  23. command.run();
  24. ranAction = true;
  25. //新的分代 也就是重新设置屏障 nextGeneration方法中会执行signalAll方法唤醒所有等待的线程
  26. nextGeneration();
  27. return 0;
  28. } finally {
  29. if (!ranAction)
  30. breakBarrier();
  31. }
  32. }
  33. //如果count不等于0 说明当前线程不是最后执行await方法的线程
  34. // loop until tripped, broken, interrupted, or timed out
  35. for (;;) {
  36. try {
  37. if (!timed)
  38. //等待
  39. trip.await();
  40. else if (nanos > 0L)
  41. nanos = trip.awaitNanos(nanos);
  42. } catch (InterruptedException ie) {
  43. if (g == generation && ! g.broken) {
  44. breakBarrier();
  45. throw ie;
  46. } else {
  47. // We're about to finish waiting even if we had not
  48. // been interrupted, so this interrupt is deemed to
  49. // "belong" to subsequent execution.
  50. Thread.currentThread().interrupt();
  51. }
  52. }
  53. if (g.broken)
  54. throw new BrokenBarrierException();
  55. if (g != generation)
  56. return index;
  57. if (timed && nanos <= 0L) {
  58. breakBarrier();
  59. throw new TimeoutException();
  60. }
  61. }
  62. } finally {
  63. lock.unlock();
  64. }
  65. }

首先,它会使用lock进行加锁,也就是await方法是互斥执行的。然后判断当前线程是否是最后执行await方法的线程,如果是,它会首先执行构造方法中传入的Runnable,然后调用nextGeneration方法,该方法很关键:

  1. private void nextGeneration() {
  2. // signal completion of last generation
  3. trip.signalAll();
  4. // set up next generation
  5. count = parties;
  6. generation = new Generation();
  7. }

首先,它会使用Condition的signalAll来唤醒所有等待的线程,然后重置count。
如果当前线程不是最后执行await的线程,它就会调用Condition的await方法来等待。
所以基于lock、condition,await实现了最后一个线程到达屏障点,然后唤醒所有之前等待的线程的逻辑。
CyclicBarrier还提供了其他很有用的方法,这里不再一一介绍。

小结

CyclicBarrier能够实现CountDownLatch的功能,同时也提供了CountDownLatch没有的功能,例如多次设置屏障等,并且它俩的实现原理有很大差别,CountDownLatch基于AQS共享模式,CyclicBarrier基于Lock和Condition。根据场景的不同,我们需要合理的选择这两个工具类来达到目的。