基于 JDK 1.8.0

1. 概括

ArrayBlockingQueue 是一个先进先出(FIFO)的阻塞队列,底层是数组,队列长度在创建的时候确定不能修改。

使用场景:生产者消费者

2. 类定义

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable

ArrayBlockingQueue 继承于 AbstractQueue,实现 BlockingQueue,Serializable 接口。

  • BlockingQueue 的介绍参考 BlockingQueue 源码学习

  • AbstractQueue 源码查看

    1. public abstract class AbstractQueue<E>
    2. extends AbstractCollection<E>
    3. implements Queue<E>


抽象类 AbstractQueue 继承于 AbstractCollection,实现 Queue 接口。
其中 AbstractQueue 实现了 Queue 的一些操作,在插入时不允许为 NULL;
根据 AbstractQueue 源码知道,AbstractQueue 对 Queue 接口的方法的返回值进行的参数值的判断,如果添加失败或者没有查找到元素,就会抛出相关异常,这里就不再展示。

3. 成员变量

  1. /**
  2. * 队列中的元素,final 类型不可变
  3. */
  4. final Object[] items;
  5. /** 下一次执行 take,poll,peek,remove 操作的位置 */
  6. int takeIndex;
  7. /** 下一次执行 put,offer,add 操作的位置,在哪个位置进行这些操作 */
  8. int putIndex;
  9. /** 队列中的元素个数 */
  10. int count;
  11. /** 锁,所有请求都会先去获取这个锁 */
  12. final ReentrantLock lock;
  13. /** 条件队列,用于通知 take 方法告知队列中有元素了 */
  14. private final Condition notEmpty;
  15. /** 条件队列,用于通知 put() 方法有空间放元素了 */
  16. private final Condition notFull;
  17. /**
  18. * TODO 迭代器,后面再展开学习
  19. * Shared state for currently active iterators, or null if there
  20. * are known not to be any. Allows queue operations to update
  21. * iterator state.
  22. */
  23. transient Itrs itrs = null;

根据成员变量的介绍可以得知:

  • 该队列用数组保持元素
  • 入队和出队用了两个指针标记,这两个标记的移动类似于在环形节点上面移动,如上图,下面详细分析。
  • 并发控制使用了 ReentrantLock ,并且使用两个 Condition 来控制队列的入队方法和出队方法的阻塞与继续运行

4. 构造方法

  1. /**
  2. * 仅指定容量大小,默认使用 非公平锁 控制并发请求
  3. */
  4. public ArrayBlockingQueue(int capacity) {
  5. this(capacity, false);
  6. }
  7. /**
  8. * 生成指定容量大小的数组,容量必须大于 0
  9. * 构造 锁 和 条件队列
  10. */
  11. public ArrayBlockingQueue(int capacity, boolean fair) {
  12. if (capacity <= 0)
  13. throw new IllegalArgumentException();
  14. this.items = new Object[capacity];
  15. lock = new ReentrantLock(fair);
  16. notEmpty = lock.newCondition();
  17. notFull = lock.newCondition();
  18. }
  19. /**
  20. * 将集合 c 中的元素添加到 ArrayBlockingQueue 中
  21. * count = c.size()
  22. * 若集合 c 的容量大小等于 capacity,将 putIndex 置为 0
  23. */
  24. public ArrayBlockingQueue(int capacity, boolean fair,
  25. Collection<? extends E> c) {
  26. this(capacity, fair);
  27. final ReentrantLock lock = this.lock;
  28. lock.lock(); // Lock only for visibility, not mutual exclusion
  29. try {
  30. int i = 0;
  31. try {
  32. for (E e : c) {
  33. checkNotNull(e);
  34. items[i++] = e;
  35. }
  36. } catch (ArrayIndexOutOfBoundsException ex) {
  37. throw new IllegalArgumentException();
  38. }
  39. count = i;
  40. putIndex = (i == capacity) ? 0 : i;
  41. } finally {
  42. lock.unlock();
  43. }
  44. }

5. 成员方法

ArrayBlockingQueue 源码学习 - 图1

5.1 入队方法

5.1.1 boolean add(E e)

调用父类 add 方法,而父类 AbstractQueue add 方法实际调用的是 offer(E e) 方法

  1. public boolean add(E e) {
  2. return super.add(e);
  3. }
  4. // super.add
  5. public boolean add(E e) {
  6. if (offer(e))
  7. return true;
  8. else
  9. throw new IllegalStateException("Queue full");
  10. }

