一、应用

与CountDownLatch类似,也是基于AQS,实现线程等待,达到指定线程数量后,线程被唤醒继续执行。
车站流水发车场景:
CyclicBarrier认为是一个车站,每辆车可乘坐人数为CyclicBarrier的阈值;把乘客认为是线程。
未达到阈值前,所有乘客在车中等待(线程阻塞)。
当线程达到阈值,则进行发车(即,唤醒线程继续执行)。然后阈值重置,重新等待到来线程达到阈值。

应用示例

  1. //统计工厂员工平均工作量
  2. static ThreadPoolExecutor executor = new ThreadPoolExecutor(
  3. 3,
  4. 5,
  5. 100,
  6. TimeUnit.SECONDS,
  7. new ArrayBlockingQueue<>(100),
  8. (ThreadFactory) Thread::new);
  9. private Map<String, Integer> workCount = new ConcurrentHashMap<>();
  10. public static void main(String[] args) {
  11. new CyclicBarrierDemo().count();
  12. }
  13. private CyclicBarrier cb = new CyclicBarrier(3, () -> {
  14. int result = 0;
  15. for (String key : workCount.keySet()) {
  16. result += workCount.get(key);
  17. }
  18. System.out.println("工厂工人平均工作量为:" + (result / 3) );
  19. });
  20. private void count() {
  21. for (int i = 0; i < 5; i++) {
  22. executor.execute(() -> {
  23. //统计工人工作量
  24. int count = (int) (Math.random() * 40 + 60);
  25. workCount.put(Thread.currentThread().getName(), count);
  26. System.out.println(Thread.currentThread().getName()
  27. + "工人工作量:" + count);
  28. try {
  29. //执行完运行await(),等待所有学生平均成绩都计算完毕
  30. cb.await();
  31. } catch (InterruptedException | BrokenBarrierException e) {
  32. e.printStackTrace();
  33. }
  34. });
  35. }
  36. executor.shutdown();
  37. }
  38. }
  39. Thread-2工人工作量:92
  40. Thread-0工人工作量:89
  41. Thread-1工人工作量:95
  42. 工厂工人平均工作量为:92

二、原理分析

await()方法

  1. /**
  2. *
  3. * 如果当前线程不是最后一个到达的线程,则出于线程调度目的,将其休眠,
  4. *直到发生以下情况之一:
  5. * 1.最后一个线程到达;(正常情况)
  6. * 2.其他线程中断当前线程;(当前线程抛出InterruptedException,并清除当前线程的中断状态;其他线程抛出BrokenBarrierException)
  7. * 3.另一个线程中断另一个等待的线程;
  8. * 4.其他线程在等待屏障时超时;(引发BrokenBarrierException)
  9. * 5.其他一些线程在此屏障上调用重置。
  10. **/
  11. public int await() throws InterruptedException, BrokenBarrierException {
  12. try {
  13. return dowait(false, 0L);
  14. } catch (TimeoutException toe) {
  15. throw new Error(toe); // cannot happen
  16. }
  17. }

dowait(false, 0L)核心逻辑

  1. private int dowait(boolean timed, long nanos)
  2. throws InterruptedException, BrokenBarrierException,
  3. TimeoutException {
  4. final ReentrantLock lock = this.lock;
  5. lock.lock();
  6. try {
  7. final Generation g = generation;
  8. //如果之前有线程被中断,则后续线程抛出BrokenBarrierException
  9. if (g.broken)
  10. throw new BrokenBarrierException();
  11. //检查当前线程是否被中断。中断则抛出InterruptedException。并使后续线程抛出BrokenBarrierException
  12. if (Thread.interrupted()) {
  13. breakBarrier();//【1.】设置标识,使得后续线程抛出BrokenBarrierException;【2.】并唤醒所有已阻塞的线程
  14. throw new InterruptedException();
  15. }
  16. //count记录阈值,每到一个线程则【自减一】
  17. int index = --count;
  18. //如果是最后一个线程,(如果有)则执行初始化时,设置的命令
  19. if (index == 0) { // tripped
  20. boolean ranAction = false;
  21. try {
  22. final Runnable command = barrierCommand;
  23. if (command != null)
  24. command.run();
  25. ranAction = true;
  26. //唤醒所有阻塞线程。开始“新朝代”循环
  27. nextGeneration();
  28. return 0;
  29. } finally {
  30. //处理未知异常
  31. if (!ranAction)
  32. breakBarrier();
  33. }
  34. }
  35. // loop until tripped, broken, interrupted, or timed out
  36. /**
  37. * 执行到此处的线程
  38. * 1.最后一个线程
  39. * 2.非最后一个(会被阻塞)
  40. **/
  41. for (;;) {
  42. try {
  43. if (!timed)
  44. trip.await();//不是最后一个线程,则会被阻塞
  45. else if (nanos > 0L)
  46. nanos = trip.awaitNanos(nanos);
  47. } catch (InterruptedException ie) {
  48. if (g == generation && ! g.broken) {
  49. breakBarrier();
  50. throw ie;
  51. } else {
  52. // We're about to finish waiting even if we had not
  53. // been interrupted, so this interrupt is deemed to
  54. // "belong" to subsequent execution.
  55. Thread.currentThread().interrupt();
  56. }
  57. }
  58. if (g.broken)
  59. throw new BrokenBarrierException();
  60. //执行到此处线程。1.最后一个线程。2.重新被唤醒的线程
  61. //最后一个线程会开始“新朝代”循环,则g != generation为true,跳出循环
  62. if (g != generation)
  63. return index;
  64. if (timed && nanos <= 0L) {
  65. breakBarrier();
  66. throw new TimeoutException();
  67. }
  68. }
  69. } finally {
  70. lock.unlock();
  71. }
  72. }

