Java 队列
Java 中的队列有很多,例如:ArrayBlockingQueueLinkedBlockingQueuePriorityQueueDelayQueueSynchronousQueue 等,那它们的作用是什么?又是如何分类的呢?
其实 Java 中的这些队列可以从不同的维度进行分类,例如可以从阻塞和非阻塞进行分类,也可以从有界和无界进行分类,从队列的功能上进行分类,例如:优先队列、普通队列、双端队列、延迟队列等。
Java中的5大队列 - 图1
虽然重点是从功能上对队列进行解读,但其它分类也是 Java 中的重要概念,先来了解一下它们。

阻塞队列和非阻塞队列

阻塞队列(Blocking Queue)提供了可阻塞的 puttake 方法,它们与可定时的 offerpoll 是等价的。如果队列满了 put 方法会被阻塞等到有空间可用再将元素插入;如果队列是空的,那么 take 方法也会阻塞,直到有元素可用。当队列永远不会被充满时,put 方法和 take 方法就永远不会阻塞。
Java中的5大队列 - 图2
可以从队列的名称中知道此队列是否为阻塞队列,阻塞队列中包含 BlockingQueue 关键字,比如以下这些:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • …….

    阻塞队列功能演示

    接下来来演示一下当阻塞队列的容量满了之后会怎样,示例代码如下:

    1. import java.util.Date;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. public class BlockingTest {
    4. public static void main(String[] args) throws InterruptedException {
    5. // 创建一个长度为 5 的阻塞队列
    6. ArrayBlockingQueue q1 = new ArrayBlockingQueue(5);
    7. // 新创建一个线程执行入列
    8. new Thread(() -> {
    9. // 循环 10 次
    10. for (int i = 0; i < 10; i++) {
    11. try {
    12. q1.put(i);
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. System.out.println(new Date() + " | ArrayBlockingQueue Size:" + q1.size());
    17. }
    18. System.out.println(new Date() + " | For End.");
    19. }).start();
    20. // 新创建一个线程执行出列
    21. new Thread(() -> {
    22. for (int i = 0; i < 5; i++) {
    23. try {
    24. // 休眠 1S
    25. Thread.sleep(1000);
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. }
    29. if (!q1.isEmpty()) {
    30. try {
    31. q1.take(); // 出列
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. }
    36. }
    37. }).start();
    38. }
    39. }

    以上代码的执行结果如下:

    Mon Oct 19 20:16:12 CST 2020 | ArrayBlockingQueue Size:1 Mon Oct 19 20:16:12 CST 2020 | ArrayBlockingQueue Size:2 Mon Oct 19 20:16:12 CST 2020 | ArrayBlockingQueue Size:3 Mon Oct 19 20:16:12 CST 2020 | ArrayBlockingQueue Size:4 Mon Oct 19 20:16:12 CST 2020 | ArrayBlockingQueue Size:5 Mon Oct 19 20:16:13 CST 2020 | ArrayBlockingQueue Size:5 Mon Oct 19 20:16:14 CST 2020 | ArrayBlockingQueue Size:5 Mon Oct 19 20:16:15 CST 2020 | ArrayBlockingQueue Size:5 Mon Oct 19 20:16:16 CST 2020 | ArrayBlockingQueue Size:5 Mon Oct 19 20:16:17 CST 2020 | ArrayBlockingQueue Size:5 Mon Oct 19 20:16:17 CST 2020 | For End.

从上述结果可以看出,当 ArrayBlockingQueue 队列满了之后就会进入阻塞,当过了 1 秒有元素从队列中移除之后,才会将新的元素入列。

非阻塞队列

非阻塞队列也就是普通队列,它的名字中不会包含 BlockingQueue 关键字,并且它不会包含 puttake 方法,当队列满之后如果还有新元素入列会直接返回错误,并不会阻塞的等待着添加元素,如下图所示:
Java中的5大队列 - 图3
非阻塞队列的典型代表是 ConcurrentLinkedQueuePriorityQueue

有界队列和无界队列

有界队列

有界队列是指有固定大小的队列,比如设定了固定大小的 ArrayBlockingQueue,又或者大小为 0 的 SynchronousQueue
Java中的5大队列 - 图4

无界队列

无界队列指的是没有设置固定大小的队列,但其实如果没有设置固定大小也是有默认值的,只不过默认值是 Integer.MAX_VALUE,当然实际的使用中不会有这么大的容量(超过 Integer.MAX_VALUE),所以从使用者的角度来看相当于 “无界”的。
Java中的5大队列 - 图5

按功能分类

以功能来划分一下队列,它可以被分为:普通队列、优先队列、双端队列、延迟队列、其他队列等,接下来分别来看。

1.普通队列

普通队列(Queue)是指实现了先进先出的基本队列,例如 ArrayBlockingQueueLinkedBlockingQueue,其中 ArrayBlockingQueue 是用数组实现的普通队列,如下图所示:
Java中的5大队列 - 图6
LinkedBlockingQueue 是使用链表实现的普通队列,如下图所示:
Java中的5大队列 - 图7

常用方法

普通队列中的常用方法有以下这些:

  • offer():添加元素,如果队列已满直接返回 false,队列未满则直接插入并返回 true;
  • poll():删除并返回队头元素,当队列为空返回 null;
  • add():添加元素,此方法是对 offer 方法的简单封装,如果队列已满,抛出 IllegalStateException 异常;
  • remove():直接删除队头元素;
  • put():添加元素,如果队列已经满,则会阻塞等待插入;
  • take():删除并返回队头元素,当队列为空,则会阻塞等待;
  • peek():查询队头元素,但不会进行删除;
  • element():对 peek 方法进行简单封装,如果队头元素存在则取出并不删除,如果不存在抛出 NoSuchElementException 异常。 :::danger 注意:一般情况下 **offer()****poll()** 方法配合使用,**put()****take()** 阻塞方法配合使用,**add()****remove()** 方法会配合使用,程序中常用的是 **offer()****poll()** 方法,因此这两个方法比较友好,不会报错。 ::: 接下来以 LinkedBlockingQueue 为例,演示一下普通队列的使用:
    1. import java.util.concurrent.LinkedBlockingQueue;
    2. static class LinkedBlockingQueueTest {
    3. public static void main(String[] args) {
    4. LinkedBlockingQueue queue = new LinkedBlockingQueue();
    5. queue.offer("Hello");
    6. queue.offer("Java");
    7. queue.offer("Fcant");
    8. while (!queue.isEmpty()) {
    9. System.out.println(queue.poll());
    10. }
    11. }
    12. }
    以上代码的执行结果如下:

    Hello Java Fcant

2.双端队列

双端队列(Deque)是指队列的头部和尾部都可以同时入队和出队的数据结构,如下图所示:
image.png
接下来演示一下双端队列 LinkedBlockingDeque 的使用:

  1. import java.util.concurrent.LinkedBlockingDeque;
  2. /**
  3. * 双端队列示例
  4. */
  5. static class LinkedBlockingDequeTest {
  6. public static void main(String[] args) {
  7. // 创建一个双端队列
  8. LinkedBlockingDeque deque = new LinkedBlockingDeque();
  9. deque.offer("offer"); // 插入首个元素
  10. deque.offerFirst("offerFirst"); // 队头插入元素
  11. deque.offerLast("offerLast"); // 队尾插入元素
  12. while (!deque.isEmpty()) {
  13. // 从头遍历打印
  14. System.out.println(deque.poll());
  15. }
  16. }
  17. }

以上代码的执行结果如下:

offerFirst offer offerLast

3.优先队列

优先队列(PriorityQueue)是一种特殊的队列,它并不是先进先出的,而是优先级高的元素先出队。
优先队列是根据二叉堆实现的,二叉堆的数据结构如下图所示:
Java中的5大队列 - 图9
二叉堆分为两种类型:一种是最大堆一种是最小堆。以上展示的是最大堆,在最大堆中,任意一个父节点的值都大于等于它左右子节点的值。

因为优先队列是基于二叉堆实现的,因此它可以将优先级最好的元素先出队。

接下来演示一下优先队列的使用:

  1. import java.util.PriorityQueue;
  2. public class PriorityQueueTest {
  3. // 自定义的实体类
  4. static class Viper {
  5. private int id; // id
  6. private String name; // 名称
  7. private int level; // 等级
  8. public Viper(int id, String name, int level) {
  9. this.id = id;
  10. this.name = name;
  11. this.level = level;
  12. }
  13. public int getId() {
  14. return id;
  15. }
  16. public void setId(int id) {
  17. this.id = id;
  18. }
  19. public String getName() {
  20. return name;
  21. }
  22. public void setName(String name) {
  23. this.name = name;
  24. }
  25. public int getLevel() {
  26. return level;
  27. }
  28. public void setLevel(int level) {
  29. this.level = level;
  30. }
  31. }
  32. public static void main(String[] args) {
  33. PriorityQueue queue = new PriorityQueue(10, new Comparator<Viper>() {
  34. @Override
  35. public int compare(Viper v1, Viper v2) {
  36. // 设置优先级规则(倒序,等级越高权限越大)
  37. return v2.getLevel() - v1.getLevel();
  38. }
  39. });
  40. // 构建实体类
  41. Viper v1 = new Viper(1, "Java", 1);
  42. Viper v2 = new Viper(2, "MySQL", 5);
  43. Viper v3 = new Viper(3, "Redis", 3);
  44. // 入列
  45. queue.offer(v1);
  46. queue.offer(v2);
  47. queue.offer(v3);
  48. while (!queue.isEmpty()) {
  49. // 遍历名称
  50. Viper item = (Viper) queue.poll();
  51. System.out.println("Name:" + item.getName() +
  52. " Level:" + item.getLevel());
  53. }
  54. }
  55. }

以上代码的执行结果如下:

Name:MySQL Level:5 Name:Redis Level:3 Name:Java Level:1

从上述结果可以看出,优先队列的出队是不考虑入队顺序的,它始终遵循的是优先级高的元素先出队

4.延迟队列

延迟队列(DelayQueue)是基于优先队列 PriorityQueue 实现的,它可以看作是一种以时间为度量单位的优先的队列,当入队的元素到达指定的延迟时间之后方可出队。
Java中的5大队列 - 图10
来演示一下延迟队列的使用:

  1. import lombok.Getter;
  2. import lombok.Setter;
  3. import java.text.DateFormat;
  4. import java.util.Date;
  5. import java.util.concurrent.DelayQueue;
  6. import java.util.concurrent.Delayed;
  7. import java.util.concurrent.TimeUnit;
  8. public class CustomDelayQueue {
  9. // 延迟消息队列
  10. private static DelayQueue delayQueue = new DelayQueue();
  11. public static void main(String[] args) throws InterruptedException {
  12. producer(); // 调用生产者
  13. consumer(); // 调用消费者
  14. }
  15. // 生产者
  16. public static void producer() {
  17. // 添加消息
  18. delayQueue.put(new MyDelay(1000, "消息1"));
  19. delayQueue.put(new MyDelay(3000, "消息2"));
  20. }
  21. // 消费者
  22. public static void consumer() throws InterruptedException {
  23. System.out.println("开始执行时间:" +
  24. DateFormat.getDateTimeInstance().format(new Date()));
  25. while (!delayQueue.isEmpty()) {
  26. System.out.println(delayQueue.take());
  27. }
  28. System.out.println("结束执行时间:" +
  29. DateFormat.getDateTimeInstance().format(new Date()));
  30. }
  31. static class MyDelay implements Delayed {
  32. // 延迟截止时间(单位:毫秒)
  33. long delayTime = System.currentTimeMillis();
  34. // 借助 lombok 实现
  35. @Getter
  36. @Setter
  37. private String msg;
  38. /**
  39. * 初始化
  40. * @param delayTime 设置延迟执行时间
  41. * @param msg 执行的消息
  42. */
  43. public MyDelay(long delayTime, String msg) {
  44. this.delayTime = (this.delayTime + delayTime);
  45. this.msg = msg;
  46. }
  47. // 获取剩余时间
  48. @Override
  49. public long getDelay(TimeUnit unit) {
  50. return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  51. }
  52. // 队列里元素的排序依据
  53. @Override
  54. public int compareTo(Delayed o) {
  55. if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
  56. return 1;
  57. } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
  58. return -1;
  59. } else {
  60. return 0;
  61. }
  62. }
  63. @Override
  64. public String toString() {
  65. return this.msg;
  66. }
  67. }
  68. }

