1、什么是控制并发流程

让线程之间相互配合,来满足业务逻辑

2、常见的并发流程工具类

作用 说明
Semaphore 信号量,可以控制“许可证”数量,来保证线程之间的配合 线程只有拿到许可证之后才能继续运行。相比于其他同步器,更灵活
CyclicBarrier 线程会等待,直到足够多的线程达到了事先规定的数目,一旦达到触发条件,就可以进行下一步动作 适用于线程之间相互等待处理结果就绪的场景
Phaser 和CyclicBarrier类似,但是计数可变 java7加入
CountDownlatch 和CyclicBarrier类似,数量递减到0时,触发动作 不可重复使用
Exchanger 让两个线程在合适时交换对象 适用场景:当两个线程工作在同一个类下的不同实例中,用于交换数据
Condition 可以控制线程的等待和唤醒 是Object.wait()的升级版

3、CountDownLatch类的作用

  • 用法1:一个线程等待多个线程都执行完毕,再继续自己的工作
  • 用法2:多个线程等待某一个线程的信号,同时执行

3.1、用法一实例

  1. public static void main(String[] args) throws InterruptedException {
  2. ExecutorService executorService = Executors.newFixedThreadPool(10);
  3. CountDownLatch latch = new CountDownLatch(5);
  4. Runnable runnable = new Runnable() {
  5. @Override
  6. public void run() {
  7. System.out.println("子线程正在执行");
  8. try {
  9. Thread.sleep(50);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. System.out.println("子线程执行完毕!");
  14. latch.countDown();
  15. }
  16. };
  17. IntStream.range(0,5).forEach(e -> executorService.submit(runnable));
  18. System.out.println("主线程阻塞,等待子线程运行完毕!");
  19. latch.await();
  20. System.out.println("子线程运行完毕,主线程继续运行");
  21. Thread.sleep(500);
  22. executorService.shutdownNow();
  23. }

输出

  1. 子线程正在执行
  2. 子线程正在执行
  3. 子线程正在执行
  4. 子线程正在执行
  5. 子线程正在执行
  6. 主线程阻塞,等待子线程运行完毕!
  7. 子线程执行完毕!
  8. 子线程执行完毕!
  9. 子线程执行完毕!
  10. 子线程执行完毕!
  11. 子线程执行完毕!
  12. 子线程运行完毕,主线程继续运行

3.2、用法二实例

  1. public static void main(String[] args) throws InterruptedException {
  2. ExecutorService executorService = Executors.newFixedThreadPool(10);
  3. CountDownLatch latch = new CountDownLatch(1);
  4. Runnable runnable = new Runnable() {
  5. @Override
  6. public void run() {
  7. System.out.println("线程准备完毕,等待指令!");
  8. try {
  9. latch.await();
  10. Thread.sleep(10);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println("线程执行完毕!");
  15. }
  16. };
  17. IntStream.range(0,5).forEach(e -> executorService.submit(runnable));
  18. Thread.sleep(50);
  19. System.out.println("发送指令,开始运行");
  20. latch.countDown();
  21. Thread.sleep(500);
  22. executorService.shutdownNow();
  23. }

输出

  1. 线程准备完毕,等待指令!
  2. 线程准备完毕,等待指令!
  3. 线程准备完毕,等待指令!
  4. 线程准备完毕,等待指令!
  5. 线程准备完毕,等待指令!
  6. 发送指令,开始运行
  7. 线程执行完毕!
  8. 线程执行完毕!
  9. 线程执行完毕!
  10. 线程执行完毕!
  11. 线程执行完毕!

4、Semaphore信号量

4.1、简介

作用:
常用于限制可以访问某些资源的线程数量,例如通过 Semaphore 限流

原理:
信号量会维护“许可证”的计数,而线程去访问共享资源前,必须先拿到许可证,线程也可以“释放”一个许可证归还给信号量。当信号量拥有的许可证数量减到 0 时,如果下个线程还想要获得许可证,那么这个线程就必须等待,直到之前得到许可证的线程释放。

4.2、主要方法

  • 构造函数:public Semaphore(int permits, boolean fair)

第一个参数是许可证的数量,另一个参数是是否公平,第二个参数传入 false,则代表非公平策略,也就有可能插队

  • acquire() 和 acquireUninterruptibly()

获取许可证,第一个方法可响应中断,第二个方法忽略中断 ,如果此时信号量已经没有剩余的许可证,线程会等待

  • release()

    释放信号量

  • tryAcquire()

尝试获取,获取到为true

  • tryAcquire(long timeout, TimeUnit unit)

如果等待期间获取到了许可证,则往下继续执行;如果超时时间到,依然获取不到许可证,它就认为获取失败,且返回 false。

  • availablePermits()

这个方法用来查询可用许可证的数量,返回一个整型的结果

4.3、信号量特殊用法

可以一次性获取或者释放多个许可证(获取和释放许可证的数量必须保持一致)

注意设置公平性,设置true更为合理

释放和获取对线程无要求,可以A获取,B释放,合理即可

4.4、信号量实例

  1. public static void main(String[] args) {
  2. Semaphore semaphore = new Semaphore(3,true);
  3. ExecutorService executorService = Executors.newFixedThreadPool(10);
  4. Runnable runnable = () -> {
  5. try {
  6. semaphore.acquire();
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println(Thread.currentThread().getName() + "线程开始运行");
  11. try {
  12. Thread.sleep(500);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. System.out.println(Thread.currentThread().getName() + "线程运行完毕");
  17. semaphore.release();
  18. };
  19. IntStream.range(0,9).forEach(e -> executorService.submit(runnable));
  20. }

输出

  1. pool-1-thread-2线程开始运行
  2. pool-1-thread-1线程开始运行
  3. pool-1-thread-3线程开始运行
  4. pool-1-thread-2线程运行完毕
  5. pool-1-thread-1线程运行完毕
  6. pool-1-thread-3线程运行完毕
  7. pool-1-thread-5线程开始运行
  8. pool-1-thread-4线程开始运行
  9. pool-1-thread-6线程开始运行
  10. pool-1-thread-6线程运行完毕
  11. pool-1-thread-7线程开始运行
  12. pool-1-thread-4线程运行完毕
  13. pool-1-thread-5线程运行完毕
  14. pool-1-thread-8线程开始运行
  15. pool-1-thread-9线程开始运行
  16. pool-1-thread-9线程运行完毕
  17. pool-1-thread-8线程运行完毕
  18. pool-1-thread-7线程运行完毕

5、Condition接口

5.1、简介

作用:
当线程1需要等待某个条件才能继续运行的时候,执行condition.await()方法,线程进入阻塞状态。线程2在某个合适的时机执行condition.signal()方法,jvm从阻塞线程中找到等待该condition的线程,唤醒,变为可执行状态

5.2、主要方法

  • signal()

唤醒一个等待时间最长的线程

  • signalAll()

唤醒所有等待该condition的线程

5.3、实例

5.3.1、基本用法

  1. public class ConditionDemo {
  2. private ReentrantLock lock = new ReentrantLock();
  3. private Condition condition = lock.newCondition();
  4. void method1() throws InterruptedException {
  5. lock.lock();
  6. try{
  7. System.out.println("条件不满足,开始等待!");
  8. condition.await();
  9. System.out.println("条件满足,继续运行!");
  10. }finally {
  11. lock.unlock();
  12. }
  13. }
  14. void method2() throws InterruptedException {
  15. lock.lock();
  16. try{
  17. System.out.println("准备工作完成,唤醒等待线程!");
  18. condition.signal();
  19. }finally {
  20. lock.unlock();
  21. }
  22. }
  23. public static void main(String[] args) throws InterruptedException {
  24. ConditionDemo conditionDemo = new ConditionDemo();
  25. new Thread(() -> {
  26. try {
  27. Thread.sleep(100);
  28. conditionDemo.method2();
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }).start();
  33. conditionDemo.method1();
  34. }
  35. }

输出

  1. 条件不满足,开始等待!
  2. 准备工作完成,唤醒等待线程!
  3. 条件满足,继续运行!

5.3.2、condition实现生产者消费者模式

  1. public class ConditionDemo2 {
  2. final Integer QUEUE_SIZE = 10;
  3. private Queue<Integer> queue = new PriorityQueue<>(QUEUE_SIZE);
  4. private Lock lock = new ReentrantLock();
  5. Condition notFull = lock.newCondition();
  6. Condition notEmpty = lock.newCondition();
  7. class Producer implements Runnable{
  8. @Override
  9. public void run() {
  10. producer();
  11. }
  12. private void producer() {
  13. while (true){
  14. lock.lock();
  15. try {
  16. if(queue.size() == QUEUE_SIZE){
  17. System.out.println("队列满,生产者停止生产!");
  18. notFull.await();
  19. }
  20. queue.add(new Random().nextInt());
  21. System.out.println("生产者生产一个数据,队列还有" + queue.size() + "个数据");
  22. Thread.sleep(10);
  23. notEmpty.signalAll();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. } finally {
  27. lock.unlock();
  28. }
  29. }
  30. }
  31. }
  32. class Consumer implements Runnable{
  33. @Override
  34. public void run() {
  35. consumer();
  36. }
  37. private void consumer() {
  38. while (true){
  39. lock.lock();
  40. try {
  41. if(queue.size() == 0){
  42. System.out.println("队列空,消费者停止消费!");
  43. notEmpty.await();
  44. }
  45. Thread.sleep(10);
  46. queue.poll();
  47. System.out.println("消费者消费一个数据!队列还有 " + queue.size() + "个数据");
  48. notFull.signalAll();
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. } finally {
  52. lock.unlock();
  53. }
  54. }
  55. }
  56. }
  57. public static void main(String[] args) {
  58. ConditionDemo2 conditionDemo2 = new ConditionDemo2();
  59. Consumer consumer = conditionDemo2.new Consumer();
  60. Producer producer = conditionDemo2.new Producer();
  61. new Thread(producer).start();
  62. new Thread(consumer).start();
  63. }
  64. }

6、CyclicBarrier循环栅栏

6.1、 简介

CyclicBarrier可以构造一个集合点,当一个线程执行完毕,就会到集合点等待,当集合点到达指定数量的线程后,栅栏撤销,所有线程统一出发。CyclicBarrier可以重复使用

对应实际场景:坐大巴车,等到达到大巴人数后发车

6.2、实例

  1. public class CyclicBarrierDemo {
  2. static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() ->{
  3. System.out.println("运动员达到五人次组,开始比赛");
  4. });
  5. static class Task implements Runnable{
  6. @Override
  7. public void run() {
  8. System.out.println("运动员" + Thread.currentThread().getName() + " 开始出发!");
  9. try {
  10. Thread.sleep((long)(Math.random() * 1000));
  11. System.out.println("运动员" + Thread.currentThread().getName() + " 到达目的地!");
  12. cyclicBarrier.await();
  13. System.out.println("运动员" + Thread.currentThread().getName() + " 开始比赛!");
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. } catch (BrokenBarrierException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. public static void main(String[] args) {
  22. ExecutorService executorService = Executors.newFixedThreadPool(10);
  23. IntStream.range(0,10).forEach(e -> executorService.submit(new Task()));
  24. }
  25. }

输出

  1. 运动员pool-1-thread-3 开始出发!
  2. 运动员pool-1-thread-4 开始出发!
  3. 运动员pool-1-thread-7 开始出发!
  4. 运动员pool-1-thread-1 开始出发!
  5. 运动员pool-1-thread-2 开始出发!
  6. 运动员pool-1-thread-5 开始出发!
  7. 运动员pool-1-thread-8 开始出发!
  8. 运动员pool-1-thread-6 开始出发!
  9. 运动员pool-1-thread-9 开始出发!
  10. 运动员pool-1-thread-10 开始出发!
  11. 运动员pool-1-thread-7 到达目的地!
  12. 运动员pool-1-thread-2 到达目的地!
  13. 运动员pool-1-thread-10 到达目的地!
  14. 运动员pool-1-thread-3 到达目的地!
  15. 运动员pool-1-thread-6 到达目的地!
  16. 运动员达到五人次组,开始比赛
  17. 运动员pool-1-thread-6 开始比赛!
  18. 运动员pool-1-thread-7 开始比赛!
  19. 运动员pool-1-thread-2 开始比赛!
  20. 运动员pool-1-thread-3 开始比赛!
  21. 运动员pool-1-thread-10 开始比赛!
  22. 运动员pool-1-thread-1 到达目的地!
  23. 运动员pool-1-thread-5 到达目的地!
  24. 运动员pool-1-thread-9 到达目的地!
  25. 运动员pool-1-thread-4 到达目的地!
  26. 运动员pool-1-thread-8 到达目的地!
  27. 运动员达到五人次组,开始比赛
  28. 运动员pool-1-thread-8 开始比赛!
  29. 运动员pool-1-thread-4 开始比赛!
  30. 运动员pool-1-thread-9 开始比赛!
  31. 运动员pool-1-thread-5 开始比赛!
  32. 运动员pool-1-thread-1 开始比赛!

6.3、CyclicBarrier和CountDownLatch的区别

  1. 作用对象不同

CyclicBarrier是用于线程的,而CountDownLatch是用于事件的

  1. 可重用性不同

CountDownLatch倒数到0就不能使用了,CyclicBarrier可以重复使用

  1. CyclicBarrier可以利用runnable进行额外处理