5.1.2 boolean offer(E e) 与 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

  • 加锁后判断容量大小,满了就不添加;没满执行共用的入队方法,入队成功返回 true
  1. /**
  2. * 插入成功返回 true,否则返回 false
  3. * 插入元素为 NULL 返回空指针异常
  4. */
  5. public boolean offer(E e) {
  6. checkNotNull(e); // 首先判断元素是否为 NULL,为 NULL 抛出异常
  7. final ReentrantLock lock = this.lock;
  8. lock.lock();
  9. try {
  10. if (count == items.length)
  11. return false; // 若数组满,返回 false
  12. else {
  13. enqueue(e); // 添加元素并返回 true
  14. return true;
  15. }
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. /**
  21. * 带超时时间的添加元素方法
  22. * 当超时时间达到,直接返回 false
  23. */
  24. public boolean offer(E e, long timeout, TimeUnit unit)
  25. throws InterruptedException {
  26. checkNotNull(e);
  27. long nanos = unit.toNanos(timeout);
  28. final ReentrantLock lock = this.lock;
  29. lock.lockInterruptibly();
  30. try {
  31. while (count == items.length) {
  32. if (nanos <= 0)
  33. return false;
  34. nanos = notFull.awaitNanos(nanos);
  35. }
  36. enqueue(e);
  37. return true;
  38. } finally {
  39. lock.unlock();
  40. }
  41. }

内部共用的 入队 方法 enqueue()

ArrayBlockingQueue 源码学习 - 图2

  • 仅在同步方法块中调用
  • putIndex 的值一直在从 0 到 items.length 之间循环,添加元素进来就往后移动一位,移动到最后就又回到了开头,像是在环形数组上面移动
  1. private void enqueue(E x) {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[putIndex] == null;
  4. final Object[] items = this.items;
  5. items[putIndex] = x;
  6. if (++putIndex == items.length)
  7. putIndex = 0;
  8. count++;
  9. notEmpty.signal(); // 通知等待 take 的线程,表示当前数组有元素可取
  10. }

5.1.3 void put(E e) throws InterruptedException

  1. /**
  2. * 阻塞的插入方法
  3. */
  4. public void put(E e) throws InterruptedException {
  5. checkNotNull(e);
  6. final ReentrantLock lock = this.lock;
  7. lock.lockInterruptibly(); // 获取锁,若被中断抛出异常
  8. try {
  9. while (count == items.length)
  10. notFull.await(); // 数组满了就进行等待
  11. enqueue(e); // 否则插入元素
  12. } finally {
  13. lock.unlock();
  14. }
  15. }

入队方法总结:

  • 根据 BlockingQueue 源码可知,添加元素有 3 个方法,分别是 add,offer,put,其中 add 方法添加元素失败会抛出异常,官方推荐使用 offer,put 方法带阻塞功能。
  • enqueue 是内部共用的添加元素方法,其中 putIndex 的值一直都在从表头 -> 表尾 -> 表头循环,有点类似在环上面移动
  • ArrayBlockingQueue 的 add,offer,put 方法都是线程安全的

5.2 出队方法

5.2.1 E poll() 与 E poll(long timeout, TimeUnit unit) throws InterruptedException

  1. public E poll() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return (count == 0) ? null : dequeue(); // 若队列没有元素,返回 NULL;否则执行出队操作,将表头节点弹出并返回
  6. } finally {
  7. lock.unlock();
  8. }
  9. }
  10. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  11. long nanos = unit.toNanos(timeout);
  12. final ReentrantLock lock = this.lock;
  13. lock.lockInterruptibly();
  14. try {
  15. while (count == 0) { // 当数组没有元素时进行等待
  16. if (nanos <= 0)
  17. return null; // 等待超时,返回 NULL
  18. nanos = notEmpty.awaitNanos(nanos);
  19. }
  20. return dequeue();
  21. } finally {
  22. lock.unlock();
  23. }
  24. }

dequeue 内部共用的出队操作

  1. private E dequeue() {
  2. // assert lock.getHoldCount() == 1; 仅在同步方法块中调用
  3. // assert items[takeIndex] != null;
  4. final Object[] items = this.items;
  5. @SuppressWarnings("unchecked")
  6. E x = (E) items[takeIndex]; //保存 takeIndex 处的元素,并将此处位置置为空
  7. items[takeIndex] = null;
  8. if (++takeIndex == items.length) // takeIndex 往后移动一位,若 takeIndex 已经到数组末尾,那么让他回到开头
  9. takeIndex = 0;
  10. count--; // 队列数组元素减 1
  11. if (itrs != null)
  12. itrs.elementDequeued();
  13. notFull.signal(); // 通知等待 notFull 变量可用的线程,队列将可以添加元素进来
  14. return x;
  15. }

