1. 简介
BlockingQueue,即阻塞队列,主要用于支持一下两种操作:
- 在获取元素时,如果队列为空,会等待其变为非空。
- 在存入元素时,如果队列已满,会等待其变为非满。
BlockingQueue主要提供了以下方法:
public interface BlockingQueueNote<E> extends Queue<E> {/*** 如果可以立即将指定的元素插入此队列,而不会违反容量限制,则在成功时返回true。* 如果当前没有可用空间,则会抛出IllegalStateException 。* 建议:当使用容量受限的队列时,通常最好使用offer 。** @param e 需要添加的元素* @return 是否成功*/boolean add(E e);/*** 如果可以立即将指定的元素插入此队列,而不会违反容量限制,则在成功时返回true。* 如果当前没有可用空间,则返回false。* 使用容量受限的队列时,该方法优于add方法 ,因为add在插入失败时会抛出异常。** @param e 需要添加的元素* @return 是否成功*/boolean offer(E e);/*** 将指定的元素插入此队列,如果队列已满,等待空间可用。** @param e 需要添加的元素* @throws InterruptedException 等待时可以被中断*/void put(E e) throws InterruptedException;/*** 将指定的元素插入此队列,如果队列已满,等待指定的等待时间以使空间可用。** @param e 需要添加的元素* @param timeout 等待的时间* @param unit 等待的时间单位* @return 是否成功* @throws InterruptedException 等待时可以被中断*/boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;/*** 获取并删除此队列的头,如果队列为空,等待直到有元素可用。** @return 队列头* @throws InterruptedException 等待时可以被中断*/E take() throws InterruptedException;/*** 获取并删除此队列的头,如果队列为空,则等待指定的等待时间直到有元素可用。** @param timeout 等待的时间* @param unit 等待的时间单位* @return 队列头* @throws InterruptedException 等待时可以被中断*/E poll(long timeout, TimeUnit unit)throws InterruptedException;/*** 返回此队列理想情况下(在没有内存或资源约束的情况下)可以无阻塞接受的其他元素的数量;* 如果没有限制,则返回Integer.MAX_VALUE-当前容量。* <p>* 请注意,无法始终通过检查remainingCapacity容量来判断插入元素的尝试是否会成功,因为可能是另一线程即将插入或删除元素的情况** @return the remaining capacity*/int remainingCapacity();/*** 从该队列中删除指定元素(如果存在)。 如果此队列包含一个或多个这样的元素,则删除一个元素e ,使其o.equals(e) 。* 如果此队列包含指定的元素,返回true** @return {@code true} 删除成功*/boolean remove(Object o);/*** 如果该队列包含指定的元素,则返回true** @param o 元素* @return {@code true} 是否包含钙元素*/public boolean contains(Object o);/*** 从此队列中删除所有可用的元素,并将它们添加到给定的集合中。* 注意:* 1. 如果c==自己,可能会抛出IllegalArgumentException* 2. 如果将元素往c添加的时候报错了, 可能会导致元素两个集合都不在* 3. 如果操作进行过程中, c被修改了, 那这个操作的结果不一定*** @param c 目标集合* @return 转移的元素个数*/int drainTo(Collection<? super E> c);/*** 从此队列中删除指定数量的元素,并将它们添加到给定的集合中。** @param c 目标集合* @param maxElements 需要转移的元素个数* @return 转移的元素个数*/int drainTo(Collection<? super E> c, int maxElements);}
BlockingQueue 和其父接口 Queue 的方法可以按照以下表格分类:
| 抛出异常 | 特定值 | 阻塞 | 超时 | |
|---|---|---|---|---|
| 添加 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 取出 | remove() | pull() | take() | pull(time, unit) |
| 检查 | element() | peek() |
2. 使用
BlockingQueuee 的设计实现主要用于生产者-消费者队列,可以看一下🌰:
该类定义了一个 BlockingQueue 作为消息队列。
Producer 负责生产消息,并且不断添加到messageQueue中,当消息队列满时,阻塞。
Consumer负责消费消息,不断从messageQueue中获取,当消息队列为空时,阻塞。
public class BlockingQueueTest {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(10);Producer producer = new Producer(messageQueue);Consumer consumer = new Consumer(messageQueue);producer.start();consumer.start();producer.join();consumer.join();}static class Producer extends Thread {private final BlockingQueue<String> messageQueue;public Producer(BlockingQueue<String> messageQueue) {this.messageQueue = messageQueue;}@Overridepublic void run() {try {while (true) {messageQueue.put(producer());}} catch (InterruptedException e) {e.printStackTrace();}}private String producer() {String message = "消息" + UUID.randomUUID().toString().substring(0, 8);System.out.println("生产消息:" + message);return message;}}static class Consumer extends Thread {private final BlockingQueue<String> messageQueue;public Consumer(BlockingQueue<String> messageQueue) {this.messageQueue = messageQueue;}@Overridepublic void run() {try {while (true) {consumer(messageQueue.take());}} catch (InterruptedException e) {e.printStackTrace();}}private void consumer(String message) {System.out.println("消费消息:" + message);}}}