以上代码的执行结果如下:

开始执行时间:2020-10-20 20:17:28 消息1 消息2 结束执行时间:2020-10-20 20:17:31

从上述结束执行时间和开始执行时间可以看出,消息 1 和消息 2 都正常实现了延迟执行的功能。

5.其他队列

在 Java 的队列中有一个比较特殊的队列 SynchronousQueue,它的特别之处在于它内部没有容器,每次进行 put() 数据后(添加数据),必须等待另一个线程拿走数据后才可以再次添加数据,它的使用示例如下:

  1. import java.util.concurrent.SynchronousQueue;
  2. public class SynchronousQueueTest {
  3. public static void main(String[] args) {
  4. SynchronousQueue queue = new SynchronousQueue();
  5. // 入队
  6. new Thread(() -> {
  7. for (int i = 0; i < 3; i++) {
  8. try {
  9. System.out.println(new Date() + ",元素入队");
  10. queue.put("Data " + i);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }).start();
  16. // 出队
  17. new Thread(() -> {
  18. while (true) {
  19. try {
  20. Thread.sleep(1000);
  21. System.out.println(new Date() + ",元素出队:" + queue.take());
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }).start();
  27. }
  28. }

以上代码的执行结果如下:

Mon Oct 19 21:00:21 CST 2020,元素入队 Mon Oct 19 21:00:22 CST 2020,元素出队:Data 0 Mon Oct 19 21:00:22 CST 2020,元素入队 Mon Oct 19 21:00:23 CST 2020,元素出队:Data 1 Mon Oct 19 21:00:23 CST 2020,元素入队 Mon Oct 19 21:00:24 CST 2020,元素出队:Data 2

从上述结果可以看出,当有一个元素入队之后,只有等到另一个线程将元素出队之后,新的元素才能再次入队。

总结

Java 中的 5 种队列:普通队列、双端队列、优先队列、延迟队列、其他队列。其中普通队列的典型代表为 ArrayBlockingQueueLinkedBlockingQueue,双端队列的代表为 LinkedBlockingDeque,优先队列的代表为 PriorityQueue,延迟队列的代表为 DelayQueue,最后还讲了内部没有容器的其他队列 SynchronousQueue