如何控制并发流程
什么是控制并发流程
控制并发流程的工具类,作用就是帮助我们程序员让线程之间合作
让线程之间相互配合,来满足业务逻辑
比如让线程A等待线程B执行完毕后再执行等合作策略
常用工具类

CountDownLatch
作用
倒数门闸
例子:购物拼团;大巴,人满发车
流程:倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作
方法
CountDownLatch(int count):仅有一个构造函数,参数count为需要倒数的数值
awati():调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
countDown():将count值减一,直到为0时,等待的线程会被唤起

用法
- 一个线程等待多个线程执行完毕,再继续自己的工作
- 多个线程等待某一个线程的信号,同时开始执行
注意点
- 扩展用法:多个线程等多个线程完成执行后,再同时执行
- CountDownLatch是不能够被重用的,如果需要重新记数,可以考虑适用CyclicBarrier或者创建新的CountDownLatch实例
demo
/*** 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。*/public class CountDownLatchDemo2 {public static void main(String[] args) throws InterruptedException {CountDownLatch begin = new CountDownLatch(1);ExecutorService service = Executors.newFixedThreadPool(5);for (int i = 0; i < 5; i++) {final int no = i + 1;Runnable runnable = new Runnable() {@Overridepublic void run() {System.out.println("No." + no + "准备完毕,等待发令枪");try {begin.await();System.out.println("No." + no + "开始跑步了");} catch (InterruptedException e) {e.printStackTrace();}}};service.submit(runnable);}//裁判员检查发令枪...Thread.sleep(5000);System.out.println("发令枪响,比赛开始!");begin.countDown();}}
semaphore信号量
作用
可以用来限制或管理数量有限资源的使用情况
信号量的作用是维护一个“许可证”的计数,线程可以获取许可证,那信号量剩余的许可证就减一,线程也可以释放一个许可证,那信号量剩余的许可证数量就加一,当信号量所拥有的许可证数量为0时,那么下一个想要获取许可证的线程,就需要等待,直到另外的线程释放了许可证

流程
- 初始化Semaphore并指定许可证的数量
- 在需要被现在的代码前加acquire()或者acquireUninterruptibly()方法
- 在任务执行结束后,调用release来释放许可证
方法
new Semaphore(int permits,boolean fair):这里可以设置是否需要公平策略,fair传入true,将按顺序分发许可证
acquire():获取证书,不响应中断
acquireUninterruptibly():获取证书,响应中断
tryAcquire():尝试获取
release():归还许可证用法
限流
一次性获取或释放多个许可证注意点
获取和释放的许可证数量必须一致
设置为公平锁,更合理
获取和释放许可证对线程并无要求,也许是A获取了,然后由B释放,只要逻辑合理即可
信号量的作用,除了控制临界区最多同时有N个线程访问外,另一个作用是可以实现“条件等待”,相当于轻量级的CountDownLatch
demo
/*** 描述: 演示Semaphore用法*/public class SemaphoreDemo {static Semaphore semaphore = new Semaphore(5, true);public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(50);for (int i = 0; i < 100; i++) {service.submit(new Task());}service.shutdown();}static class Task implements Runnable {@Overridepublic void run() {try {semaphore.acquire(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "拿到了许可证");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "释放了许可证");semaphore.release(2);}}}
Condition接口(又称为条件对象)
作用
当线程1需要等待某个条件的时候,它就去执行condition.await()方法,一旦执行了await()方法,线程就会进入阻塞状态
然后通常会有另外一个线程,假设线程2,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal()方法,这时JVM就会从被阻塞的线程里找,找到那些等待该condition的线程,当线程1就会收到可执行信号的时候,它的线程状态就会变成Runnable可执行状态

方法
signalAll()会唤起所有等待的线程
signal()是公平的,会唤起等待时间最长的线程
注意点
实际上,如果说Lock用来代替synchronized,那么condition就是用来代替相对应的Object.wait/notify的,所以用法和性质几乎一样
await会自动释放锁,和object.wait一样,不需要自己手动释放
调用await的时候,必须持有锁,否则会抛异常,和Object.wait一样
demo
/*** 描述: 演示用Condition实现生产者消费者模式*/public class ConditionDemo2 {private int queueSize = 10;private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);private Lock lock = new ReentrantLock();private Condition notFull = lock.newCondition();private Condition notEmpty = lock.newCondition();public static void main(String[] args) {ConditionDemo2 conditionDemo2 = new ConditionDemo2();Producer producer = conditionDemo2.new Producer();Consumer consumer = conditionDemo2.new Consumer();producer.start();consumer.start();}class Consumer extends Thread {@Overridepublic void run() {consume();}private void consume() {while (true) {lock.lock();try {while (queue.size() == 0) {System.out.println("队列空,等待数据");try {notEmpty.await();} catch (InterruptedException e) {e.printStackTrace();}}queue.poll();notFull.signalAll();System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");} finally {lock.unlock();}}}}class Producer extends Thread {@Overridepublic void run() {produce();}private void produce() {while (true) {lock.lock();try {while (queue.size() == queueSize) {System.out.println("队列满,等待有空余");try {notFull.await();} catch (InterruptedException e) {e.printStackTrace();}}queue.offer(1);notEmpty.signalAll();System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));} finally {lock.unlock();}}}}}
CyclicBarrier循环栅栏
CyclicBarrier和CountDownLatch类似,都能阻塞一组线程
当有大量线程相互配合,分别计算不同的人,并且最后需要统一汇总时,我们可以使用CyclicBarrier。CyclicBarrier可以构造一个集结点,当某一个线程执行完毕,它就会在集结点等待,直到所有线程都到了集结点,那么该栅栏都被撤销,所有任务再统一出发,继续执行剩下的任务
生活中的例子:咱们三个明天中午在学校碰头,到齐后,一起讨论下学期计划
CyclicBarrier和CountDownLatch的区别
作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch用于事件,但CyclicBarrier用于线程
可重用性不同:CountDownLatch在倒数到0后并触发门闸打开后,就不能再次使用,除非新的实例;CyclicBarrier可以重复使用
CyclicBarrier在都到了后,可以运行一个runnable,去做一些统一的工作
demo
/*** 描述: 演示CyclicBarrier*/public class CyclicBarrierDemo {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {@Overridepublic void run() {System.out.println("所有人都到场了, 大家统一出发!");}});for (int i = 0; i < 10; i++) {new Thread(new Task(i, cyclicBarrier)).start();}}static class Task implements Runnable{private int id;private CyclicBarrier cyclicBarrier;public Task(int id, CyclicBarrier cyclicBarrier) {this.id = id;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {System.out.println("线程" + id + "现在前往集合地点");try {Thread.sleep((long) (Math.random()*10000));System.out.println("线程"+id+"到了集合地点,开始等待其他人到达");cyclicBarrier.await();System.out.println("线程"+id+"出发了");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}}
