CountDownLatch

CountDownLatch 是一个同步辅助工具类,通过它可以完成类似于阻塞当前线程的功能,即允许一个或多个线程一直等待,直到其他线程执行完以后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

CountDownLatch 底层依赖 AQS,通过共享锁来实现 await 和 countDown 方法。在创建 CountDownLatch 实例时,需要传递一个 int 类型的参数 count,该参数为计数器的初始值,CountDownLatch 对该计数器的操作是原子操作,即同时只能有一个线程操作该计数器。计数器 count 是闭锁需要等待的线程数量,只能被设置一次,且 CountDownLatch 没有提供任何机制去重新设置计数器 count,如果需要重置则使用 CyclicBarrier

  1. public CountDownLatch(int count) {
  2. if (count < 0) throw new IllegalArgumentException("count < 0");
  3. // count即为AQS中state变量的初始值
  4. this.sync = new Sync(count);
  5. }

调用该类的 await 方法的线程会一直处于阻塞状态,直到其他线程调用 countDown 时计数器值变成 0,每次调用 countDown 方法时计数器的值会减 1,当计数器的值为 0 时所有因 await 方法而处于等待状态的线程就会继续执行。

1. 常用方法

  1. // 等待计数器的值为0,若计数器值为0则该方法返回
  2. // 若等待期间该线程被中断,则抛出InterruptedException并清除该线程的中断状态
  3. public void await() throws InterruptedException {
  4. sync.acquireSharedInterruptibly(1);
  5. }
  6. // 在指定的时间内等待计数器的值为0,若在指定时间内计数器的值仍未变为0则返回false
  7. // 若指定时间内计数器的值变为0之前当前线程被中断,则抛出InterruptedException并清除该线程的中断状态
  8. public boolean await(long timeout, TimeUnit unit)
  9. throws InterruptedException {
  10. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  11. }
  12. // 如果当前计数器的值大于1,则将其减 1
  13. // 若当前值为1,则将其置为0并唤醒所有通过await等待的线程;若当前值为0,则什么也不做直接返回
  14. public void countDown() {
  15. sync.releaseShared(1);
  16. }

2. 代码示例

  1. public class CountDownLunchDemo implements Runnable {
  2. // 初始化等待线程数
  3. private static final CountDownLatch countDownLatch = new CountDownLatch(5);
  4. @Override
  5. public void run() {
  6. try {
  7. int costTime = new Random().nextInt(10) * 1000;
  8. Thread.sleep(costTime);
  9. System.out.println("work done! cost:" + costTime);
  10. // 表示完成任务
  11. countDownLatch.countDown();
  12. } catch (InterruptedException e) {}
  13. }
  14. public static void main(String[] args) throws InterruptedException{
  15. final CountDownLunchDemo demo = new CountDownLunchDemo();
  16. ExecutorService pool = Executors.newFixedThreadPool(5);
  17. for (int i = 0; i < 5; i++) {
  18. pool.execute(demo);
  19. }
  20. countDownLatch.await();
  21. System.out.println("all work already done!");
  22. pool.shutdown();
  23. }
  24. }

CyclicBarrier

CyclicBarrier 和 CountDownLatch 非常类似,也是一个同步辅助工具类,CyclicBarrier 核心的概念在于设置一个等待线程的数量边界,到达了此边界之后再进行执行。它允许一组线程相互等待直到到达某个公共的屏障点,通过它可以完成多个线程之间相互等待时,只有当每个线程都准备就绪后才能各自继续执行后面的操作。

CyclicBarrier 也是通过计数器来实现,当某个线程调用 await 方法后就进入等待状态,计数器加一。当计数器的值达到了设置的初始值时等待状态的线程会被唤醒继续执行。通过调用 CyclicBarrier 对象的 await() 方法,两个线程可以实现互相等待。一旦 N 个线程在等待 CyclicBarrier 达成,所有线程将被释放掉去继续执行。由于 CyclicBarrier 在释放等待线程后可以重用,所以又称为循环栅栏。

