一.使用

一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。

  1. public class CyclicBarrier1 {
  2. public static void main(String[] args) {
  3. CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
  4. @SneakyThrows
  5. @Override
  6. public void run() {
  7. Thread.sleep(2000);
  8. System.out.println(Thread.currentThread().getName() + "-----------------------------");
  9. }
  10. });
  11. for (int i = 0; i < 5; i++) {
  12. new MyThread(cyclicBarrier).start();
  13. }
  14. }
  15. }
  16. class MyThread extends Thread {
  17. CyclicBarrier cyclicBarrier;
  18. public MyThread(CyclicBarrier cyclicBarrier) {
  19. this.cyclicBarrier = cyclicBarrier;
  20. }
  21. @SneakyThrows
  22. @Override
  23. public void run() {
  24. System.out.println(getName() + "await1");
  25. cyclicBarrier.await();
  26. System.out.println(getName() + "await2");
  27. cyclicBarrier.await();
  28. System.out.println(getName() + "end");
  29. }
  30. }
  31. 输出:
  32. Thread-0await1
  33. Thread-1await1
  34. Thread-2await1
  35. Thread-3await1
  36. Thread-4await1
  37. Thread-4-----------------------------
  38. Thread-4await2
  39. Thread-0await2
  40. Thread-2await2
  41. Thread-1await2
  42. Thread-3await2
  43. Thread-3-----------------------------
  44. Thread-3end
  45. Thread-4end
  46. Thread-0end
  47. Thread-1end
  48. Thread-2end

二.源码

1.构造方法

  1. private final ReentrantLock lock = new ReentrantLock();
  2. private final Condition trip = lock.newCondition();
  3. public CyclicBarrier(int parties, Runnable barrierAction) {
  4. // 每次拦截的线程数
  5. this.parties = parties;
  6. // 计数器
  7. this.count = parties;
  8. // 换代前执行的任务
  9. this.barrierCommand = barrierAction;
  10. }

2.await

  1. public int await() throws InterruptedException, BrokenBarrierException {
  2. return dowait(false, 0L);
  3. }
  4. private int dowait(boolean timed, long nanos)
  5. throws InterruptedException, BrokenBarrierException, TimeoutException {
  6. final ReentrantLock lock = this.lock;
  7. lock.lock();
  8. try {
  9. // 栅栏的当前代
  10. final Generation g = generation;
  11. int index = --count;
  12. // 最后1个线程到达
  13. if (index == 0) {
  14. final Runnable command = barrierCommand;
  15. if (command != null)
  16. command.run();
  17. //切换栅栏到下一代
  18. nextGeneration();
  19. return 0;
  20. }
  21. for (;;) {
  22. if (!timed)
  23. // 阻塞
  24. trip.await();
  25. if (g != generation)
  26. return index;
  27. }
  28. } finally {
  29. lock.unlock();
  30. }
  31. }
  32. private void nextGeneration() {
  33. // 唤醒所有线程
  34. trip.signalAll();
  35. count = parties;
  36. generation = new Generation();
  37. }