1、什么是控制并发流程
让线程之间相互配合,来满足业务逻辑
2、常见的并发流程工具类
类 | 作用 | 说明 |
---|---|---|
Semaphore | 信号量,可以控制“许可证”数量,来保证线程之间的配合 | 线程只有拿到许可证之后才能继续运行。相比于其他同步器,更灵活 |
CyclicBarrier | 线程会等待,直到足够多的线程达到了事先规定的数目,一旦达到触发条件,就可以进行下一步动作 | 适用于线程之间相互等待处理结果就绪的场景 |
Phaser | 和CyclicBarrier类似,但是计数可变 | java7加入 |
CountDownlatch | 和CyclicBarrier类似,数量递减到0时,触发动作 | 不可重复使用 |
Exchanger | 让两个线程在合适时交换对象 | 适用场景:当两个线程工作在同一个类下的不同实例中,用于交换数据 |
Condition | 可以控制线程的等待和唤醒 | 是Object.wait()的升级版 |
3、CountDownLatch类的作用
- 用法1:一个线程等待多个线程都执行完毕,再继续自己的工作
- 用法2:多个线程等待某一个线程的信号,同时执行
3.1、用法一实例
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(5);
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("子线程正在执行");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程执行完毕!");
latch.countDown();
}
};
IntStream.range(0,5).forEach(e -> executorService.submit(runnable));
System.out.println("主线程阻塞,等待子线程运行完毕!");
latch.await();
System.out.println("子线程运行完毕,主线程继续运行");
Thread.sleep(500);
executorService.shutdownNow();
}
输出
子线程正在执行
子线程正在执行
子线程正在执行
子线程正在执行
子线程正在执行
主线程阻塞,等待子线程运行完毕!
子线程执行完毕!
子线程执行完毕!
子线程执行完毕!
子线程执行完毕!
子线程执行完毕!
子线程运行完毕,主线程继续运行
3.2、用法二实例
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(1);
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("线程准备完毕,等待指令!");
try {
latch.await();
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程执行完毕!");
}
};
IntStream.range(0,5).forEach(e -> executorService.submit(runnable));
Thread.sleep(50);
System.out.println("发送指令,开始运行");
latch.countDown();
Thread.sleep(500);
executorService.shutdownNow();
}
输出
线程准备完毕,等待指令!
线程准备完毕,等待指令!
线程准备完毕,等待指令!
线程准备完毕,等待指令!
线程准备完毕,等待指令!
发送指令,开始运行
线程执行完毕!
线程执行完毕!
线程执行完毕!
线程执行完毕!
线程执行完毕!
4、Semaphore信号量
4.1、简介
作用:
常用于限制可以访问某些资源的线程数量,例如通过 Semaphore 限流
原理:
信号量会维护“许可证”的计数,而线程去访问共享资源前,必须先拿到许可证,线程也可以“释放”一个许可证归还给信号量。当信号量拥有的许可证数量减到 0 时,如果下个线程还想要获得许可证,那么这个线程就必须等待,直到之前得到许可证的线程释放。
4.2、主要方法
- 构造函数:public Semaphore(int permits, boolean fair)
第一个参数是许可证的数量,另一个参数是是否公平,第二个参数传入 false,则代表非公平策略,也就有可能插队
- acquire() 和 acquireUninterruptibly()
获取许可证,第一个方法可响应中断,第二个方法忽略中断 ,如果此时信号量已经没有剩余的许可证,线程会等待
release()
释放信号量
tryAcquire()
尝试获取,获取到为true
- tryAcquire(long timeout, TimeUnit unit)
如果等待期间获取到了许可证,则往下继续执行;如果超时时间到,依然获取不到许可证,它就认为获取失败,且返回 false。
- availablePermits()
这个方法用来查询可用许可证的数量,返回一个整型的结果
4.3、信号量特殊用法
可以一次性获取或者释放多个许可证(获取和释放许可证的数量必须保持一致)
注意设置公平性,设置true更为合理
释放和获取对线程无要求,可以A获取,B释放,合理即可
4.4、信号量实例
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3,true);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Runnable runnable = () -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "线程开始运行");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "线程运行完毕");
semaphore.release();
};
IntStream.range(0,9).forEach(e -> executorService.submit(runnable));
}
输出
pool-1-thread-2线程开始运行
pool-1-thread-1线程开始运行
pool-1-thread-3线程开始运行
pool-1-thread-2线程运行完毕
pool-1-thread-1线程运行完毕
pool-1-thread-3线程运行完毕
pool-1-thread-5线程开始运行
pool-1-thread-4线程开始运行
pool-1-thread-6线程开始运行
pool-1-thread-6线程运行完毕
pool-1-thread-7线程开始运行
pool-1-thread-4线程运行完毕
pool-1-thread-5线程运行完毕
pool-1-thread-8线程开始运行
pool-1-thread-9线程开始运行
pool-1-thread-9线程运行完毕
pool-1-thread-8线程运行完毕
pool-1-thread-7线程运行完毕
5、Condition接口
5.1、简介
作用:
当线程1需要等待某个条件才能继续运行的时候,执行condition.await()方法,线程进入阻塞状态。线程2在某个合适的时机执行condition.signal()方法,jvm从阻塞线程中找到等待该condition的线程,唤醒,变为可执行状态
5.2、主要方法
- signal()
唤醒一个等待时间最长的线程
- signalAll()
唤醒所有等待该condition的线程
5.3、实例
5.3.1、基本用法
public class ConditionDemo {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
void method1() throws InterruptedException {
lock.lock();
try{
System.out.println("条件不满足,开始等待!");
condition.await();
System.out.println("条件满足,继续运行!");
}finally {
lock.unlock();
}
}
void method2() throws InterruptedException {
lock.lock();
try{
System.out.println("准备工作完成,唤醒等待线程!");
condition.signal();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionDemo conditionDemo = new ConditionDemo();
new Thread(() -> {
try {
Thread.sleep(100);
conditionDemo.method2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
conditionDemo.method1();
}
}
输出
条件不满足,开始等待!
准备工作完成,唤醒等待线程!
条件满足,继续运行!
5.3.2、condition实现生产者消费者模式
public class ConditionDemo2 {
final Integer QUEUE_SIZE = 10;
private Queue<Integer> queue = new PriorityQueue<>(QUEUE_SIZE);
private Lock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();
class Producer implements Runnable{
@Override
public void run() {
producer();
}
private void producer() {
while (true){
lock.lock();
try {
if(queue.size() == QUEUE_SIZE){
System.out.println("队列满,生产者停止生产!");
notFull.await();
}
queue.add(new Random().nextInt());
System.out.println("生产者生产一个数据,队列还有" + queue.size() + "个数据");
Thread.sleep(10);
notEmpty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
class Consumer implements Runnable{
@Override
public void run() {
consumer();
}
private void consumer() {
while (true){
lock.lock();
try {
if(queue.size() == 0){
System.out.println("队列空,消费者停止消费!");
notEmpty.await();
}
Thread.sleep(10);
queue.poll();
System.out.println("消费者消费一个数据!队列还有 " + queue.size() + "个数据");
notFull.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Consumer consumer = conditionDemo2.new Consumer();
Producer producer = conditionDemo2.new Producer();
new Thread(producer).start();
new Thread(consumer).start();
}
}
6、CyclicBarrier循环栅栏
6.1、 简介
CyclicBarrier可以构造一个集合点,当一个线程执行完毕,就会到集合点等待,当集合点到达指定数量的线程后,栅栏撤销,所有线程统一出发。CyclicBarrier可以重复使用
对应实际场景:坐大巴车,等到达到大巴人数后发车
6.2、实例
public class CyclicBarrierDemo {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() ->{
System.out.println("运动员达到五人次组,开始比赛");
});
static class Task implements Runnable{
@Override
public void run() {
System.out.println("运动员" + Thread.currentThread().getName() + " 开始出发!");
try {
Thread.sleep((long)(Math.random() * 1000));
System.out.println("运动员" + Thread.currentThread().getName() + " 到达目的地!");
cyclicBarrier.await();
System.out.println("运动员" + Thread.currentThread().getName() + " 开始比赛!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
IntStream.range(0,10).forEach(e -> executorService.submit(new Task()));
}
}
输出
运动员pool-1-thread-3 开始出发!
运动员pool-1-thread-4 开始出发!
运动员pool-1-thread-7 开始出发!
运动员pool-1-thread-1 开始出发!
运动员pool-1-thread-2 开始出发!
运动员pool-1-thread-5 开始出发!
运动员pool-1-thread-8 开始出发!
运动员pool-1-thread-6 开始出发!
运动员pool-1-thread-9 开始出发!
运动员pool-1-thread-10 开始出发!
运动员pool-1-thread-7 到达目的地!
运动员pool-1-thread-2 到达目的地!
运动员pool-1-thread-10 到达目的地!
运动员pool-1-thread-3 到达目的地!
运动员pool-1-thread-6 到达目的地!
运动员达到五人次组,开始比赛
运动员pool-1-thread-6 开始比赛!
运动员pool-1-thread-7 开始比赛!
运动员pool-1-thread-2 开始比赛!
运动员pool-1-thread-3 开始比赛!
运动员pool-1-thread-10 开始比赛!
运动员pool-1-thread-1 到达目的地!
运动员pool-1-thread-5 到达目的地!
运动员pool-1-thread-9 到达目的地!
运动员pool-1-thread-4 到达目的地!
运动员pool-1-thread-8 到达目的地!
运动员达到五人次组,开始比赛
运动员pool-1-thread-8 开始比赛!
运动员pool-1-thread-4 开始比赛!
运动员pool-1-thread-9 开始比赛!
运动员pool-1-thread-5 开始比赛!
运动员pool-1-thread-1 开始比赛!
6.3、CyclicBarrier和CountDownLatch的区别
- 作用对象不同
CyclicBarrier是用于线程的,而CountDownLatch是用于事件的
- 可重用性不同
CountDownLatch倒数到0就不能使用了,CyclicBarrier可以重复使用
- CyclicBarrier可以利用runnable进行额外处理