如何控制并发流程

什么是控制并发流程

控制并发流程的工具类,作用就是帮助我们程序员让线程之间合作
让线程之间相互配合,来满足业务逻辑
比如让线程A等待线程B执行完毕后再执行等合作策略

常用工具类

截屏2020-02-06下午2.45.17.png

CountDownLatch

作用

倒数门闸
例子:购物拼团;大巴,人满发车
流程:倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作

方法

CountDownLatch(int count):仅有一个构造函数,参数count为需要倒数的数值
awati():调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
countDown():将count值减一,直到为0时,等待的线程会被唤起

截屏2020-02-06下午2.56.57.png

用法

  1. 一个线程等待多个线程执行完毕,再继续自己的工作
  2. 多个线程等待某一个线程的信号,同时开始执行

注意点

  1. 扩展用法:多个线程等多个线程完成执行后,再同时执行
  2. CountDownLatch是不能够被重用的,如果需要重新记数,可以考虑适用CyclicBarrier或者创建新的CountDownLatch实例

demo

  1. /**
  2. * 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。
  3. */
  4. public class CountDownLatchDemo2 {
  5. public static void main(String[] args) throws InterruptedException {
  6. CountDownLatch begin = new CountDownLatch(1);
  7. ExecutorService service = Executors.newFixedThreadPool(5);
  8. for (int i = 0; i < 5; i++) {
  9. final int no = i + 1;
  10. Runnable runnable = new Runnable() {
  11. @Override
  12. public void run() {
  13. System.out.println("No." + no + "准备完毕,等待发令枪");
  14. try {
  15. begin.await();
  16. System.out.println("No." + no + "开始跑步了");
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. };
  22. service.submit(runnable);
  23. }
  24. //裁判员检查发令枪...
  25. Thread.sleep(5000);
  26. System.out.println("发令枪响,比赛开始!");
  27. begin.countDown();
  28. }
  29. }

semaphore信号量

作用

可以用来限制或管理数量有限资源的使用情况
信号量的作用是维护一个“许可证”的计数,线程可以获取许可证,那信号量剩余的许可证就减一,线程也可以释放一个许可证,那信号量剩余的许可证数量就加一,当信号量所拥有的许可证数量为0时,那么下一个想要获取许可证的线程,就需要等待,直到另外的线程释放了许可证

截屏2020-02-06下午3.22.04.png

流程

  1. 初始化Semaphore并指定许可证的数量
  2. 在需要被现在的代码前加acquire()或者acquireUninterruptibly()方法
  3. 在任务执行结束后,调用release来释放许可证

    方法

    new Semaphore(int permits,boolean fair):这里可以设置是否需要公平策略,fair传入true,将按顺序分发许可证
    acquire():获取证书,不响应中断
    acquireUninterruptibly():获取证书,响应中断
    tryAcquire():尝试获取
    release():归还许可证

    用法

    限流
    一次性获取或释放多个许可证

    注意点

    获取和释放的许可证数量必须一致
    设置为公平锁,更合理
    获取和释放许可证对线程并无要求,也许是A获取了,然后由B释放,只要逻辑合理即可
    信号量的作用,除了控制临界区最多同时有N个线程访问外,另一个作用是可以实现“条件等待”,相当于轻量级的CountDownLatch

demo

  1. /**
  2. * 描述: 演示Semaphore用法
  3. */
  4. public class SemaphoreDemo {
  5. static Semaphore semaphore = new Semaphore(5, true);
  6. public static void main(String[] args) {
  7. ExecutorService service = Executors.newFixedThreadPool(50);
  8. for (int i = 0; i < 100; i++) {
  9. service.submit(new Task());
  10. }
  11. service.shutdown();
  12. }
  13. static class Task implements Runnable {
  14. @Override
  15. public void run() {
  16. try {
  17. semaphore.acquire(3);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. System.out.println(Thread.currentThread().getName() + "拿到了许可证");
  22. try {
  23. Thread.sleep(2000);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. System.out.println(Thread.currentThread().getName() + "释放了许可证");
  28. semaphore.release(2);
  29. }
  30. }
  31. }

Condition接口(又称为条件对象)

作用

当线程1需要等待某个条件的时候,它就去执行condition.await()方法,一旦执行了await()方法,线程就会进入阻塞状态
然后通常会有另外一个线程,假设线程2,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal()方法,这时JVM就会从被阻塞的线程里找,找到那些等待该condition的线程,当线程1就会收到可执行信号的时候,它的线程状态就会变成Runnable可执行状态

截屏2020-02-06下午3.55.28.png

方法

signalAll()会唤起所有等待的线程
signal()是公平的,会唤起等待时间最长的线程

注意点

实际上,如果说Lock用来代替synchronized,那么condition就是用来代替相对应的Object.wait/notify的,所以用法和性质几乎一样
await会自动释放锁,和object.wait一样,不需要自己手动释放
调用await的时候,必须持有锁,否则会抛异常,和Object.wait一样

demo

  1. /**
  2. * 描述: 演示用Condition实现生产者消费者模式
  3. */
  4. public class ConditionDemo2 {
  5. private int queueSize = 10;
  6. private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
  7. private Lock lock = new ReentrantLock();
  8. private Condition notFull = lock.newCondition();
  9. private Condition notEmpty = lock.newCondition();
  10. public static void main(String[] args) {
  11. ConditionDemo2 conditionDemo2 = new ConditionDemo2();
  12. Producer producer = conditionDemo2.new Producer();
  13. Consumer consumer = conditionDemo2.new Consumer();
  14. producer.start();
  15. consumer.start();
  16. }
  17. class Consumer extends Thread {
  18. @Override
  19. public void run() {
  20. consume();
  21. }
  22. private void consume() {
  23. while (true) {
  24. lock.lock();
  25. try {
  26. while (queue.size() == 0) {
  27. System.out.println("队列空,等待数据");
  28. try {
  29. notEmpty.await();
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. queue.poll();
  35. notFull.signalAll();
  36. System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
  37. } finally {
  38. lock.unlock();
  39. }
  40. }
  41. }
  42. }
  43. class Producer extends Thread {
  44. @Override
  45. public void run() {
  46. produce();
  47. }
  48. private void produce() {
  49. while (true) {
  50. lock.lock();
  51. try {
  52. while (queue.size() == queueSize) {
  53. System.out.println("队列满,等待有空余");
  54. try {
  55. notFull.await();
  56. } catch (InterruptedException e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. queue.offer(1);
  61. notEmpty.signalAll();
  62. System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
  63. } finally {
  64. lock.unlock();
  65. }
  66. }
  67. }
  68. }
  69. }

CyclicBarrier循环栅栏

CyclicBarrier和CountDownLatch类似,都能阻塞一组线程
当有大量线程相互配合,分别计算不同的人,并且最后需要统一汇总时,我们可以使用CyclicBarrier。CyclicBarrier可以构造一个集结点,当某一个线程执行完毕,它就会在集结点等待,直到所有线程都到了集结点,那么该栅栏都被撤销,所有任务再统一出发,继续执行剩下的任务
生活中的例子:咱们三个明天中午在学校碰头,到齐后,一起讨论下学期计划

CyclicBarrier和CountDownLatch的区别

作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch用于事件,但CyclicBarrier用于线程
可重用性不同:CountDownLatch在倒数到0后并触发门闸打开后,就不能再次使用,除非新的实例;CyclicBarrier可以重复使用
CyclicBarrier在都到了后,可以运行一个runnable,去做一些统一的工作

demo

  1. /**
  2. * 描述: 演示CyclicBarrier
  3. */
  4. public class CyclicBarrierDemo {
  5. public static void main(String[] args) {
  6. CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
  7. @Override
  8. public void run() {
  9. System.out.println("所有人都到场了, 大家统一出发!");
  10. }
  11. });
  12. for (int i = 0; i < 10; i++) {
  13. new Thread(new Task(i, cyclicBarrier)).start();
  14. }
  15. }
  16. static class Task implements Runnable{
  17. private int id;
  18. private CyclicBarrier cyclicBarrier;
  19. public Task(int id, CyclicBarrier cyclicBarrier) {
  20. this.id = id;
  21. this.cyclicBarrier = cyclicBarrier;
  22. }
  23. @Override
  24. public void run() {
  25. System.out.println("线程" + id + "现在前往集合地点");
  26. try {
  27. Thread.sleep((long) (Math.random()*10000));
  28. System.out.println("线程"+id+"到了集合地点,开始等待其他人到达");
  29. cyclicBarrier.await();
  30. System.out.println("线程"+id+"出发了");
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. } catch (BrokenBarrierException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  38. }