简述

两种常用的线程计数器CountDownLatch和循环屏障CyclicBarrier。

定义

  • CountDownLatch是一个非常实用的多线程控制工具类,称之为“倒计时器”,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
  • CyclicBarrier 栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

    区别

  1. CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  2. CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。
  3. CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。


案例

CountDownLatch

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
使用场景:zookeeper分布式锁。统计汇总等

工作原理

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后再关闭锁上等待的线程就可以恢复执行任务。
JUC中的CountDownLatch与CyclicBarrier - 图1

常用API

  1. CountDownLatch countDownLatch = new CountDownLatch(num);//num代表线程数
  2. CountDownLatch.countDown();//线程执行结束,做减一操作
  3. CountDownLatch.await(); //等待所有线程执行完

案例说明

EXCEL中sheet页汇总操作,某人在银行中流水导出为EXCEL,现需要统计汇总金额

  1. package com.itmck.countdownlatch;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.Random;
  5. import java.util.concurrent.CountDownLatch;
  6. public class CountDownLatchTest {
  7. public static void main(String[] args) throws Exception {
  8. //初始2为两个线程进行并行
  9. CountDownLatch countDownLatch = new CountDownLatch(2);
  10. List<Integer> integers = new ArrayList<>();
  11. int sum=0;
  12. for (int i = 0; i < 2; i++) {
  13. int finalI = i;
  14. new Thread(() -> {
  15. int i1 = getAnInt(integers);
  16. System.out.println("线程:" + finalI + " 计算结果:"+i1);
  17. countDownLatch.countDown();//执行计数器减1
  18. }).start();
  19. }
  20. countDownLatch.await();//等待所有线程执行完.
  21. System.out.println("主线程开始执行,多个线程得到的数值列表:"+integers);
  22. for (Integer integer : integers) {
  23. sum += integer;
  24. }
  25. System.out.println("汇总多个线程的数据结果:"+sum);
  26. }
  27. /**
  28. * 进行随机值得获取.模拟计算表格每个sheet页得到一个值
  29. * @param integers
  30. * @return
  31. */
  32. private static synchronized int getAnInt(List<Integer> integers) {
  33. int i1 = new Random().nextInt(10);
  34. integers.add(i1);
  35. return i1;
  36. }
  37. }

陪媳妇去医院检查
医院里边排队的人很多,如果一个人的话,要先看大夫,看完大夫再去排队交钱取药。
现在我们是双核,可以同时做这两个事(多线程)。
假设看大夫花3秒钟,排队交费取药花5秒钟。我们同时搞的话,5秒钟我们就能完成,然后一起回家(回到主线程)。
代码如下:

  1. /**
  2. * 看大夫任务
  3. */
  4. public class SeeDoctorTask implements Runnable {
  5. private CountDownLatch countDownLatch;
  6. public SeeDoctorTask(CountDownLatch countDownLatch){
  7. this.countDownLatch = countDownLatch;
  8. }
  9. public void run() {
  10. try {
  11. System.out.println("开始看医生");
  12. Thread.sleep(3000);
  13. System.out.println("看医生结束,准备离开病房");
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }finally {
  17. if (countDownLatch != null)
  18. countDownLatch.countDown();
  19. }
  20. }
  21. }
  22. /**
  23. * 排队的任务
  24. */
  25. public class QueueTask implements Runnable {
  26. private CountDownLatch countDownLatch;
  27. public QueueTask(CountDownLatch countDownLatch){
  28. this.countDownLatch = countDownLatch;
  29. }
  30. public void run() {
  31. try {
  32. System.out.println("开始在医院药房排队买药....");
  33. Thread.sleep(5000);
  34. System.out.println("排队成功,可以开始缴费买药");
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }finally {
  38. if (countDownLatch != null)
  39. countDownLatch.countDown();
  40. }
  41. }
  42. }
  43. /**
  44. * 配媳妇去看病,轮到媳妇看大夫时
  45. * 我就开始去排队准备交钱了。
  46. */
  47. public class CountDownLaunchRunner {
  48. public static void main(String[] args) throws InterruptedException {
  49. long now = System.currentTimeMillis();
  50. CountDownLatch countDownLatch = new CountDownLatch(2);
  51. new Thread(new SeeDoctorTask(countDownLatch)).start();
  52. new Thread(new QueueTask(countDownLatch)).start();
  53. //等待线程池中的2个任务执行完毕,否则一直
  54. countDownLatch.await();
  55. System.out.println("over,回家 cost:"+(System.currentTimeMillis()-now));
  56. }
  57. }

