一、CountDownLatch

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

用给定的计数初始化 CountDownLatch。由于调用了 countDown()) 方法,所以在当前计数到达零之前,await) 方法会一直受阻塞。之后,会释放所有等待的线程,await) 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier

CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown()) 的线程打开入口前,所有调用 await) 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个 await)。

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量(任务数量)。这个值只能被设置一次,而且 CountDownLatch 没有提供任何机制去重新设置这个计数值的方法

与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须再启动其他线程后,立即调用 await 方法,这样主线程的操作才会被阻塞,直到其他线程各自完成了自己的任务,并调用了 countDown 方法

每次调用 countDown 方法都会再原有的 cout 基础上减 1,当 count = 0 的时候,主线程就可以通过 await 方法(接阻塞),继续开始工作了

比如有一个任务A,他执行的前提是另外三个任务执行完,他才能执行,此时就可以用 ConutDownLatch 来实现这种功能需求。

  1. public class CountDownLatchDemo {
  2. static List<String> companies = Arrays.asList("东方航空", "南方航空", "海南航空");
  3. static List<String> ticketList = new ArrayList<>();
  4. public static void main(String[] args) throws InterruptedException {
  5. String origin = "北京";
  6. String dest = "上海";
  7. Thread[] threads = new Thread[companies.size()];
  8. CountDownLatch countDownLatch = new CountDownLatch(companies.size());
  9. for (int i = 0; i < threads.length; i++) {
  10. String name = companies.get(i);
  11. threads[i] = new Thread(() -> {
  12. System.out.printf("%s 查询从 %s 到 %s 的机票...\n", name, origin, dest);
  13. int tickets = new Random().nextInt(10);
  14. try {
  15. TimeUnit.SECONDS.sleep(tickets);
  16. ticketList.add(name + " 剩余票数 " + tickets);
  17. System.out.printf("%s 查询完成... [complete]\n", name);
  18. countDownLatch.countDown();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. });
  23. threads[i].start();
  24. }
  25. countDownLatch.await();
  26. System.out.println("========= 查询结果如下 ==========");
  27. for (String s : ticketList) {
  28. System.out.println(s);
  29. }
  30. }
  31. }

二、CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环Barrier

每个线程执行时,都会遇到一个屏障,直到所有的线程执行结束,然后屏障才会打开,使所有线程继续往下执行。

在 CyclicBarrier 的内部定义了一个 Lock 对象,每当一个线程调用 await 方法时,将拦截的线程数加 1,然后判断拦截的线程数是否等于初始值 parties,如果不是,进入 Lock 对象的条件队列等待,如果是,执行 barrierAction 的 Runnable 方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程就可以依次获取锁,释放锁。

  1. public class CyclicBarrierDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. CyclicBarrier cyclicBarrier = new CyclicBarrier(8);
  4. List<String> ranks = new ArrayList<>();
  5. for (int i = 0; i < 8; i++) {
  6. new Thread(() -> {
  7. try {
  8. TimeUnit.SECONDS.sleep(new Random().nextInt(10));
  9. System.out.println(Thread.currentThread().getName() + "准备好了");
  10. cyclicBarrier.await();
  11. System.out.println("选手" + Thread.currentThread().getName() + "开始起跑...");
  12. TimeUnit.SECONDS.sleep(new Random().nextInt(10));
  13. ranks.add(Thread.currentThread().getName());
  14. System.out.println("选手" + Thread.currentThread().getName() + "到达终点");
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. }, "player[" + i + "]").start();
  19. }
  20. }
  21. }

比较 CountDownLatch 与 CyclicBarrier

CountDownLatch 类似于集结点的概念,很多个线程都做完自己的事以后,等待其他线程完成自己的任务,全部线程完成后再执行 await 后面的代码,不同的是 CountDownLatch 需要自己调用 countDown 方法,来减少一个数,然后调用 await 方法后面的代码,而 CyclicBarrier 是线程自己直接调用 await 方法,来增加一个数,并且让自己等待,不用去关心 paries 数量,当总数 = parties 的数量时,后面的代码就会一起执行。

所以从上面来看,CountDownLatch 更适用于多个线程合作的情况(等大家都完成好了,我就可以继续执行了),而 CyclicBarrier 则是,等大家都准备好了,我们再一起执行

三、Semaphore

Semaphore 一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire()),然后再获取该许可。每个 release()) 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目

  • 信号量:代表可以使用的资源数量

    1. 请求资源:acquire 方法,获取许可(资源的数量可以满足)
    2. 使用资源:具体业务
    3. 释放资源:release 方法
  • 使用场景:通常是资源有限的情况下使用,如:停车场的停车位使用情况,公园人流量限流等

  1. public class SemaphoreDemo {
  2. public static void main(String[] args) {
  3. Semaphore semaphore = new Semaphore(5);
  4. Thread[] cars = new Thread[10];
  5. for (int i = 0; i < 10; i++) {
  6. cars[i] = new Thread(() -> {
  7. try {
  8. // 1、请求许可
  9. semaphore.acquire();
  10. System.out.println(Thread.currentThread().getName() + " 进入停车厂");
  11. // 2、使用资源
  12. TimeUnit.SECONDS.sleep(new Random().nextInt(10));
  13. // 3、释放资源
  14. semaphore.release();
  15. System.out.println(Thread.currentThread().getName() + " 离开停车厂");
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }, "Car[" + i + "]");
  20. cars[i].start();
  21. }
  22. }
  23. }