队列

阻塞队列

  1. 阻塞队列有没有好的一面?
  2. 不得不阻塞,你如何管理?

定义

阻塞队列,首先是一个队列,而一个阻塞队列在数据结构中所起的作用大概如图所示:
image.png

  • 当阻塞队列是空的时候,从队列中获取元素的操作将会被阻塞
  • 当阻塞队列是满的时候,往队列中添加元素的操作将会被阻塞
  • 试图从空的阻塞队列中获取元素将会被阻塞,直到其他线程往空的队列插入显得元素,同比插入元素。

    什么好处?

    在多线程领域,所谓阻塞,在某些情况下会挂起线程,即阻塞,一旦条件满足,被挂起的线程又会自动被唤醒。
    为什么需要阻塞队列?

    不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切被BlockingQueue一手包办了。也不需要考虑效率和线程安全。

架构梳理和种类分析

  • 红色重点关注 | 分类 | 说明 | | —- | —- | | ArrayBlockingQueue | 是一个基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行操作。 | | LinkedBlockingQueue | 基于链表结构的有界阻塞队列,按照FIFO排序元素,吞吐量通常高于ArrayBlockingQueue.但是大小是Integer.MAX_VALUE | | PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 | | DelayQueue | 使用优先级队列实现的延迟无界阻塞队列 | | LinkedTransferQueue | 由链表结构组成的无界阻塞队列 | | SynchrobousBlockingQueue | 一种不存储元素的阻塞队列,每个插入操作必须等待另外一个线程调用移除操作,否则插入操作一直处于阻塞状态 | | LinkedBlockingDueue | 由链表结构组成的双向阻塞队列。 |

核心方法

image.png
image.png