CyclicBarrier 特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

  1. public CyclicBarrier(int parties) {
  2. this(parties, null);
  3. }
  4. public CyclicBarrier(int parties, Runnable barrierAction) {
  5. if (parties <= 0) throw new IllegalArgumentException();
  6. this.parties = parties;
  7. this.count = parties;
  8. this.barrierCommand = barrierAction;
  9. }
  • parties:表示拦截线程的数量。

  • barrierAction:为 CyclicBarrier 接收的 Runnable 命令,用于在线程到达屏障时,优先执行该 barrierAction 的逻辑,该操作由最后一个进入 barrier 的线程执行。

1. 常用方法

CyclicBarrier 并没有自己去实现 AQS 框架的 API,而是利用了 ReentrantLock 和 Condition 来实现。

  1. public int await() throws InterruptedException, BrokenBarrierException

await() 等方法用于等待其它参与方(线程)的到来。如果当前调用是最后一个调用,则唤醒所有其它的线程的等待,如果在构造 CyclicBarrier 时指定了 barrierAction,则当前线程会去执行该 barrierAction,然后该方法返回该线程调用 await 的次序(为 1 则表示该线程是第一个调用 await 的,为 0 则表示该线程是最后一个调用 await 的),接着该线程继续执行 await 后的代码。

如果该调用不是最后一个调用,则阻塞等待;如果等待过程中当前线程被中断则抛出 InterruptedException。大部分迫使线程等待的方法都可能会抛出这个异常,使得线程在等待时依然可以响应外部紧急事件。

如果等待过程中,其它等待的线程被中断,或其它线程等待超时,或者该 barrier 被 reset,或者当前线程在执行 barrier 构造时注册的 action 时因为抛出异常而失败,则抛出 BrokenBarrierException。表示当前的 CyclicBarrier 已经破损了。

  1. // 与await()唯一的不同点在于设置了等待超时时间,等待超时时会抛出TimeoutException
  2. public int await(long timeout, TimeUnit unit)
  3. throws InterruptedException,
  4. BrokenBarrierException,
  5. TimeoutException
  6. // 将barrier重置为它的初始状态,并使得所有对该barrier的await调用抛出BrokenBarrierException
  7. public void reset()

2. 代码示例

  1. public class CyclicBarrierDemo implements Runnable {
  2. public static class worker implements Runnable {
  3. private CyclicBarrier cyclicBarrier;
  4. public worker(CyclicBarrier barrier) {
  5. this.cyclicBarrier = barrier;
  6. }
  7. @Override
  8. public void run() {
  9. try {
  10. int costTime = new Random().nextInt(10) * 1000;
  11. Thread.sleep(costTime);
  12. System.out.println("work done! cost:" + costTime);
  13. cyclicBarrier.await();
  14. } catch (Exception e) {
  15. //
  16. }
  17. }
  18. }
  19. @Override
  20. public void run() {
  21. System.out.println("all work already done!");
  22. }
  23. public static void main(String[] args) {
  24. final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new CyclicBarrierDemo());
  25. final worker worker = new worker(cyclicBarrier);
  26. ExecutorService pool = Executors.newFixedThreadPool(5);
  27. for (int i = 0; i < 5; i++) {
  28. pool.execute(worker);
  29. }
  30. pool.shutdown();
  31. }
  32. }

3. 与 CountDownLatch 区别

CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。

此外,CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器可以使用 reset() 方法循环利用,且具备自动重置的功能,一旦计数器减到 0 会自动重置到设置的初始值。并且 CyclicBarrier 还可以设置回调函数,功能更加丰富。

正常情况下,CyclicBarrier 的重置都是自动发生的,如果我们调用 reset 方法时还有线程在等待,就会导致等待线程被打扰,抛出 BrokenBarrierException 异常。CyclicBarrier 侧重点是线程,而不是调用事件,它的典型应用场景是用来等待并发线程结束。

Semaphore

Semaphore 翻译为“信号量”,是一个控制同时访问共享资源的计数器。和 CountDownLatch 一样,其本质上是一个共享锁,底层也是基于 AQS 中的 state 字段来实现加锁和解锁操作。

