ArrayBlockingQueue 是一个常用的阻塞队列,是基于数组实现的,其内部使用一个定长数组来存储元素。除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整型变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue 的添加和删除操作共用同一个锁对象,由此意味着添加和删除无法并行运行,这点不同于 LinkedBlockingQueue。

ArrayBlockingQueue在添加或删除元素时不会产生或销毁任何额外的 Node 实例,而 LinkedBlockingQueue 会生成一个额外的 Node 实例。在长时间、高并发处理大批量数据的场景中, LinkedBlockingQueue 产生的额外 Node 实例会加大系统的 GC 压力。

ArrayBlockingQueue 的基本使用

  1. public class ArrayBlockingQueue_Test {
  2. private static final int MAX_COUNT = 10;
  3. // 使用 ArrayBlockingQueue 保存数据
  4. private static BlockingQueue<String> dataList = new ArrayBlockingQueue<>(MAX_COUNT);
  5. public static void main(String[] args) {
  6. // 10个生产者
  7. for (int i = 0; i < 10; i++) {
  8. new Producer("第" + i + "个生产者").start();
  9. }
  10. // 2个生产者
  11. for (int i = 0; i < 2; i++) {
  12. new Consumer("第" + i + "个消费者").start();
  13. }
  14. }
  15. private static class Producer extends Thread {
  16. private Producer(String name) {
  17. super(name);
  18. }
  19. @Override
  20. public void run() {
  21. while (true) {
  22. try {
  23. Thread.sleep(1000);
  24. String s = String.valueOf(new Random().nextInt(10000));
  25. dataList.put(s);
  26. System.out.println(Thread.currentThread().getName() + "生产了数据:" + s);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. break;
  30. }
  31. }
  32. }
  33. }
  34. private static class Consumer extends Thread {
  35. private Consumer(String name) {
  36. super(name);
  37. }
  38. @Override
  39. public void run() {
  40. while (true) {
  41. try {
  42. Thread.sleep(1500);
  43. String s = dataList.remove();
  44. System.out.println(Thread.currentThread().getName() + " 消费了:" + s);
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. break;
  48. }
  49. }
  50. }
  51. }
  52. }

程序运行结果:

  1. 0个生产者生产了数据:9759
  2. 1个生产者生产了数据:6133
  3. 2个生产者生产了数据:844
  4. 4个生产者生产了数据:6437
  5. 3个生产者生产了数据:4
  6. 5个生产者生产了数据:7431
  7. 6个生产者生产了数据:1100
  8. 7个生产者生产了数据:8753
  9. 9个生产者生产了数据:5596
  10. 8个生产者生产了数据:4271
  11. 1个消费者 消费了:9759
  12. 0个消费者 消费了:6133
  13. 8个生产者生产了数据:7577
  14. 9个生产者生产了数据:8858
  15. ......

ArrayBlockingQueue 源码解析

ArrayBlockingQueue 中访问元素存在公平与非公平两种方式,通过构造器可以声明:

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
  2. /** The queued items */
  3. final Object[] items;
  4. /** items index for next take, poll, peek or remove */
  5. int takeIndex;
  6. /** items index for next put, offer, or add */
  7. int putIndex;
  8. /** Number of elements in the queue */
  9. int count;
  10. /** Main lock guarding all access */
  11. final ReentrantLock lock;
  12. /** Condition for waiting takes */
  13. private final Condition notEmpty;
  14. /** Condition for waiting puts */
  15. private final Condition notFull;
  16. public ArrayBlockingQueue(int capacity) {
  17. this(capacity, false);
  18. }
  19. public ArrayBlockingQueue(int capacity, boolean fair) {
  20. if (capacity <= 0)
  21. throw new IllegalArgumentException();
  22. this.items = new Object[capacity];
  23. lock = new ReentrantLock(fair);
  24. notEmpty = lock.newCondition();
  25. notFull = lock.newCondition();
  26. }
  27. }
  1. ArrayBlockingQueue 必须声明初始容量,初始容量不能小于 1,否则报 IllegalArgumentException 异常
  2. ArrayBlockingQueue 的公平与非公平访问方式是通过 ReentrantLock 来实现的
  3. ArrayBlockingQueue 内部给 ReentrantLock 定义了两个条件 Condition:notEmpty 和 notFull
    1. 添加元素时,若队列已满,则调用 notFull.await() 进行阻塞,直到被唤醒才执行添加元素的操作
    2. 删除元素时,若队列为空,则调用 notEmpty.await() 进行阻塞,直到被唤醒才执行删除元素的操作
  4. takeIndex 表示删除元素的索引,标识的是下一个方法(take、poll、peek、remove)被调用时获取数组元素的位置。putIndex 表示添加元素的索引,代表下一个方法(put、offer、add)被调用时元素添加到数组中的位置