nextGeneration()方法

  1. /**
  2. * 唤醒所有阻塞线程。开始“新朝代”循环
  3. * private final Condition trip = lock.newCondition();
  4. **/
  5. private void nextGeneration() {
  6. // signal completion of last generation
  7. trip.signalAll();
  8. // set up next generation
  9. count = parties;
  10. generation = new Generation();
  11. }

await()方法

AbstractQueuedSynchronizer。加入条件队列被阻塞

  1. /**
  2. * 1.将当前节点添加到条件队列中。
  3. * 2.并释放已经获取的锁。(state变量归零,并唤醒因争抢锁被阻塞的线程)。
  4. * 3.调用park方法,阻塞当前线程
  5. **/
  6. public final void await() throws InterruptedException {
  7. if (Thread.interrupted())
  8. throw new InterruptedException();
  9. //添加的条件队列中(条件队列要素:firstWaiter、lastWaiter,条件队列中waitState=-2)
  10. Node node = addConditionWaiter();
  11. //释放已经持有的锁,并唤醒因争抢锁被阻塞的线程
  12. int savedState = fullyRelease(node);
  13. int interruptMode = 0;
  14. //如果不在等待队列中,则进行阻塞
  15. while (!isOnSyncQueue(node)) {
  16. LockSupport.park(this);
  17. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  18. break;
  19. }
  20. //线程被唤醒后的逻辑。重新获取锁(独占锁逻辑)
  21. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  22. interruptMode = REINTERRUPT;
  23. if (node.nextWaiter != null) // clean up if cancelled
  24. unlinkCancelledWaiters();
  25. if (interruptMode != 0)
  26. reportInterruptAfterWait(interruptMode);
  27. }

signalAll()方法

AbstractQueuedSynchronizer。从条件队列中被唤醒

  1. /**
  2. *
  3. * 1.将条件队列中元素,转移到等待队列中
  4. * 2.并将条件队列中的waitstate。-2修改成0,最后修改成-1
  5. **/
  6. public final void signalAll() {
  7. //如果当前线程,不是持有锁的线程,则抛出异常
  8. if (!isHeldExclusively())
  9. throw new IllegalMonitorStateException();
  10. Node first = firstWaiter;
  11. if (first != null)
  12. doSignalAll(first);
  13. }
  14. //【循环】条件队列,转移到等待队列中
  15. private void doSignalAll(Node first) {
  16. lastWaiter = firstWaiter = null;
  17. do {
  18. Node next = first.nextWaiter;
  19. first.nextWaiter = null;
  20. transferForSignal(first);
  21. first = next;
  22. } while (first != null);
  23. }
  24. final boolean transferForSignal(Node node) {
  25. /*
  26. * 将-2修改为0
  27. */
  28. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  29. return false;
  30. /*
  31. * 1.将节点插入等待队列尾部
  32. * 2.将0修改为-1(等待当前线程释放锁,唤醒等待队列中元素)
  33. *
  34. * 异常情况下,才立即唤醒
  35. */
  36. Node p = enq(node);
  37. int ws = p.waitStatus;
  38. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  39. LockSupport.unpark(node.thread);
  40. return true;
  41. }