Semaphore 通常用于限制同时访问某些资源的线程数量。比如出于系统性能的考虑需要限流,或者保证合理的使用公共资源。当有线程想要访问共享资源时,需要先 acquire 许可;如果许可不够了,线程需要一直等待,直到许可可用。当线程使用完共享资源后,可以归还 release 许可,以供其它需要的线程使用。实际上 Semaphore 只是对可用的数量进行管理维护。

  1. // permits为,默认使用非公平锁的实现
  2. public Semaphore(int permits)
  3. public Semaphore(int permits, boolean fair)
  • permits:初始化的许可证数量,即同时能申请多少个许可

  • fair:选择公平还是非公平的模式,Semaphore 默认使用非公平模式。公平模式就是调用 acquire 的顺序就是获取许可证的顺序,遵循 FIFO;而非公平模式是抢占式的,简单说就是随机选取新线程来运行

1. 常用方法

Semaphore 内部包含公平(FairSync)和非公平(NonfairSync)两种实现,继承内部类 Sync,其中 Sync 继承 AQS,作为 Semaphore 的公平锁和非公平锁的基类。

  1. // 当前线程尝试阻塞性的获取1个许可证。如果成功获取1个可用的许可证则会停止等待
  2. // 如果在等待过程中当前线程被中断,则抛出InterruptedException异常,并停止等待
  3. public void acquire() throws InterruptedException
  4. // 和acquire方法类似,区别是获取permits个许可证
  5. public void acquire(int permits) throws InterruptedException
  6. // 和acquire方法类似,但是不会响应中断,直到成功获取1个可用的许可证
  7. public void acquireUninterruptibly()
  8. public void acquireUninterruptibly(int permits)
  9. // 当前线程尝试非阻塞性的获取1个许可证,它只是在方法调用时进行一次尝试。
  10. // 如果当前线程成功获取了1个可用的许可证,则会返回true,否则会返回false
  11. public boolean tryAcquire ()
  12. public boolean tryAcquire(int permits)
  13. // 当前线程在给定时间内,阻塞性的尝试去获取1个许可证
  14. // 如果成功获取了1个可用的许可证,则返回true;否则等待timeout后超时返回false
  15. // 如果当前线程在timeout时间内被中断,则会抛出InterruptedException异常
  16. public boolean tryAcquire(long timeout ,TimeUnit unit) throws InterruptedException
  17. public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
  18. // 当前线程释放1个可用的许可证,以使其他等待许可证的线程可以进行资源访问
  19. public void release()
  20. public void release(int permits)

2. 代码示例

  1. public class SemaphoreTest implements Runnable {
  2. private final Semaphore semaphore = new Semaphore(5);
  3. @Override
  4. public void run() {
  5. try {
  6. semaphore.acquire();
  7. Thread.sleep(2000);
  8. System.out.println(Thread.currentThread().getName() + ":done");
  9. } catch (InterruptedException e) {
  10. //
  11. } finally {
  12. semaphore.release();
  13. }
  14. }
  15. public static void main(String[] args) throws InterruptedException{
  16. ExecutorService executorService = Executors.newFixedThreadPool(20);
  17. final SemaphoreTest t = new SemaphoreTest();
  18. for (int i = 0; i < 20; i++) {
  19. executorService.submit(t);
  20. }
  21. executorService.shutdown();
  22. }
  23. }

Exchanger

Exchanger(交换器)是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类。Exchanger 有点类似于 CyclicBarrier,CyclicBarrier 是一个栅栏,而 Exchanger 可以看成是一个双向栅栏。它提供一个同步点,在这个同步点的两个线程可以交换彼此的数据。

可简单地将 Exchanger 对象理解为一个包含两个格子的容器,通过 exchange 方法可以向两个格子中填充信息。当两个格子均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。如果第一个线程先执行 exchange 方法,它会一直等待第二个线程也执行 exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