element检查队首元素是谁。队列先进先出,队首元素。

  1. package com.interview.demo;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.SynchronousQueue;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * @Author leijs
  7. * @date 2022/3/29
  8. */
  9. public class SynchronousQueueDemo {
  10. public static void main(String[] args) {
  11. BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
  12. new Thread(() -> {
  13. try {
  14. System.out.println(Thread.currentThread().getName() + "生产a");
  15. blockingQueue.put("a");
  16. System.out.println(Thread.currentThread().getName() + "生产b");
  17. blockingQueue.put("b");
  18. System.out.println(Thread.currentThread().getName() + "生产c");
  19. blockingQueue.put("c");
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }, "A").start();
  24. new Thread(() -> {
  25. try {
  26. System.out.println(Thread.currentThread().getName() + "拿走a");
  27. blockingQueue.take();
  28. TimeUnit.SECONDS.sleep(2);
  29. System.out.println(Thread.currentThread().getName() + "拿走b");
  30. blockingQueue.take();
  31. TimeUnit.SECONDS.sleep(2);
  32. System.out.println(Thread.currentThread().getName() + "拿走c");
  33. blockingQueue.take();
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. }, "B").start();
  38. }
  39. }

生产者消费者模式

传统版本

  1. package com.interview.demo.producer_consumer;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. /**
  6. * 一个初始值为0的变量,两个线程交替操作,一个加1,一个减1,来5轮
  7. *
  8. * @Author leijs
  9. * @date 2022/3/29
  10. */
  11. public class ProdConsumerTraditionDemo {
  12. public static void main(String[] args) {
  13. ShareData shareData = new ShareData();
  14. new Thread(() -> {
  15. for (int i = 0; i < 5; i++) {
  16. shareData.increment();
  17. }
  18. }, "t1").start();
  19. new Thread(() -> {
  20. for (int i = 0; i < 5; i++) {
  21. shareData.decrement();
  22. }
  23. }, "t2").start();
  24. }
  25. }
  26. class ShareData {
  27. private int num = 0;
  28. private Lock lock = new ReentrantLock();
  29. private Condition condition = lock.newCondition();
  30. public void increment() {
  31. lock.lock();
  32. try {
  33. while (num != 0) {
  34. // 等待,不能生产
  35. condition.await();
  36. }
  37. num++;
  38. System.out.println(Thread.currentThread().getName() + ":" + num);
  39. condition.signalAll();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. } finally {
  43. lock.unlock();
  44. }
  45. }
  46. public void decrement() {
  47. lock.lock();
  48. try {
  49. // 注意多线程的判断不能用if只能使用while
  50. while (num == 0) {
  51. // 等待,不能生产
  52. condition.await();
  53. }
  54. num--;
  55. System.out.println(Thread.currentThread().getName() + ":" + num);
  56. condition.signalAll();
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. } finally {
  60. lock.unlock();
  61. }
  62. }
  63. }

BlockingQueue实现

  1. package com.interview.demo.producer_consumer;
  2. import org.apache.commons.lang3.StringUtils;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. import java.util.concurrent.BlockingQueue;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. /**
  8. * @Author leijs
  9. * @date 2022/3/29
  10. */
  11. public class ProdConsumerBlockingQueueDemo {
  12. public static void main(String[] args) throws InterruptedException {
  13. MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
  14. new Thread(() -> {
  15. try {
  16. myResource.produce();
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }, "producer").start();
  21. new Thread(() -> {
  22. try {
  23. myResource.consumer();
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. }
  27. }, "consumer").start();
  28. TimeUnit.SECONDS.sleep(5);
  29. System.out.println("主线程叫停了");
  30. myResource.stop();
  31. }
  32. }
  33. class MyResource {
  34. /**
  35. * 默认开启进行生产+消费
  36. */
  37. private volatile boolean FLAG = true;
  38. private AtomicInteger atomicInteger = new AtomicInteger();
  39. BlockingQueue<String> blockingQueue = null;
  40. public MyResource(BlockingQueue<String> blockingQueue) {
  41. this.blockingQueue = blockingQueue;
  42. System.out.println(blockingQueue.getClass().getName());
  43. }
  44. public void produce() throws Exception {
  45. String data;
  46. boolean returnValue;
  47. while (FLAG) {
  48. data = atomicInteger.incrementAndGet() + "";
  49. returnValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
  50. if (returnValue) {
  51. System.out.println(Thread.currentThread().getName() + "插入队列成功" + data);
  52. } else {
  53. System.out.println(Thread.currentThread().getName() + "插入队列失败" + data);
  54. }
  55. TimeUnit.SECONDS.sleep(1);
  56. }
  57. System.out.println("生产动作结束,flag=false");
  58. }
  59. public void consumer() throws Exception {
  60. String data;
  61. while (FLAG) {
  62. data = blockingQueue.poll(2L, TimeUnit.SECONDS);
  63. if (StringUtils.isEmpty(data)) {
  64. FLAG = false;
  65. System.out.println(Thread.currentThread().getName() + "超过两秒钟没有获取数据,退出");
  66. return;
  67. }
  68. System.out.println(Thread.currentThread().getName() + "消费队列成功:" + data);
  69. }
  70. System.out.println("消费动作结束");
  71. }
  72. public void stop() {
  73. this.FLAG = false;
  74. }
  75. }

扩展

Synchronized和Lock的区别

  1. synchronized是关键字,属于JVM层面。

    1. monitorenter(底层是通过Monitor对象来完成的,其实wait/notify)等方法也依赖monitor对象,只有在同步代表快中才能调用wait/notify等方法<br /> monitorexit<br /> Lock是具体类,是API层面的
  2. 使用方法

    1. synchronized 不需要手动改释放锁,代码执行完系统会自动让线程释放对锁的占用。<br /> ReentrantLock则需要用户去手动释放锁,如果没有主动释放锁,就可能出现死锁现象<br /> lockunlock方法配合try...finally来完成
  3. 等待是否可中断

    1. synchronized 是不可中断的,除非抛出异常或者正常运行完成<br /> lock可中断。可以设置超时方法或者lockInterruptibly
  4. 加锁是否公平

    1. synchronized 只能是非公平锁<br /> lock两者都可以,构造函数传入true/false
  5. 锁绑定多个对象Condition

    1. Synchronized没有<br /> Reentrant用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像Synchronized那样随机唤醒一个线程或者唤醒全部线程。

多个线程按照顺序打印

  1. package com.interview.demo;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. /**
  6. * A 打印1次,B打印2次,C打印3次
  7. * 循环5次
  8. *
  9. * @Author leijs
  10. * @date 2022/3/29
  11. */
  12. public class SyncAndReentrantLockDemo {
  13. public static void main(String[] args) {
  14. ShareData shareData = new ShareData();
  15. new Thread(() -> {
  16. for (int i = 0; i < 5; i++) {
  17. shareData.print1();
  18. }
  19. }, "A").start();
  20. new Thread(() -> {
  21. for (int i = 0; i < 5; i++) {
  22. shareData.print2();
  23. }
  24. }, "B").start();
  25. new Thread(() -> {
  26. for (int i = 0; i < 5; i++) {
  27. shareData.print3();
  28. }
  29. }, "C").start();
  30. }
  31. }
  32. class ShareData {
  33. // A:1, B:2, C:3
  34. private int num = 1;
  35. // 判断 -> 干活 -> 通知唤醒
  36. private Lock lock = new ReentrantLock();
  37. private Condition c1 = lock.newCondition();
  38. private Condition c2 = lock.newCondition();
  39. private Condition c3 = lock.newCondition();
  40. public void print1() {
  41. lock.lock();
  42. try {
  43. while (num != 1) {
  44. c1.await();
  45. }
  46. printByTimes(1);
  47. num = 2;
  48. c2.signal();
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. } finally {
  52. lock.unlock();
  53. }
  54. }
  55. public void print2() {
  56. lock.lock();
  57. try {
  58. while (num != 2) {
  59. c2.await();
  60. }
  61. printByTimes(2);
  62. num = 3;
  63. c3.signal();
  64. } catch (InterruptedException e) {
  65. e.printStackTrace();
  66. } finally {
  67. lock.unlock();
  68. }
  69. }
  70. public void print3() {
  71. lock.lock();
  72. try {
  73. while (num != 3) {
  74. c3.await();
  75. }
  76. printByTimes(3);
  77. num = 1;
  78. c1.signal();
  79. } catch (InterruptedException e) {
  80. e.printStackTrace();
  81. } finally {
  82. lock.unlock();
  83. }
  84. }
  85. private void printByTimes(int times) {
  86. for (int i = 0; i < times; i++) {
  87. System.out.println(Thread.currentThread().getName() + "打印" + i);
  88. }
  89. }
  90. }

callable

  1. package com.interview.demo;
  2. import java.util.concurrent.Callable;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.FutureTask;
  5. /**
  6. * @Author leijs
  7. * @date 2022/3/29
  8. */
  9. public class CallableDemo {
  10. public static void main(String[] args) throws ExecutionException, InterruptedException {
  11. FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
  12. Thread thread = new Thread(futureTask, "AA");
  13. thread.start();
  14. System.out.println("*********** result:" + futureTask.get());
  15. }
  16. }
  17. class MyThread implements Callable<Integer> {
  18. @Override
  19. public Integer call() throws Exception {
  20. return 1024;
  21. }
  22. }

futureTask.get()建议放在最后。如果没有计算完成就要去强求,就会导致阻塞,值得计算完成。