引言

这篇文章,我们分析一下ArrayBlockingQueue这个阻塞队列的实现。

类定义

ArrayBlockingQueue的定义如下:

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {}

它实现了BlockingQueue接口同时继承了AbstractQueue。从前一篇文章我们知道,AbstractQueue对add、remove和element方法进行了实现,不过它们分别依赖了offer、poll和peek方法。我们现在来看ArrayBlockingQueue是怎样实现offer、poll、put、take等方法的。

字段

ArrayBlockingQueue有如下几个重要字段:

  1. /** The queued items */
  2. final Object[] items;
  3. /** items index for next take, poll, peek or remove */
  4. int takeIndex;
  5. /** items index for next put, offer, or add */
  6. int putIndex;
  7. /** Number of elements in the queue */
  8. int count;
  9. /*
  10. * Concurrency control uses the classic two-condition algorithm
  11. * found in any textbook.
  12. */
  13. /** Main lock guarding all access */
  14. final ReentrantLock lock;
  15. /** Condition for waiting takes */
  16. private final Condition notEmpty;
  17. /** Condition for waiting puts */
  18. private final Condition notFull;

items表示队列中的元素,也就是说ArrayBlockingQueue的队列是通过数组来实现的。takeIndex用来记录数组中下一个要删除或者获取的元素的索引,它会在take、poll、peek和remove等方法中用到。putIndex用来记录下一个要增加的元素在数组中的索引,它会在put、offer、add方法中用到。count表示队列中元素的数量,注意不是队列的长度,每增加一个元素,count的数量才会加1,如果count==items.length,说明队列已满。
后面的lock和condition用来对队列的访问进行并发控制,两个condition,一个用来表示队列已满,一个用来表示队列为空。稍后我们会看到,队列元素的添加和删除都是通过lock和condition来控制的。

Offer方法

offer是接口Queue的方法,它会立即返回添加元素的结果,队列满时不会阻塞。

  1. public boolean offer(E e) {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. if (count == items.length)
  7. return false;
  8. else {
  9. enqueue(e);
  10. return true;
  11. }
  12. } finally {
  13. lock.unlock();
  14. }
  15. }

首先,它进行了lock操作,该操作会让执行offer方法的线程独占对队列(数组)的访问。count==items.length说明队列已满,此时直接返回false。如果队列没满,它会执行enqueue方法,这个方法比较重要,我们放在put方法中讲解。

put方法

put方法是一个阻塞方法,我们来看它的实现:

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. while (count == items.length)
  7. notFull.await();
  8. enqueue(e);
  9. } finally {
  10. lock.unlock();
  11. }

整体的逻辑与offer类似,只是在判断队列是否已满时,用的是while,并且如果队列已满,就会执行notFull.await,也就是说它会在notFull这个condition上阻塞。notFull从字面上理解就是队列未满,而put方法是要添加元素,所以它就应该阻塞在队列未满的条件上。
如果队列未满,它也会执行enqueue方法:

  1. /**
  2. * Inserts element at current put position, advances, and signals.
  3. * Call only when holding lock.
  4. */
  5. private void enqueue(E x) {
  6. // assert lock.getHoldCount() == 1;
  7. // assert items[putIndex] == null;
  8. final Object[] items = this.items;
  9. items[putIndex] = x;
  10. if (++putIndex == items.length)
  11. putIndex = 0;
  12. count++;
  13. notEmpty.signal();
  14. }

这个方法我们需要重点理解。它会将数组的putIndex位置上的元素设置为要添加的元素,然后增加putIndex和count的值,注意if里面的判断,它的意思是如果添加完当前元素之后队列就满了,putIndex就设置为零。它最后还执行了notEmpty.signal ()方法,notEmpty是代表队列非空的condition,enqueue添加了元素,队列肯定非空了。那我们就能想到,这里应该是用来唤醒所有阻塞在队列非空条件上的线程,这些线程应该是执行阻塞地删除元素的方法例如take、带有时间限制的poll方法。
上面我们说的offer方法不会阻塞,所以当队列满时,它没有调用notFull.await方法而是直接返回false。

take方法

take方法用来删除队首的元素,它与put方法对应,也是阻塞的。当队列为空时,它会阻塞在notEmpty这个condition上。

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. notEmpty.await();
  7. return dequeue();
  8. } finally {
  9. lock.unlock();
  10. }
  11. }

如果队列未满,它会调用dequeue方法,我们大概猜测一下,dequeue方法应该与enqueue方法执行相反的逻辑,它会减少count和takeIndex的值,然后会执行notFull.await方法来唤醒所有阻塞在队列未满条件上进行阻塞式元素添加的线程。
我们来验证一下:

  1. private E dequeue() {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[takeIndex] != null;
  4. final Object[] items = this.items;
  5. @SuppressWarnings("unchecked")
  6. E x = (E) items[takeIndex];
  7. items[takeIndex] = null;
  8. if (++takeIndex == items.length)
  9. takeIndex = 0;
  10. count--;
  11. if (itrs != null)
  12. itrs.elementDequeued();
  13. notFull.signal();
  14. return x;
  15. }

猜测的大致正确,它同时对itrs进行了操作,这个涉及到的是队列的迭代器,我们不去重点分析。

poll方法

poll方法是与offer方法对应的删除元素的方法,它也不会阻塞,所以跟take方法相比,它不会执行notEmpty.await来阻塞线程而是直接返回null。

  1. public E poll() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return (count == 0) ? null : dequeue();
  6. } finally {
  7. lock.unlock();
  8. }
  9. }

除了上面的四个方法,还有两个带有时间参数的方法:

  1. public boolean offer(E e, long timeout, TimeUnit unit)
  2. public E poll(long timeout, TimeUnit unit) throws InterruptedException {

它们同样是阻塞的,但是会超时退出。这里不再分析。
大致总结一下,ArrayBlockingQueue中lock和两个condition的使用规则:首先,只要涉及到队列的修改不管是增加元素还是删除元素,都会进行lock操作,所以整个阻塞队列是并发安全的;其次,增加元素需要保证队列未满,删除元素需要保证队列非空;最后,阻塞地增加和删除元素操作(例如put和take)会在对应的条件上进行阻塞直到条件满足或者超时(带有时间参数的offer和poll方法)。

小结

ArrayBlockingQueue利用数组实现了有界队列,通过lock和condition实现了对队列访问的并发控制,保证了增加元素和删除元素的正确性。队列已满不能增加元素,队列为空不能删除元素,阻塞队列的这个特性使得它非常适合实现生产者和消费者,下一篇文章,我们来看使用阻塞队列实现生产者和消费者的示例。