JAVA并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch、Semaphore和CyclicBarrier。

1.CountDownLatch

6.1 CountDownLatch、Semaphore和CyclicBarrier。 - 图1
CountDownLatch是一个计数器闭锁,通过它可以完成类似于阻塞当前线程的功能,即:一个线程或多个线程一直等待,直到其他线程执行的操作完成。CountDownLatch用一个给定的计数器来初始化,该计数器的操作是原子操作,即同时只能有一个线程去操作该计数器。调用该类await方法的线程会一直处于阻塞状态,直到其他线程调用countDown方法使当前计数器的值变为零,每次调用countDown计数器的值减1。当计数器值减至零时,所有因调用await()方法而处于等待状态的线程就会继续往下执行。这种现象只会出现一次,因为计数器不能被重置,如果业务上需要一个可以重置计数次数的版本,可以考虑使用CycliBarrier。
在某些业务场景中,程序执行需要等待某个条件完成后才能继续执行后续的操作;典型的应用如并行计算,当某个处理的运算量很大时,可以将该运算任务拆分成多个子任务,等待所有的子任务都完成之后,父任务再拿到所有子任务的运算结果进行汇总。

  1. @Slf4j
  2. class CountDownLatchExample1 {
  3. private final static int threadCount = 10;
  4. public static void main(String[] args) throws Exception {
  5. ExecutorService exec = Executors.newCachedThreadPool();
  6. final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
  7. for (int i = 0; i < threadCount; i++) {
  8. final int threadNum = i;
  9. exec.execute(() -> {
  10. try {
  11. test(threadNum);
  12. } catch (Exception e) {
  13. log.error("exception", e);
  14. } finally {
  15. countDownLatch.countDown();
  16. }
  17. });
  18. }
  19. countDownLatch.await();
  20. log.info("finish");
  21. exec.shutdown();
  22. }
  23. private static void test(int threadNum) throws Exception {
  24. Thread.sleep(100);
  25. log.info("{}", threadNum);
  26. Thread.sleep(100);
  27. }
  28. }

结果:
20:18:32.917 [pool-1-thread-7] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 6 20:18:32.917 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 5 20:18:32.919 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 4 20:18:32.918 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 0 20:18:32.918 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 2 20:18:32.916 [pool-1-thread-9] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 8 20:18:32.918 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 3 20:18:32.916 [pool-1-thread-10] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 9 20:18:32.916 [pool-1-thread-8] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 7 20:18:32.917 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 1 20:18:33.032 [main] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - finish

下面演示设置countDownLatch.await时间,看看会出现什么结果

  1. @Slf4j
  2. class CountDownLatchExample2 {
  3. private final static int threadCount = 10;
  4. public static void main(String[] args) throws Exception {
  5. ExecutorService exec = Executors.newCachedThreadPool();
  6. final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
  7. for (int i = 0; i < threadCount; i++) {
  8. final int threadNum = i;
  9. exec.execute(() -> {
  10. try {
  11. test(threadNum);
  12. } catch (Exception e) {
  13. log.error("exception", e);
  14. } finally {
  15. countDownLatch.countDown();
  16. }
  17. });
  18. }
  19. countDownLatch.await(10, TimeUnit.MILLISECONDS);
  20. log.info("finish");
  21. exec.shutdown();
  22. }
  23. private static void test(int threadNum) throws Exception {
  24. Thread.sleep(100);
  25. log.info("{}", threadNum);
  26. }
  27. }

结果: 超过指定时间跳过等待
20:19:34.878 [main] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - finish 20:19:34.964 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 2 20:19:34.965 [pool-1-thread-10] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 9 20:19:34.964 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 0 20:19:34.965 [pool-1-thread-8] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 7 20:19:34.964 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 1 20:19:34.965 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 4 20:19:34.965 [pool-1-thread-7] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 6 20:19:34.964 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 3 20:19:34.965 [pool-1-thread-9] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 8 20:19:34.965 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 5

2.Semaphore

