1、什么是控制并发流程
让线程之间相互配合,来满足业务逻辑
2、常见的并发流程工具类
| 类 | 作用 | 说明 | 
|---|---|---|
| Semaphore | 信号量,可以控制“许可证”数量,来保证线程之间的配合 | 线程只有拿到许可证之后才能继续运行。相比于其他同步器,更灵活 | 
| CyclicBarrier | 线程会等待,直到足够多的线程达到了事先规定的数目,一旦达到触发条件,就可以进行下一步动作 | 适用于线程之间相互等待处理结果就绪的场景 | 
| Phaser | 和CyclicBarrier类似,但是计数可变 | java7加入 | 
| CountDownlatch | 和CyclicBarrier类似,数量递减到0时,触发动作 | 不可重复使用 | 
| Exchanger | 让两个线程在合适时交换对象 | 适用场景:当两个线程工作在同一个类下的不同实例中,用于交换数据 | 
| Condition | 可以控制线程的等待和唤醒 | 是Object.wait()的升级版 | 
3、CountDownLatch类的作用
- 用法1:一个线程等待多个线程都执行完毕,再继续自己的工作
 - 用法2:多个线程等待某一个线程的信号,同时执行
 
3.1、用法一实例
public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);CountDownLatch latch = new CountDownLatch(5);Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("子线程正在执行");try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("子线程执行完毕!");latch.countDown();}};IntStream.range(0,5).forEach(e -> executorService.submit(runnable));System.out.println("主线程阻塞,等待子线程运行完毕!");latch.await();System.out.println("子线程运行完毕,主线程继续运行");Thread.sleep(500);executorService.shutdownNow();}
输出
子线程正在执行子线程正在执行子线程正在执行子线程正在执行子线程正在执行主线程阻塞,等待子线程运行完毕!子线程执行完毕!子线程执行完毕!子线程执行完毕!子线程执行完毕!子线程执行完毕!子线程运行完毕,主线程继续运行
3.2、用法二实例
public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);CountDownLatch latch = new CountDownLatch(1);Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("线程准备完毕,等待指令!");try {latch.await();Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程执行完毕!");}};IntStream.range(0,5).forEach(e -> executorService.submit(runnable));Thread.sleep(50);System.out.println("发送指令,开始运行");latch.countDown();Thread.sleep(500);executorService.shutdownNow();}
输出
线程准备完毕,等待指令!线程准备完毕,等待指令!线程准备完毕,等待指令!线程准备完毕,等待指令!线程准备完毕,等待指令!发送指令,开始运行线程执行完毕!线程执行完毕!线程执行完毕!线程执行完毕!线程执行完毕!
4、Semaphore信号量
4.1、简介
作用:
常用于限制可以访问某些资源的线程数量,例如通过 Semaphore 限流
原理:
信号量会维护“许可证”的计数,而线程去访问共享资源前,必须先拿到许可证,线程也可以“释放”一个许可证归还给信号量。当信号量拥有的许可证数量减到 0 时,如果下个线程还想要获得许可证,那么这个线程就必须等待,直到之前得到许可证的线程释放。
4.2、主要方法
- 构造函数:public Semaphore(int permits, boolean fair)
 
第一个参数是许可证的数量,另一个参数是是否公平,第二个参数传入 false,则代表非公平策略,也就有可能插队
- acquire() 和 acquireUninterruptibly()
 
获取许可证,第一个方法可响应中断,第二个方法忽略中断 ,如果此时信号量已经没有剩余的许可证,线程会等待
release()
释放信号量
tryAcquire()
尝试获取,获取到为true
- tryAcquire(long timeout, TimeUnit unit)
 
如果等待期间获取到了许可证,则往下继续执行;如果超时时间到,依然获取不到许可证,它就认为获取失败,且返回 false。
- availablePermits()
 
这个方法用来查询可用许可证的数量,返回一个整型的结果
4.3、信号量特殊用法
可以一次性获取或者释放多个许可证(获取和释放许可证的数量必须保持一致)
注意设置公平性,设置true更为合理
释放和获取对线程无要求,可以A获取,B释放,合理即可
4.4、信号量实例
public static void main(String[] args) {Semaphore semaphore = new Semaphore(3,true);ExecutorService executorService = Executors.newFixedThreadPool(10);Runnable runnable = () -> {try {semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "线程开始运行");try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "线程运行完毕");semaphore.release();};IntStream.range(0,9).forEach(e -> executorService.submit(runnable));}
输出
pool-1-thread-2线程开始运行pool-1-thread-1线程开始运行pool-1-thread-3线程开始运行pool-1-thread-2线程运行完毕pool-1-thread-1线程运行完毕pool-1-thread-3线程运行完毕pool-1-thread-5线程开始运行pool-1-thread-4线程开始运行pool-1-thread-6线程开始运行pool-1-thread-6线程运行完毕pool-1-thread-7线程开始运行pool-1-thread-4线程运行完毕pool-1-thread-5线程运行完毕pool-1-thread-8线程开始运行pool-1-thread-9线程开始运行pool-1-thread-9线程运行完毕pool-1-thread-8线程运行完毕pool-1-thread-7线程运行完毕
5、Condition接口
5.1、简介
作用:
当线程1需要等待某个条件才能继续运行的时候,执行condition.await()方法,线程进入阻塞状态。线程2在某个合适的时机执行condition.signal()方法,jvm从阻塞线程中找到等待该condition的线程,唤醒,变为可执行状态
5.2、主要方法
- signal()
 