5.2.2 E take() throws InterruptedException

  • 与 poll 不同的是,take 取元素会阻塞
  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. notEmpty.await(); // 没有元素时进行等待
  7. return dequeue(); // 否则取出表头节点
  8. } finally {
  9. lock.unlock();
  10. }
  11. }

5.3 获取表头元素

  • 直接取 takeIndex 处的元素,队列为空时返回 NULL
  1. public E peek() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return itemAt(takeIndex); // null when queue is empty
  6. } finally {
  7. lock.unlock();
  8. }
  9. }

5.4 删除指定元素

ArrayBlockingQueue 源码学习 - 图3

  • 首先这个方法一定是在同步方法块中进行移除操作 ,环形图继续拿出来,假设原来是顺时针方向进行插入元素
  • 删除元素,就得从 takeIndex (表头位置)开始遍历,如果有相同的就移除当前位置的元素
  • 若 takeIndex 走到末尾,然后再置为 0,就像上面环一样,一直顺时针走
  • 若 takeIndex = putIndex ,说明走到遍历到末尾了,注意这里的末尾是插入位置 putIndex 的前一个位置,说明前面的位置已经遍历完了。
  1. public boolean remove(Object o) {
  2. if (o == null) return false;
  3. final Object[] items = this.items;
  4. final ReentrantLock lock = this.lock;
  5. lock.lock();
  6. try {
  7. if (count > 0) { // 队列不为空时
  8. // 临时保存执行添加元素方法时的元素插入的索引位置,遍历数组时遍历到该处位置时,说明前面的元素都已经遍历过了
  9. final int putIndex = this.putIndex;
  10. int i = takeIndex;
  11. do {
  12. if (o.equals(items[i])) {
  13. // 如果找到指定元素,进行移除操作
  14. removeAt(i);
  15. return true;
  16. }
  17. if (++i == items.length)
  18. // 若走到数组末尾,那么重置 i 到表头
  19. i = 0;
  20. } while (i != putIndex); // i == putIndex 时,说明数组中元素已经全部遍历,因为 putIndex 是下一次插入元素的位置
  21. }
  22. return false; // 队列为空,删除不了元素,返回 false
  23. } finally {
  24. lock.unlock();
  25. }
  26. }
  27. /**
  28. * 仅在获取到锁时调用,删除的过程其实就是 逆时针 移动元素的过程
  29. */
  30. void removeAt(final int removeIndex) {
  31. // assert lock.getHoldCount() == 1;
  32. // assert items[removeIndex] != null;
  33. // assert removeIndex >= 0 && removeIndex < items.length;
  34. final Object[] items = this.items;
  35. // 若待删除元素位置等于此时表头位置,直接将此处元素置为 NULL,并且 takeIndex 指向下一个位置,元素无需移动(参考上面环形图,移除 takeIndex 元素只需要置 NULL 然后后移)
  36. if (removeIndex == takeIndex) {
  37. // 元素值直接置为 null
  38. items[takeIndex] = null;
  39. if (++takeIndex == items.length)
  40. takeIndex = 0;
  41. // 更新元素数量
  42. count--;
  43. if (itrs != null)
  44. // 更新迭代器
  45. itrs.elementDequeued();
  46. } else {
  47. // an "interior" remove
  48. // slide over all others up through putIndex.
  49. final int putIndex = this.putIndex;
  50. for (int i = removeIndex;;) {
  51. int next = i + 1;
  52. // 待删除位置已经是最后一个元素,将 next 置为 0
  53. if (next == items.length)
  54. next = 0;
  55. if (next != putIndex) {
  56. // 逆时针移动,把后面的元素移到前面来
  57. items[i] = items[next];
  58. i = next;
  59. } else {
  60. // 此处位置的下一个位置是 putIndex,那么把此处位置置为 null
  61. items[i] = null;
  62. this.putIndex = i;
  63. break;
  64. }
  65. }
  66. // 更新数组元素数量
  67. count--;
  68. if (itrs != null)
  69. // 更新迭代器
  70. itrs.removedAt(removeIndex);
  71. }
  72. notFull.signal(); // 唤醒等待的 put 线程
  73. }
  74. void elementDequeued() {
  75. // assert lock.getHoldCount() == 1;
  76. if (count == 0)
  77. queueIsEmpty();
  78. else if (takeIndex == 0)
  79. takeIndexWrapped();
  80. }

内部类( TODO 后面再补充)

  1. class Itrs {
  2. }
  3. private class Itr implements Iterator<E> {
  4. }