CyclicBarrier

栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
image.png

工作原理

CyclicBarrier内部是通过条件队列trip来对线程进行阻塞的,并且其内部维护了两个int型的变量parties和count,parties表示每次拦截的线程数,该值在构造时进行赋值。count是内部计数器,它的初始值和parties相同,以后随着每次await方法的调用而减1,直到减为0就将所有线程唤醒。

常用API

  1. public CyclicBarrier(int parties)
  2. public CyclicBarrier(int parties, Runnable barrierAction)
  • parties 是参与线程的个数
  • 第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务

    1. cyclicBarrier.await();
  • 线程调用 await() 表示自己已经到达栅栏

  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时

    案例说明

    模拟高并发抢购。银行流水分sheet汇总 ```java package com.itmck.cyclicbarrier;

import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger;

public class CyclicBarrierTest {

  1. public static void main(String[] args) throws Exception {
  2. List<Integer> integers = new ArrayList<>();
  3. AtomicInteger sum = new AtomicInteger();
  4. CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
  5. //多个线程执行完后,走这里逻辑
  6. System.out.println("初始化线程完毕,多个线程得到的数值列表:" + integers);
  7. for (Integer integer : integers) {
  8. sum.addAndGet(integer);
  9. }
  10. System.out.println("汇总多个线程的数据结果:" + sum);
  11. });
  12. for (int i = 0; i < 2; i++) {
  13. new Thread(() -> {
  14. try {
  15. int i1 = getAnInt();//每个线程计算完结果
  16. integers.add(i1);
  17. System.out.println("线程:" + Thread.currentThread().getName() + " 计算结果:" + i1);
  18. cyclicBarrier.await();//在这里等待着。等到所有线程计算完结果后,进行存入集合中
  19. integers.add(i1);
  20. } catch (Exception e) {
  21. throw new RuntimeException(e);
  22. }
  23. }).start();
  24. }
  25. }
  26. /**
  27. * 进行随机值得获取.模拟计算表格每个sheet页得到一个值
  28. *
  29. * @return 返回每个sheet页汇总数据
  30. */
  31. private static synchronized int getAnInt() {
  32. return new Random().nextInt(10);
  33. }

}

  1. 高并发抢购
  2. ```java
  3. @RunWith(SpringRunner.class)
  4. @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
  5. @Slf4j
  6. public class RedisDisLockControllerTest {
  7. @Autowired
  8. private StringRedisTemplate redisTemplate;
  9. @Autowired
  10. private RedissonClient redissonClient;
  11. @Before
  12. public void init() {
  13. String productNum = "3";
  14. redisTemplate.opsForValue().set("stock", productNum);
  15. System.out.println("初始化商品:" + productNum + "个");
  16. }
  17. @Test
  18. public void deleteProduct() {
  19. int count = 10;//并发线程数
  20. ExecutorService executorService = Executors.newFixedThreadPool(count);
  21. /**
  22. * param1:是参与线程的个数
  23. * param2:第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
  24. *
  25. */
  26. CyclicBarrier cyclicBarrier = new CyclicBarrier(count, () -> System.out.println("==========所有线程准备完毕======="));
  27. for (int i = 0; i < count; i++) {
  28. executorService.execute(() -> {
  29. try {
  30. System.out.println(Thread.currentThread().getName() + "-->来到栅栏准备抢购商品");
  31. cyclicBarrier.await();//等待最后一个线程初始化完毕
  32. this.getProducts();//抢购商品
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. });
  37. }
  38. try {
  39. //这里睡10000毫秒是为了主线程不关闭
  40. Thread.sleep(10000);
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. }
  44. executorService.shutdown(); // 关闭线程池
  45. }
  46. private void getProducts() {
  47. String lockKey = "product_001";
  48. RLock lock = redissonClient.getLock(lockKey);//获取redis锁
  49. lock.lock();//加锁,实现锁续命.默认30s
  50. try {
  51. int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get("stock")));
  52. if (stock > 0) {
  53. int realStock = stock - 1;
  54. redisTemplate.opsForValue().set("stock", realStock + "");
  55. System.out.println("扣减库存成功,剩余:" + realStock);
  56. } else {
  57. System.out.println("库存不足");
  58. }
  59. } finally {
  60. lock.unlock();//解锁
  61. }
  62. }
  63. }