1. 简介

BlockingQueue,即阻塞队列,主要用于支持一下两种操作:

  • 在获取元素时,如果队列为空,会等待其变为非空。
  • 在存入元素时,如果队列已满,会等待其变为非满。

BlockingQueue主要提供了以下方法:

  1. public interface BlockingQueueNote<E> extends Queue<E> {
  2. /**
  3. * 如果可以立即将指定的元素插入此队列,而不会违反容量限制,则在成功时返回true。
  4. * 如果当前没有可用空间,则会抛出IllegalStateException 。
  5. * 建议:当使用容量受限的队列时,通常最好使用offer 。
  6. *
  7. * @param e 需要添加的元素
  8. * @return 是否成功
  9. */
  10. boolean add(E e);
  11. /**
  12. * 如果可以立即将指定的元素插入此队列,而不会违反容量限制,则在成功时返回true。
  13. * 如果当前没有可用空间,则返回false。
  14. * 使用容量受限的队列时,该方法优于add方法 ,因为add在插入失败时会抛出异常。
  15. *
  16. * @param e 需要添加的元素
  17. * @return 是否成功
  18. */
  19. boolean offer(E e);
  20. /**
  21. * 将指定的元素插入此队列,如果队列已满,等待空间可用。
  22. *
  23. * @param e 需要添加的元素
  24. * @throws InterruptedException 等待时可以被中断
  25. */
  26. void put(E e) throws InterruptedException;
  27. /**
  28. * 将指定的元素插入此队列,如果队列已满,等待指定的等待时间以使空间可用。
  29. *
  30. * @param e 需要添加的元素
  31. * @param timeout 等待的时间
  32. * @param unit 等待的时间单位
  33. * @return 是否成功
  34. * @throws InterruptedException 等待时可以被中断
  35. */
  36. boolean offer(E e, long timeout, TimeUnit unit)
  37. throws InterruptedException;
  38. /**
  39. * 获取并删除此队列的头,如果队列为空,等待直到有元素可用。
  40. *
  41. * @return 队列头
  42. * @throws InterruptedException 等待时可以被中断
  43. */
  44. E take() throws InterruptedException;
  45. /**
  46. * 获取并删除此队列的头,如果队列为空,则等待指定的等待时间直到有元素可用。
  47. *
  48. * @param timeout 等待的时间
  49. * @param unit 等待的时间单位
  50. * @return 队列头
  51. * @throws InterruptedException 等待时可以被中断
  52. */
  53. E poll(long timeout, TimeUnit unit)
  54. throws InterruptedException;
  55. /**
  56. * 返回此队列理想情况下(在没有内存或资源约束的情况下)可以无阻塞接受的其他元素的数量;
  57. * 如果没有限制,则返回Integer.MAX_VALUE-当前容量。
  58. * <p>
  59. * 请注意,无法始终通过检查remainingCapacity容量来判断插入元素的尝试是否会成功,因为可能是另一线程即将插入或删除元素的情况
  60. *
  61. * @return the remaining capacity
  62. */
  63. int remainingCapacity();
  64. /**
  65. * 从该队列中删除指定元素(如果存在)。 如果此队列包含一个或多个这样的元素,则删除一个元素e ,使其o.equals(e) 。
  66. * 如果此队列包含指定的元素,返回true
  67. *
  68. * @return {@code true} 删除成功
  69. */
  70. boolean remove(Object o);
  71. /**
  72. * 如果该队列包含指定的元素,则返回true
  73. *
  74. * @param o 元素
  75. * @return {@code true} 是否包含钙元素
  76. */
  77. public boolean contains(Object o);
  78. /**
  79. * 从此队列中删除所有可用的元素,并将它们添加到给定的集合中。
  80. * 注意:
  81. * 1. 如果c==自己,可能会抛出IllegalArgumentException
  82. * 2. 如果将元素往c添加的时候报错了, 可能会导致元素两个集合都不在
  83. * 3. 如果操作进行过程中, c被修改了, 那这个操作的结果不一定*
  84. *
  85. * @param c 目标集合
  86. * @return 转移的元素个数
  87. */
  88. int drainTo(Collection<? super E> c);
  89. /**
  90. * 从此队列中删除指定数量的元素,并将它们添加到给定的集合中。
  91. *
  92. * @param c 目标集合
  93. * @param maxElements 需要转移的元素个数
  94. * @return 转移的元素个数
  95. */
  96. int drainTo(Collection<? super E> c, int maxElements);
  97. }

BlockingQueue 和其父接口 Queue 的方法可以按照以下表格分类:

抛出异常 特定值 阻塞 超时
添加 add(e) offer(e) put(e) offer(e, time, unit)
取出 remove() pull() take() pull(time, unit)
检查 element() peek()

2. 使用

BlockingQueuee 的设计实现主要用于生产者-消费者队列,可以看一下🌰:
该类定义了一个 BlockingQueue 作为消息队列。
Producer 负责生产消息,并且不断添加到messageQueue中,当消息队列满时,阻塞。
Consumer负责消费消息,不断从messageQueue中获取,当消息队列为空时,阻塞。

  1. public class BlockingQueueTest {
  2. public static void main(String[] args) throws InterruptedException {
  3. BlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(10);
  4. Producer producer = new Producer(messageQueue);
  5. Consumer consumer = new Consumer(messageQueue);
  6. producer.start();
  7. consumer.start();
  8. producer.join();
  9. consumer.join();
  10. }
  11. static class Producer extends Thread {
  12. private final BlockingQueue<String> messageQueue;
  13. public Producer(BlockingQueue<String> messageQueue) {
  14. this.messageQueue = messageQueue;
  15. }
  16. @Override
  17. public void run() {
  18. try {
  19. while (true) {
  20. messageQueue.put(producer());
  21. }
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. private String producer() {
  27. String message = "消息" + UUID.randomUUID().toString().substring(0, 8);
  28. System.out.println("生产消息:" + message);
  29. return message;
  30. }
  31. }
  32. static class Consumer extends Thread {
  33. private final BlockingQueue<String> messageQueue;
  34. public Consumer(BlockingQueue<String> messageQueue) {
  35. this.messageQueue = messageQueue;
  36. }
  37. @Override
  38. public void run() {
  39. try {
  40. while (true) {
  41. consumer(messageQueue.take());
  42. }
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. private void consumer(String message) {
  48. System.out.println("消费消息:" + message);
  49. }
  50. }
  51. }