image.png

enqueue(E) - 元素入队

  1. private void enqueue(E x) {
  2. final Object[] items = this.items;
  3. // 将元素加入数组
  4. items[putIndex] = x;
  5. // 没有扩容,循环添加
  6. if (++putIndex == items.length)
  7. putIndex = 0;
  8. // 添加成功,数组中元素数量+1
  9. count++;
  10. // 若之前有线程取元素因为数组为空被阻塞在Condition条件队列中,这里添加元素之后唤醒之前的等待线程
  11. notEmpty.signal();
  12. }
  1. 进入该方法表示数组未满,通过 putIndex 索引直接将元素添加到数组 items 中,然后调整 putIndex 索引值
  2. 当 putIndex 索引大小等于数组长度时,将 putIndex 重新设置为 0 。这里是将内部数组作为环形队列使用
  3. 添加元素成功,数组不再为空,唤醒之前被阻赛的消费线程

offer(E) - 非阻塞添加元素

  1. public boolean offer(E e) {
  2. // e==null,抛出 NullPointerException
  3. checkNotNull(e);
  4. final ReentrantLock lock = this.lock;
  5. // 自旋加锁
  6. lock.lock();
  7. try {
  8. // 数组已满,添加失败并且返回false
  9. if (count == items.length)
  10. return false;
  11. else {
  12. // 元素入队
  13. enqueue(e);
  14. return true;
  15. }
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  1. 如果数组满了,就直接释放锁,然后返回 false
  2. 如果数组没满,就将元素入队(加入数组),然后返回 true

add(E) - 非阻塞添加元素

  1. // ArrayBlockingQueue.java
  2. public boolean add(E e) {
  3. return super.add(e);
  4. }
  5. // AbstractQueue.java
  6. public boolean add(E e) {
  7. if (offer(e))
  8. return true;
  9. else
  10. throw new IllegalStateException("Queue full");
  11. }

间接调用 offer() 方法,通过该方法的返回值来执行逻辑:

  1. offer() 方法添加成功,返回true
  2. offer() 方法添加失败,抛出 IllegalStateException 异常

put(E) - 阻塞添加元素

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. // 可中断加锁
  5. lock.lockInterruptibly();
  6. try {
  7. // 数组已满,无法添加,将当前线程挂起,添加到 notFull 条件队列,等待被唤醒
  8. while (count == items.length)
  9. notFull.await();
  10. // 数组还有空间,直接添加
  11. enqueue(e);
  12. } finally {
  13. lock.unlock();
  14. }
  15. }
  1. 获取 lock 锁
  2. 如果队列元素已满,那么当前线程会被加入 notFull 条件对象的等待队列中,直到队列有空位置才会被唤醒执行添加操作
  3. 如果队列没有满,直接调用 enqueue(e) 方法将元素加入数组队列中
  4. 释放 lock 锁

注意:调用 **put()** 方法而阻塞的线程是可以中断的

dequeue() - 元素出队

  1. private E dequeue() {
  2. final Object[] items = this.items;
  3. // 获取当前元素
  4. E x = (E) items[takeIndex];
  5. // 将当前元素置空,help GC
  6. items[takeIndex] = null;
  7. // 调整 takeIndex
  8. if (++takeIndex == items.length)
  9. takeIndex = 0;
  10. count--;
  11. // 更新迭代器中的元素数据
  12. if (itrs != null)
  13. itrs.elementDequeued();
  14. // 唤醒在notFull条件队列上阻塞的线程
  15. notFull.signal();
  16. return x;
  17. }


  1. 进入 dequeue() 方法,意味着 takeIndex 位置有元素可以删除,直接获取当前元素,并将数组对应位置元素置为 null
  2. 将 takeIndex 位置后移(自增),移动到下一个位置,无论一个位置有没有元素都没有关系,总之移动之后的 takeIndex 新位置会是下一轮删除元素的位置
  3. 如果 takeIndex 自增之后值为 items.length,说明 takeIndex 的索引已到数组尽头,就将其值校正为 0,表示下一次从头部开始删除元素,达到环形队列的效果
  4. 删除了元素说明队列有空位,唤醒 notFull 条件等待队列中的一个 put 线程,执行添加操作


poll() - 非阻塞删除元素

  1. public E poll() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return (count == 0) ? null : dequeue();
  6. } finally {
  7. lock.unlock();
  8. }
  9. }
  1. 若队列为空,则立即返回 null
  2. 若队列不为空,则获取并删除此队列的头元素

