栅栏屏障,作用是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。其实AQS里面另外一个工具 CountDownLantch 也可以实现该功能,只不过 CountDownLantch 的作用是一次性的,而 CyclicBarrier 可以重复起作用。

构造方法及成员属性

  1. //锁资源
  2. private final ReentrantLock lock = new ReentrantLock();
  3. //用于创建条件等待队列
  4. private final Condition trip = lock.newCondition();
  5. //每次拦截的线程数
  6. private final int parties;
  7. //线程计数器,当线程数达到指定数量后统一放行
  8. private int count;
  9. //代表栅栏的当前代,每次count变为0,代表重新开始新一代,重新拦截线程
  10. private Generation generation = new Generation();
  11. //每次换代前执行的任务
  12. private final Runnable barrierCommand;

CyclicBarrier 内部是通过条件队列 trip 来对线程进行阻塞的,并且其内部维护了两个int型的变量 parties 和 count,parties表示每次拦截的线程数,该值在构造时进行赋值。count是内部计数器,它的初始值和parties相同,以后随着每次 await() 方法的调用而减1,直到减为0就将所有线程唤醒。

CyclicBarrier 有一个静态内部类 Generation,该类的对象代表栅栏的当前代,就像玩游戏时代表的本局游戏,barrierCommand 表示换代前执行的任务,当 count 减为0时表示本局游戏结束,需要转到下一局。在转到下一局游戏之前会将所有阻塞的线程唤醒,在唤醒所有线程之前你可以通过指定barrierCommand来执行自己的任务

  1. public CyclicBarrier(int parties) {
  2. this(parties, null);
  3. }
  4. --------------------------------------
  5. public CyclicBarrier(int parties, Runnable barrierAction) {
  6. if (parties <= 0) throw new IllegalArgumentException();
  7. this.parties = parties;
  8. this.count = parties;
  9. this.barrierCommand = barrierAction;
  10. }

await() 等待

等待有两种方式,一种是定时等待,另一种是非定时等待

  1. public int await() throws InterruptedException, BrokenBarrierException {
  2. try {
  3. return dowait(false, 0L);
  4. } catch (TimeoutException toe) {
  5. throw new Error(toe); // cannot happen
  6. }
  7. }
  8. ---------------------------
  9. public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  10. return dowait(true, unit.toNanos(timeout));
  11. }

不管哪种方式最终都会走到 dowait() 方法,进行阻塞或放行

  1. private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  2. final ReentrantLock lock = this.lock;
  3. //获取锁
  4. lock.lock();
  5. try {
  6. final Generation g = generation;
  7. //如果当前栅栏别标记为broken,直接抛异常
  8. if (g.broken)
  9. throw new BrokenBarrierException();
  10. //如果当前线程发生中断,则会打破栅栏
  11. if (Thread.interrupted()) {
  12. //打破栅栏,标记为broken,重置count,释放条件等待队列中的线程
  13. breakBarrier();
  14. throw new InterruptedException();
  15. }
  16. //count减1
  17. int index = --count;
  18. //如果count=0,说明拦截线程数以满足,开始放行
  19. if (index == 0) {
  20. boolean ranAction = false;
  21. try {
  22. final Runnable command = barrierCommand;
  23. //执行自定义的换代任务
  24. if (command != null)
  25. command.run();
  26. ranAction = true;
  27. //使条件等待队列中的节点都出队进入到CLH队列,重置count,开启下一轮拦截
  28. nextGeneration();
  29. return 0;
  30. } finally {
  31. if (!ranAction)
  32. breakBarrier();
  33. }
  34. }
  35. //count !=0,线程数还没到齐,走下面逻辑
  36. for (;;) {
  37. try {
  38. if (!timed)
  39. //调用 await()方法进入条件等待队列并进行阻塞
  40. trip.await();
  41. else if (nanos > 0L)
  42. //以超时的方式进入条件等待队列并进行阻塞
  43. nanos = trip.awaitNanos(nanos);
  44. } catch (InterruptedException ie) {
  45. if (g == generation && ! g.broken) {
  46. breakBarrier();
  47. throw ie;
  48. } else {
  49. Thread.currentThread().interrupt();
  50. }
  51. }
  52. if (g.broken)
  53. throw new BrokenBarrierException();
  54. if (g != generation)
  55. return index;
  56. //以超时的方式进入条件等待队列,如果是被超时唤醒,打破栅栏并报异常
  57. if (timed && nanos <= 0L) {
  58. breakBarrier();
  59. throw new TimeoutException();
  60. }
  61. }
  62. //释放锁
  63. } finally {
  64. lock.unlock();
  65. }
  66. }

阻塞等待线程

如果当前线程数未达到放行数量时,将count 减1,需要释放 lock 并且放入条件等待队列中进行阻塞
微信截图_20210830104022.png

通过调用 Condition对象的 await() 方法加入条件等待队列,主要干三件事

  1. 将当前线程加入条件等待队列
  2. 释放当前线程获得的锁,并唤醒CLH队头节点
  3. 阻塞当前线程

    1. public final void await() throws InterruptedException {
    2. if (Thread.interrupted())
    3. throw new InterruptedException();
    4. //创建节点并加入条件等待队列
    5. Node node = addConditionWaiter();
    6. //释放锁
    7. int savedState = fullyRelease(node);
    8. int interruptMode = 0;
    9. //如果当前节点不再CLH队列中
    10. while (!isOnSyncQueue(node)) {
    11. //将当前线程阻塞
    12. LockSupport.park(this);
    13. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    14. break;
    15. }
    16. //被唤醒后调用acquireQueued()继续尝试获取锁
    17. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    18. interruptMode = REINTERRUPT;
    19. if (node.nextWaiter != null) // clean up if cancelled
    20. unlinkCancelledWaiters();
    21. if (interruptMode != 0)
    22. reportInterruptAfterWait(interruptMode);
    23. }

    微信截图_20210830112336.png

栅栏放行

当线程达到放行数量时,即count=0,调动 nextGeneration(),将条件等待队列中的线程转移到CLH 队列中,并且重置 count,开启下一轮拦截

  1. private void nextGeneration() {
  2. //将条件等待队列中的线程放入到CLH队列中
  3. trip.signalAll();
  4. //重置count,开启下一轮拦截
  5. count = parties;
  6. generation = new Generation();
  7. }

将条件等待队列中的节点全部出队

  1. public final void signalAll() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;
  5. if (first != null)
  6. //出队
  7. doSignalAll(first);
  8. }
  9. ----------------------------
  10. private void doSignalAll(Node first) {
  11. lastWaiter = firstWaiter = null;
  12. //通过while循环不断出队,知道头节点为null
  13. do {
  14. Node next = first.nextWaiter;
  15. first.nextWaiter = null;
  16. transferForSignal(first);
  17. first = next;
  18. } while (first != null);
  19. }
  20. ----------------------------
  21. final boolean transferForSignal(Node node) {
  22. //将节点的状态由CONDITION设置为0
  23. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  24. //设置失败直接return false
  25. return false;
  26. //将节点加入CLH队列,返回旧的tail节点
  27. Node p = enq(node);
  28. int ws = p.waitStatus;
  29. //如果节点时CANCELLED状态或者旧的tail节点waitStatus设置失败,则直接唤醒线程(做异常处理)
  30. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  31. LockSupport.unpark(node.thread);
  32. return true;
  33. }

最后一条到达的线程在 finally 中执行释放锁的操作,唤醒CLH队列中的所有节点。