ArrayBlockingQueue 是一个常用的阻塞队列,是基于数组实现的,其内部使用一个定长数组来存储元素。除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整型变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue 的添加和删除操作共用同一个锁对象,由此意味着添加和删除无法并行运行,这点不同于 LinkedBlockingQueue。
ArrayBlockingQueue在添加或删除元素时不会产生或销毁任何额外的 Node 实例,而 LinkedBlockingQueue 会生成一个额外的 Node 实例。在长时间、高并发处理大批量数据的场景中, LinkedBlockingQueue 产生的额外 Node 实例会加大系统的 GC 压力。
ArrayBlockingQueue 的基本使用
public class ArrayBlockingQueue_Test {
private static final int MAX_COUNT = 10;
// 使用 ArrayBlockingQueue 保存数据
private static BlockingQueue<String> dataList = new ArrayBlockingQueue<>(MAX_COUNT);
public static void main(String[] args) {
// 10个生产者
for (int i = 0; i < 10; i++) {
new Producer("第" + i + "个生产者").start();
}
// 2个生产者
for (int i = 0; i < 2; i++) {
new Consumer("第" + i + "个消费者").start();
}
}
private static class Producer extends Thread {
private Producer(String name) {
super(name);
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
String s = String.valueOf(new Random().nextInt(10000));
dataList.put(s);
System.out.println(Thread.currentThread().getName() + "生产了数据:" + s);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
private static class Consumer extends Thread {
private Consumer(String name) {
super(name);
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1500);
String s = dataList.remove();
System.out.println(Thread.currentThread().getName() + " 消费了:" + s);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
}
程序运行结果:
第0个生产者生产了数据:9759
第1个生产者生产了数据:6133
第2个生产者生产了数据:844
第4个生产者生产了数据:6437
第3个生产者生产了数据:4
第5个生产者生产了数据:7431
第6个生产者生产了数据:1100
第7个生产者生产了数据:8753
第9个生产者生产了数据:5596
第8个生产者生产了数据:4271
第1个消费者 消费了:9759
第0个消费者 消费了:6133
第8个生产者生产了数据:7577
第9个生产者生产了数据:8858
......
ArrayBlockingQueue 源码解析
ArrayBlockingQueue 中访问元素存在公平与非公平两种方式,通过构造器可以声明:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
}
- ArrayBlockingQueue 必须声明初始容量,初始容量不能小于 1,否则报 IllegalArgumentException 异常
- ArrayBlockingQueue 的公平与非公平访问方式是通过 ReentrantLock 来实现的
- ArrayBlockingQueue 内部给 ReentrantLock 定义了两个条件 Condition:notEmpty 和 notFull
- 添加元素时,若队列已满,则调用 notFull.await() 进行阻塞,直到被唤醒才执行添加元素的操作
- 删除元素时,若队列为空,则调用 notEmpty.await() 进行阻塞,直到被唤醒才执行删除元素的操作
- takeIndex 表示删除元素的索引,标识的是下一个方法(take、poll、peek、remove)被调用时获取数组元素的位置。putIndex 表示添加元素的索引,代表下一个方法(put、offer、add)被调用时元素添加到数组中的位置
enqueue(E) - 元素入队
private void enqueue(E x) {
final Object[] items = this.items;
// 将元素加入数组
items[putIndex] = x;
// 没有扩容,循环添加
if (++putIndex == items.length)
putIndex = 0;
// 添加成功,数组中元素数量+1
count++;
// 若之前有线程取元素因为数组为空被阻塞在Condition条件队列中,这里添加元素之后唤醒之前的等待线程
notEmpty.signal();
}
- 进入该方法表示数组未满,通过 putIndex 索引直接将元素添加到数组 items 中,然后调整 putIndex 索引值
- 当 putIndex 索引大小等于数组长度时,将 putIndex 重新设置为 0 。这里是将内部数组作为环形队列使用
- 添加元素成功,数组不再为空,唤醒之前被阻赛的消费线程
offer(E) - 非阻塞添加元素
public boolean offer(E e) {
// e==null,抛出 NullPointerException
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 自旋加锁
lock.lock();
try {
// 数组已满,添加失败并且返回false
if (count == items.length)
return false;
else {
// 元素入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
- 如果数组满了,就直接释放锁,然后返回 false
- 如果数组没满,就将元素入队(加入数组),然后返回 true
add(E) - 非阻塞添加元素
// ArrayBlockingQueue.java
public boolean add(E e) {
return super.add(e);
}
// AbstractQueue.java
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
间接调用 offer()
方法,通过该方法的返回值来执行逻辑:
offer()
方法添加成功,返回trueoffer()
方法添加失败,抛出 IllegalStateException 异常
put(E) - 阻塞添加元素
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 可中断加锁
lock.lockInterruptibly();
try {
// 数组已满,无法添加,将当前线程挂起,添加到 notFull 条件队列,等待被唤醒
while (count == items.length)
notFull.await();
// 数组还有空间,直接添加
enqueue(e);
} finally {
lock.unlock();
}
}
- 获取 lock 锁
- 如果队列元素已满,那么当前线程会被加入 notFull 条件对象的等待队列中,直到队列有空位置才会被唤醒执行添加操作
- 如果队列没有满,直接调用
enqueue(e)
方法将元素加入数组队列中 - 释放 lock 锁
注意:调用 **put()**
方法而阻塞的线程是可以中断的
dequeue() - 元素出队
private E dequeue() {
final Object[] items = this.items;
// 获取当前元素
E x = (E) items[takeIndex];
// 将当前元素置空,help GC
items[takeIndex] = null;
// 调整 takeIndex
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 更新迭代器中的元素数据
if (itrs != null)
itrs.elementDequeued();
// 唤醒在notFull条件队列上阻塞的线程
notFull.signal();
return x;
}
- 进入 dequeue() 方法,意味着 takeIndex 位置有元素可以删除,直接获取当前元素,并将数组对应位置元素置为 null
- 将 takeIndex 位置后移(自增),移动到下一个位置,无论一个位置有没有元素都没有关系,总之移动之后的 takeIndex 新位置会是下一轮删除元素的位置
- 如果 takeIndex 自增之后值为 items.length,说明 takeIndex 的索引已到数组尽头,就将其值校正为 0,表示下一次从头部开始删除元素,达到环形队列的效果
- 删除了元素说明队列有空位,唤醒 notFull 条件等待队列中的一个 put 线程,执行添加操作
poll() - 非阻塞删除元素
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
- 若队列为空,则立即返回 null
- 若队列不为空,则获取并删除此队列的头元素
remove() - 非租塞删除特定元素
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 数据中还存在元素
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
// equals 方法判断两个元素相等
if (o.equals(items[i])) {
// 找到元素则移除
removeAt(i);
return true;
}
// 环形寻找
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
- 先判断数组中是否有数据,没有则返回 false,存在数组在进行下一步寻找
- while 循环遍历 takeIndex(对头)到 putIndex(队尾)的所有元素,通过 equals() 方法来判定两个元素是否相等。找到合适的元素在调用 removeAt(int removeIndex) 方法移除元素并返回 true,否则返回 false,表示移除元素失败
- 如果要移除的元素就在 takeIndex 位置,那么移除该元素并且需要重新计算 takeIndex 的值
- 移除对应位置的元素,并将 removeIndex 到 putIndex 之间的元素往前移动一个位置,并设置 putIndex 的值
take() - 阻塞删除元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
- 如果队列没有数据,就将线程加入 notEmpty 等待队列并阻塞线程,一直到有生产者插入数据后通过 notEmpty 发出一个消息,notEmpty 将从其等待队列唤醒一个消费(或者删除)节点,同时启动该消费线程
- 如果队列有数据,就通过
dequeue()
执行元素的删除(或消费)操作
peek() - 获取对头元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
从 takeIndex(头部位置)直接就可以获取最早被添加的元素,如果不存在就返回 null。