remove() - 非租塞删除特定元素

  1. public boolean remove(Object o) {
  2. if (o == null) return false;
  3. final Object[] items = this.items;
  4. final ReentrantLock lock = this.lock;
  5. // 加锁
  6. lock.lock();
  7. try {
  8. // 数据中还存在元素
  9. if (count > 0) {
  10. final int putIndex = this.putIndex;
  11. int i = takeIndex;
  12. do {
  13. // equals 方法判断两个元素相等
  14. if (o.equals(items[i])) {
  15. // 找到元素则移除
  16. removeAt(i);
  17. return true;
  18. }
  19. // 环形寻找
  20. if (++i == items.length)
  21. i = 0;
  22. } while (i != putIndex);
  23. }
  24. return false;
  25. } finally {
  26. lock.unlock();
  27. }
  28. }
  29. void removeAt(final int removeIndex) {
  30. final Object[] items = this.items;
  31. if (removeIndex == takeIndex) {
  32. // removing front item; just advance
  33. items[takeIndex] = null;
  34. if (++takeIndex == items.length)
  35. takeIndex = 0;
  36. count--;
  37. if (itrs != null)
  38. itrs.elementDequeued();
  39. } else {
  40. final int putIndex = this.putIndex;
  41. for (int i = removeIndex;;) {
  42. int next = i + 1;
  43. if (next == items.length)
  44. next = 0;
  45. if (next != putIndex) {
  46. items[i] = items[next];
  47. i = next;
  48. } else {
  49. items[i] = null;
  50. this.putIndex = i;
  51. break;
  52. }
  53. }
  54. count--;
  55. if (itrs != null)
  56. itrs.removedAt(removeIndex);
  57. }
  58. notFull.signal();
  59. }
  1. 先判断数组中是否有数据,没有则返回 false,存在数组在进行下一步寻找
  2. while 循环遍历 takeIndex(对头)到 putIndex(队尾)的所有元素,通过 equals() 方法来判定两个元素是否相等。找到合适的元素在调用 removeAt(int removeIndex) 方法移除元素并返回 true,否则返回 false,表示移除元素失败
    1. 如果要移除的元素就在 takeIndex 位置,那么移除该元素并且需要重新计算 takeIndex 的值
    2. 移除对应位置的元素,并将 removeIndex 到 putIndex 之间的元素往前移动一个位置,并设置 putIndex 的值

take() - 阻塞删除元素

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. notEmpty.await();
  7. return dequeue();
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  1. 如果队列没有数据,就将线程加入 notEmpty 等待队列并阻塞线程,一直到有生产者插入数据后通过 notEmpty 发出一个消息,notEmpty 将从其等待队列唤醒一个消费(或者删除)节点,同时启动该消费线程
  2. 如果队列有数据,就通过 dequeue() 执行元素的删除(或消费)操作

peek() - 获取对头元素

  1. public E peek() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return itemAt(takeIndex); // null when queue is empty
  6. } finally {
  7. lock.unlock();
  8. }
  9. }
  10. final E itemAt(int i) {
  11. return (E) items[i];
  12. }

从 takeIndex(头部位置)直接就可以获取最早被添加的元素,如果不存在就返回 null。