唤醒一个等待时间最长的线程
- signalAll()
 
唤醒所有等待该condition的线程
5.3、实例
5.3.1、基本用法
public class ConditionDemo {private ReentrantLock lock = new ReentrantLock();private Condition condition = lock.newCondition();void method1() throws InterruptedException {lock.lock();try{System.out.println("条件不满足,开始等待!");condition.await();System.out.println("条件满足,继续运行!");}finally {lock.unlock();}}void method2() throws InterruptedException {lock.lock();try{System.out.println("准备工作完成,唤醒等待线程!");condition.signal();}finally {lock.unlock();}}public static void main(String[] args) throws InterruptedException {ConditionDemo conditionDemo = new ConditionDemo();new Thread(() -> {try {Thread.sleep(100);conditionDemo.method2();} catch (InterruptedException e) {e.printStackTrace();}}).start();conditionDemo.method1();}}
输出
条件不满足,开始等待!准备工作完成,唤醒等待线程!条件满足,继续运行!
5.3.2、condition实现生产者消费者模式
public class ConditionDemo2 {final Integer QUEUE_SIZE = 10;private Queue<Integer> queue = new PriorityQueue<>(QUEUE_SIZE);private Lock lock = new ReentrantLock();Condition notFull = lock.newCondition();Condition notEmpty = lock.newCondition();class Producer implements Runnable{@Overridepublic void run() {producer();}private void producer() {while (true){lock.lock();try {if(queue.size() == QUEUE_SIZE){System.out.println("队列满,生产者停止生产!");notFull.await();}queue.add(new Random().nextInt());System.out.println("生产者生产一个数据,队列还有" + queue.size() + "个数据");Thread.sleep(10);notEmpty.signalAll();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}}class Consumer implements Runnable{@Overridepublic void run() {consumer();}private void consumer() {while (true){lock.lock();try {if(queue.size() == 0){System.out.println("队列空,消费者停止消费!");notEmpty.await();}Thread.sleep(10);queue.poll();System.out.println("消费者消费一个数据!队列还有 " + queue.size() + "个数据");notFull.signalAll();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}}public static void main(String[] args) {ConditionDemo2 conditionDemo2 = new ConditionDemo2();Consumer consumer = conditionDemo2.new Consumer();Producer producer = conditionDemo2.new Producer();new Thread(producer).start();new Thread(consumer).start();}}
6、CyclicBarrier循环栅栏
6.1、 简介
CyclicBarrier可以构造一个集合点,当一个线程执行完毕,就会到集合点等待,当集合点到达指定数量的线程后,栅栏撤销,所有线程统一出发。CyclicBarrier可以重复使用
对应实际场景:坐大巴车,等到达到大巴人数后发车
6.2、实例
public class CyclicBarrierDemo {static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() ->{System.out.println("运动员达到五人次组,开始比赛");});static class Task implements Runnable{@Overridepublic void run() {System.out.println("运动员" + Thread.currentThread().getName() + " 开始出发!");try {Thread.sleep((long)(Math.random() * 1000));System.out.println("运动员" + Thread.currentThread().getName() + " 到达目的地!");cyclicBarrier.await();System.out.println("运动员" + Thread.currentThread().getName() + " 开始比赛!");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(10);IntStream.range(0,10).forEach(e -> executorService.submit(new Task()));}}
输出
运动员pool-1-thread-3 开始出发!运动员pool-1-thread-4 开始出发!运动员pool-1-thread-7 开始出发!运动员pool-1-thread-1 开始出发!运动员pool-1-thread-2 开始出发!运动员pool-1-thread-5 开始出发!运动员pool-1-thread-8 开始出发!运动员pool-1-thread-6 开始出发!运动员pool-1-thread-9 开始出发!运动员pool-1-thread-10 开始出发!运动员pool-1-thread-7 到达目的地!运动员pool-1-thread-2 到达目的地!运动员pool-1-thread-10 到达目的地!运动员pool-1-thread-3 到达目的地!运动员pool-1-thread-6 到达目的地!运动员达到五人次组,开始比赛运动员pool-1-thread-6 开始比赛!运动员pool-1-thread-7 开始比赛!运动员pool-1-thread-2 开始比赛!运动员pool-1-thread-3 开始比赛!运动员pool-1-thread-10 开始比赛!运动员pool-1-thread-1 到达目的地!运动员pool-1-thread-5 到达目的地!运动员pool-1-thread-9 到达目的地!运动员pool-1-thread-4 到达目的地!运动员pool-1-thread-8 到达目的地!运动员达到五人次组,开始比赛运动员pool-1-thread-8 开始比赛!运动员pool-1-thread-4 开始比赛!运动员pool-1-thread-9 开始比赛!运动员pool-1-thread-5 开始比赛!运动员pool-1-thread-1 开始比赛!
6.3、CyclicBarrier和CountDownLatch的区别
- 作用对象不同
 
CyclicBarrier是用于线程的,而CountDownLatch是用于事件的
- 可重用性不同
 
CountDownLatch倒数到0就不能使用了,CyclicBarrier可以重复使用
- CyclicBarrier可以利用runnable进行额外处理
 