1. 常用方法

  1. // 当前线程跟另外一个线程交换数据x,如果另外一个线程的数据准备好,当前线程会立刻返回,并获得另外一个线程的数据
  2. // 否则当前线程会进入等待状态
  3. public V exchange(V x) throws InterruptedException
  4. // 给定一个超时时间,如果在等待时间超时了,而且还没有收到对方的数据的话,则抛出TimeoutException异常
  5. public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

可以看出,当一个线程到达 exchang 调用点时,如果其他线程此前已经调用了此方法,则其他线程会被调度唤醒并与之进行数据交换,然后各自返回;如果其他线程还没到达交换点,则当前线程会被挂起,直至其他线程到达才会完成交换并正常返回,或者当前线程在等待过程中被中断或超时返回。

2. 代码示例

  1. public class ExchangerExample {
  2. private static final Integer WORK_COUNT = 2;
  3. public static void main(String[] args) {
  4. Exchanger<String> exchanger = new Exchanger<>();
  5. ExecutorService executorService = Executors.newCachedThreadPool();
  6. for (int i = 0; i < WORK_COUNT; i++) {
  7. executorService.execute(() -> {
  8. String beforeObj = Thread.currentThread().getName();
  9. try {
  10. String afterObj = exchanger.exchange(Thread.currentThread().getName());
  11. System.out.println(String.format("currentThread %s , before exchange %s , after exchange %s", Thread.currentThread().getName(), beforeObj, afterObj));
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. });
  16. }
  17. executorService.shutdown();
  18. }
  19. }

Phaser

CountDownLatch 和 CyclicBarrier 都是在 JDK 1.5 中引入的,而 Phaser 是 JDK 1.7 中引入的。Phaser 设计的初衷是实现多个线程类似步骤、阶段场景的协调,线程注册等待屏障条件触发,进而协调彼此间行动。它几乎可以取代 CountDownLatch 和 CyclicBarrier,功能更灵活、强大,并且支持动态调整需要控制的线程数。

Phaser 又被成为多阶段栅栏,它把多个线程协作执行的任务划分为多个阶段(phase),编程时需要明确各个阶段的任务,每个阶段都可以有任意个参与者,线程可以随时注册(registry)并参与到某个阶段,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance)。
image.png
Phaser 比较适合这样一种场景,一种任务可以分为多个阶段,对于每个阶段,多个线程可以并发进行,但希望保证只有前面一个阶段的任务完成后才能开始后面的任务。虽然这种场景可以用多个 CyclicBarrier 来实现,但使用 CyclicBarrier 需要提前明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法进行动态修改。而 Phaser 可同时解决这两个问题。

  1. public Phaser()
  2. public Phaser(int parties)
  3. public Phaser(Phaser parent)
  4. public Phaser(Phaser parent, int parties)
  • parties:进入下一阶段所需的参与方数量,默认为 0,可以通过 register() 方法来动态增加
  • parent:父 Phaser

1. 常用方法

  1. // 注册当前阶段需要的参与者,使parties加一
  2. public int register()
  3. // 该方法不作任何等待,直接返回下一阶段的序号
  4. public int arrive()
  5. // 当前线程当前阶段执行完毕,等待其它线程完成当前阶段。
  6. // 如果当前线程是该阶段最后一个未到达的,则直接返回下一阶段的序号,同时其它线程的该方法也返回下一个阶段的序号
  7. public int arriveAndAwaitAdvance()
  8. // 该方法立即返回下一阶段的序号,并且其它线程需要等待的个数减一,并且把当前线程从之后需要等待的成员中移除,即总的parties减一
  9. // 如果该Phaser是另外一个Phaser的子Phaser,并且该操作导致当前Phaser的成员数为0,则该操作也会将当前Phaser从其父Phaser中移除
  10. public int arriveAndDeregister()
  11. // 该方法等待某一阶段执行完毕。如果当前阶段不等于指定的阶段或者该Phaser已经被终止,则立即返回
  12. // 该阶段数一般由arrive()方法或arriveAndDeregister()方法返回
  13. // 返回下一阶段的序号,或者返回参数指定的值(如果该参数为负数),或者直接返回当前阶段序号(如果当前Phaser已经被终止)。
  14. public int awaitAdvance(int phase)
  15. public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
  16. public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
  17. throws InterruptedException, TimeoutException

