ArrayBlockingQueue 是一个常用的阻塞队列,是基于数组实现的,其内部使用一个定长数组来存储元素。除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整型变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue 的添加和删除操作共用同一个锁对象,由此意味着添加和删除无法并行运行,这点不同于 LinkedBlockingQueue。
ArrayBlockingQueue在添加或删除元素时不会产生或销毁任何额外的 Node 实例,而 LinkedBlockingQueue 会生成一个额外的 Node 实例。在长时间、高并发处理大批量数据的场景中, LinkedBlockingQueue 产生的额外 Node 实例会加大系统的 GC 压力。
ArrayBlockingQueue 的基本使用
public class ArrayBlockingQueue_Test {private static final int MAX_COUNT = 10;// 使用 ArrayBlockingQueue 保存数据private static BlockingQueue<String> dataList = new ArrayBlockingQueue<>(MAX_COUNT);public static void main(String[] args) {// 10个生产者for (int i = 0; i < 10; i++) {new Producer("第" + i + "个生产者").start();}// 2个生产者for (int i = 0; i < 2; i++) {new Consumer("第" + i + "个消费者").start();}}private static class Producer extends Thread {private Producer(String name) {super(name);}@Overridepublic void run() {while (true) {try {Thread.sleep(1000);String s = String.valueOf(new Random().nextInt(10000));dataList.put(s);System.out.println(Thread.currentThread().getName() + "生产了数据:" + s);} catch (InterruptedException e) {e.printStackTrace();break;}}}}private static class Consumer extends Thread {private Consumer(String name) {super(name);}@Overridepublic void run() {while (true) {try {Thread.sleep(1500);String s = dataList.remove();System.out.println(Thread.currentThread().getName() + " 消费了:" + s);} catch (InterruptedException e) {e.printStackTrace();break;}}}}}
程序运行结果:
第0个生产者生产了数据:9759第1个生产者生产了数据:6133第2个生产者生产了数据:844第4个生产者生产了数据:6437第3个生产者生产了数据:4第5个生产者生产了数据:7431第6个生产者生产了数据:1100第7个生产者生产了数据:8753第9个生产者生产了数据:5596第8个生产者生产了数据:4271第1个消费者 消费了:9759第0个消费者 消费了:6133第8个生产者生产了数据:7577第9个生产者生产了数据:8858......
ArrayBlockingQueue 源码解析
ArrayBlockingQueue 中访问元素存在公平与非公平两种方式,通过构造器可以声明:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {/** The queued items */final Object[] items;/** items index for next take, poll, peek or remove */int takeIndex;/** items index for next put, offer, or add */int putIndex;/** Number of elements in the queue */int count;/** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;public ArrayBlockingQueue(int capacity) {this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}}
- ArrayBlockingQueue 必须声明初始容量,初始容量不能小于 1,否则报 IllegalArgumentException 异常
- ArrayBlockingQueue 的公平与非公平访问方式是通过 ReentrantLock 来实现的
- ArrayBlockingQueue 内部给 ReentrantLock 定义了两个条件 Condition:notEmpty 和 notFull
- 添加元素时,若队列已满,则调用 notFull.await() 进行阻塞,直到被唤醒才执行添加元素的操作
- 删除元素时,若队列为空,则调用 notEmpty.await() 进行阻塞,直到被唤醒才执行删除元素的操作
- takeIndex 表示删除元素的索引,标识的是下一个方法(take、poll、peek、remove)被调用时获取数组元素的位置。putIndex 表示添加元素的索引,代表下一个方法(put、offer、add)被调用时元素添加到数组中的位置

enqueue(E) - 元素入队
private void enqueue(E x) {final Object[] items = this.items;// 将元素加入数组items[putIndex] = x;// 没有扩容,循环添加if (++putIndex == items.length)putIndex = 0;// 添加成功,数组中元素数量+1count++;// 若之前有线程取元素因为数组为空被阻塞在Condition条件队列中,这里添加元素之后唤醒之前的等待线程notEmpty.signal();}
- 进入该方法表示数组未满,通过 putIndex 索引直接将元素添加到数组 items 中,然后调整 putIndex 索引值
- 当 putIndex 索引大小等于数组长度时,将 putIndex 重新设置为 0 。这里是将内部数组作为环形队列使用
- 添加元素成功,数组不再为空,唤醒之前被阻赛的消费线程
offer(E) - 非阻塞添加元素
public boolean offer(E e) {// e==null,抛出 NullPointerExceptioncheckNotNull(e);final ReentrantLock lock = this.lock;// 自旋加锁lock.lock();try {// 数组已满,添加失败并且返回falseif (count == items.length)return false;else {// 元素入队enqueue(e);return true;}} finally {lock.unlock();}}
- 如果数组满了,就直接释放锁,然后返回 false
- 如果数组没满,就将元素入队(加入数组),然后返回 true
add(E) - 非阻塞添加元素
// ArrayBlockingQueue.javapublic boolean add(E e) {return super.add(e);}// AbstractQueue.javapublic boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
间接调用 offer() 方法,通过该方法的返回值来执行逻辑:
offer()方法添加成功,返回trueoffer()方法添加失败,抛出 IllegalStateException 异常
put(E) - 阻塞添加元素
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;// 可中断加锁lock.lockInterruptibly();try {// 数组已满,无法添加,将当前线程挂起,添加到 notFull 条件队列,等待被唤醒while (count == items.length)notFull.await();// 数组还有空间,直接添加enqueue(e);} finally {lock.unlock();}}
- 获取 lock 锁
- 如果队列元素已满,那么当前线程会被加入 notFull 条件对象的等待队列中,直到队列有空位置才会被唤醒执行添加操作
- 如果队列没有满,直接调用
enqueue(e)方法将元素加入数组队列中 - 释放 lock 锁
注意:调用 **put()** 方法而阻塞的线程是可以中断的
dequeue() - 元素出队
private E dequeue() {final Object[] items = this.items;// 获取当前元素E x = (E) items[takeIndex];// 将当前元素置空,help GCitems[takeIndex] = null;// 调整 takeIndexif (++takeIndex == items.length)takeIndex = 0;count--;// 更新迭代器中的元素数据if (itrs != null)itrs.elementDequeued();// 唤醒在notFull条件队列上阻塞的线程notFull.signal();return x;}
- 进入 dequeue() 方法,意味着 takeIndex 位置有元素可以删除,直接获取当前元素,并将数组对应位置元素置为 null
- 将 takeIndex 位置后移(自增),移动到下一个位置,无论一个位置有没有元素都没有关系,总之移动之后的 takeIndex 新位置会是下一轮删除元素的位置
- 如果 takeIndex 自增之后值为 items.length,说明 takeIndex 的索引已到数组尽头,就将其值校正为 0,表示下一次从头部开始删除元素,达到环形队列的效果
- 删除了元素说明队列有空位,唤醒 notFull 条件等待队列中的一个 put 线程,执行添加操作
poll() - 非阻塞删除元素
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}
- 若队列为空,则立即返回 null
- 若队列不为空,则获取并删除此队列的头元素
remove() - 非租塞删除特定元素
public boolean remove(Object o) {if (o == null) return false;final Object[] items = this.items;final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 数据中还存在元素if (count > 0) {final int putIndex = this.putIndex;int i = takeIndex;do {// equals 方法判断两个元素相等if (o.equals(items[i])) {// 找到元素则移除removeAt(i);return true;}// 环形寻找if (++i == items.length)i = 0;} while (i != putIndex);}return false;} finally {lock.unlock();}}void removeAt(final int removeIndex) {final Object[] items = this.items;if (removeIndex == takeIndex) {// removing front item; just advanceitems[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();} else {final int putIndex = this.putIndex;for (int i = removeIndex;;) {int next = i + 1;if (next == items.length)next = 0;if (next != putIndex) {items[i] = items[next];i = next;} else {items[i] = null;this.putIndex = i;break;}}count--;if (itrs != null)itrs.removedAt(removeIndex);}notFull.signal();}
- 先判断数组中是否有数据,没有则返回 false,存在数组在进行下一步寻找
- while 循环遍历 takeIndex(对头)到 putIndex(队尾)的所有元素,通过 equals() 方法来判定两个元素是否相等。找到合适的元素在调用 removeAt(int removeIndex) 方法移除元素并返回 true,否则返回 false,表示移除元素失败
- 如果要移除的元素就在 takeIndex 位置,那么移除该元素并且需要重新计算 takeIndex 的值
- 移除对应位置的元素,并将 removeIndex 到 putIndex 之间的元素往前移动一个位置,并设置 putIndex 的值
take() - 阻塞删除元素
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}
- 如果队列没有数据,就将线程加入 notEmpty 等待队列并阻塞线程,一直到有生产者插入数据后通过 notEmpty 发出一个消息,notEmpty 将从其等待队列唤醒一个消费(或者删除)节点,同时启动该消费线程
- 如果队列有数据,就通过
dequeue()执行元素的删除(或消费)操作
peek() - 获取对头元素
public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}}final E itemAt(int i) {return (E) items[i];}
从 takeIndex(头部位置)直接就可以获取最早被添加的元素,如果不存在就返回 null。