6.1 CountDownLatch、Semaphore和CyclicBarrier。 - 图2
Semaphore与CountDownLatch相似,不同的地方在于Semaphore的值被获取到后是可以释放的,并不像CountDownLatch那样一直减到底。它也被更多地用来限制流量,类似阀门的功能。如果限定某些资源最多有N个线程可以访问,那么超过N个主不允许再有线程来访问,同时当现有线程结束后,就会释放,然后允许新的线程进来。有点类似于锁的lock与 unlock过程。相对来说他也有两个主要的方法:
用于获取权限的acquire(),其底层实现与CountDownLatch.countdown()类似;
用于释放权限的release(),其底层实现与acquire()是一个互逆的过程。

  1. import lombok.extern.slf4j.Slf4j;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.Semaphore;
  5. @Slf4j
  6. public class SemaphoreExample1 {
  7. private final static int threadCount = 20;
  8. public static void main(String[] args) throws Exception {
  9. ExecutorService exec = Executors.newCachedThreadPool();
  10. // 每次最多三个线程获取许可
  11. final Semaphore semaphore = new Semaphore(3);
  12. for (int i = 0; i < threadCount; i++) {
  13. final int threadNum = i;
  14. exec.execute(() -> {
  15. try {
  16. semaphore.acquire(); // 获取一个许可
  17. test(threadNum);
  18. semaphore.release(); // 释放一个许可
  19. } catch (Exception e) {
  20. log.error("exception", e);
  21. }
  22. });
  23. }
  24. exec.shutdown();
  25. }
  26. private static void test(int threadNum) throws Exception {
  27. log.info("{}", threadNum);
  28. Thread.sleep(1000);
  29. }
  30. }

3.CyclicBarrier

CyclicBarrier也是一个同步辅助类,它允许一组线程相互等待,直到到达某个公共屏障点(common barrier point)。通过它可以完成多个线程之间相互等待,只有当每个线程都准备就绪后,才能各自继续往下执行后面的操作。类似于CountDownLatch,它也是通过计数器来实现的。当某个线程调用await方法时,该线程进入等待状态,且计数器加1,当计数器的值达到设置的初始值时,所有因调用await进入等待状态的线程被唤醒,继续执行后续操作。因为CycliBarrier在释放等待线程后可以重用,所以称为循环barrier。CycliBarrier支持一个可选的Runnable,在计数器的值到达设定值后(但在释放所有线程之前),该Runnable运行一次,注,Runnable在每个屏障点只运行一个。
使用场景类似于CountDownLatch与CountDownLatch的区别

  • CountDownLatch主要是实现了1个或N个线程需要等待其他线程完成某项操作之后才能继续往下执行操作,描述的是1个线程或N个线程等待其他线程的关系。CyclicBarrier主要是实现了多个线程之间相互等待,直到所有的线程都满足了条件之后各自才能继续执行后续的操作,描述的多个线程内部相互等待的关系。
  • CountDownLatch是一次性的,而CyclicBarrier则可以被重置而重复使用。
    1. @Slf4j
    2. class CyclicBarrierExample1 {
    3. private static CyclicBarrier barrier = new CyclicBarrier(5);
    4. public static void main(String[] args) throws Exception {
    5. ExecutorService executor = Executors.newCachedThreadPool();
    6. for (int i = 0; i < 10; i++) {
    7. final int threadNum = i;
    8. Thread.sleep(1000);
    9. executor.execute(() -> {
    10. try {
    11. race(threadNum);
    12. } catch (Exception e) {
    13. log.error("exception", e);
    14. }
    15. });
    16. }
    17. executor.shutdown();
    18. }
    19. private static void race(int threadNum) throws Exception {
    20. Thread.sleep(1000);
    21. log.info("{} is ready", threadNum);
    22. barrier.await();
    23. log.info("{} continue", threadNum);
    24. }
    25. }
    ```shell int statusTest1 = 0; CopyOnWriteArrayList> list = new CopyOnWriteArrayList(); AtomicInteger atomicInteger = new AtomicInteger(0);

@GetMapping(“testId”) public void testId() throws InterruptedException { //同步代码块,防止多次点击 synchronized (this) { if (statusTest1 > 0) { return; } statusTest1 = 1; }

  1. int threadCount = 100;
  2. CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount, new Runnable() {
  3. @Override
  4. public void run() {
  5. System.out.println(Thread.currentThread().getName()+"所有线程完成");
  6. Map<String, List<Map<String, Object>>> gbList = list.stream().collect(Collectors.groupingBy(e -> e.get("snowId").toString()));
  7. System.out.println(gbList);
  8. }
  9. });
  10. for(int i=1; i<=threadCount; i++){
  11. final int finalI = i;
  12. new Thread(new Runnable() {
  13. @SneakyThrows
  14. @Override
  15. public void run() {
  16. for(int j= 1;j<=6000;j++){
  17. Map<String, Object> map = new HashMap<>();
  18. map.put("snowId", IdGenerator.nextId());
  19. list.add(map);
  20. int now = atomicInteger.incrementAndGet();
  21. System.out.println("now"+now);
  22. }
  23. cyclicBarrier.await();
  24. }
  25. }, String.valueOf(finalI)).start();
  26. }
  27. }

```