2. 实现 CountDownLatch 功能

  1. public class PhaserExample {
  2. public static void main(String[] args) throws Exception {
  3. final int totalRequestCount = 10;
  4. // 注册主线程, 当外部条件满足时, 由主线程打开开关
  5. Phaser phaser = new Phaser(1);
  6. ExecutorService executorService = Executors.newCachedThreadPool();
  7. for (int i = 0; i < totalRequestCount; i++) {
  8. // 注册各个参与者线程
  9. phaser.register();
  10. executorService.execute(() -> {
  11. // 等待其它参与者线程到达
  12. int j = phaser.arriveAndAwaitAdvance();
  13. System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
  14. });
  15. }
  16. // 打开开关 [parties 共 11 个, 主线程从之后需要等待的成员中移除, 即 parties 还剩 10]
  17. phaser.arriveAndAwaitAdvance();
  18. System.out.println("主线程打开了开关");
  19. executorService.shutdown();
  20. }
  21. }

可以看到,上面的代码总共执行了 11 次 phaser.register(),可以把 11 理解为 CountDownLatch 中的 count 和 CyclicBarrier 中的 parties。而 phaser.arriveAndAwaitAdvance() 是一个阻塞方法,直到该方法被调用 11 次后所有的线程才能同时通过。

3. 实现 CyclicBarrier 功能

  1. public class PhaserExample {
  2. public static void main(String[] args) {
  3. final int totalRequestCount = 10;
  4. Phaser phaser = new Phaser();
  5. ExecutorService executorService = Executors.newCachedThreadPool();
  6. for (int i = 0; i < totalRequestCount; i++) {
  7. phaser.register();
  8. executorService.execute(() -> {
  9. // 等待其它参与者线程到达
  10. int j = phaser.arriveAndAwaitAdvance();
  11. System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
  12. });
  13. }
  14. executorService.shutdown();
  15. }
  16. }

CyclicBarrier 支持 barrierAction,Phaser 同样也支持。区别是 Phaser 的 barrierAction 需要重写 onAdvance 方法来进行定制当阶段执行结束时的操作。

  1. public class PhaserExample {
  2. public static void main(String[] args) {
  3. final int totalRequestCount = 10;
  4. Phaser phaser = new Phaser() {
  5. @Override
  6. protected boolean onAdvance(int phase, int registeredParties) {
  7. System.out.println(String.format("%s call back is ready.", Thread.currentThread().getName()));
  8. return super.onAdvance(phase, registeredParties);
  9. }
  10. };
  11. ExecutorService executorService = Executors.newCachedThreadPool();
  12. for (int i = 0; i < totalRequestCount; i++) {
  13. // 注册各个参与者线程
  14. phaser.register();
  15. executorService.execute(() -> {
  16. // 等待其它参与者线程到达
  17. int j = phaser.arriveAndAwaitAdvance();
  18. System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
  19. });
  20. }
  21. executorService.shutdown();
  22. }
  23. }

4. 通过 Phaser 实现分层

  1. public class PhaserExample {
  2. public static void main(String[] args) {
  3. final int parties = 3;
  4. final int phases = 4;
  5. Phaser phaser = new Phaser() {
  6. @Override
  7. protected boolean onAdvance(int phase, int registeredParties) {
  8. System.out.println("====== Phase :" + phase + "======");
  9. return super.onAdvance(phase, registeredParties);
  10. }
  11. };
  12. ExecutorService executorService = Executors.newCachedThreadPool();
  13. for (int i = 0; i < parties; i++) {
  14. // 注册各个参与者线程
  15. phaser.register();
  16. executorService.execute(() -> {
  17. for (int phase = 0; phase < phases; phase++) {
  18. // 等待其它参与者线程到达
  19. int j = phaser.arriveAndAwaitAdvance();
  20. System.out.println(String.format("currentThread:%s, Executing the task, currentPhase:%s", Thread.currentThread().getName(), j));
  21. }
  22. });
  23. }
  24. executorService.shutdown();
  25. }
  26. }