引言
这篇文章,我们分析一下ArrayBlockingQueue这个阻塞队列的实现。
类定义
ArrayBlockingQueue的定义如下:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {}
它实现了BlockingQueue接口同时继承了AbstractQueue。从前一篇文章我们知道,AbstractQueue对add、remove和element方法进行了实现,不过它们分别依赖了offer、poll和peek方法。我们现在来看ArrayBlockingQueue是怎样实现offer、poll、put、take等方法的。
字段
ArrayBlockingQueue有如下几个重要字段:
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
items表示队列中的元素,也就是说ArrayBlockingQueue的队列是通过数组来实现的。takeIndex用来记录数组中下一个要删除或者获取的元素的索引,它会在take、poll、peek和remove等方法中用到。putIndex用来记录下一个要增加的元素在数组中的索引,它会在put、offer、add方法中用到。count表示队列中元素的数量,注意不是队列的长度,每增加一个元素,count的数量才会加1,如果count==items.length,说明队列已满。
后面的lock和condition用来对队列的访问进行并发控制,两个condition,一个用来表示队列已满,一个用来表示队列为空。稍后我们会看到,队列元素的添加和删除都是通过lock和condition来控制的。
Offer方法
offer是接口Queue的方法,它会立即返回添加元素的结果,队列满时不会阻塞。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
首先,它进行了lock操作,该操作会让执行offer方法的线程独占对队列(数组)的访问。count==items.length说明队列已满,此时直接返回false。如果队列没满,它会执行enqueue方法,这个方法比较重要,我们放在put方法中讲解。
put方法
put方法是一个阻塞方法,我们来看它的实现:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
整体的逻辑与offer类似,只是在判断队列是否已满时,用的是while,并且如果队列已满,就会执行notFull.await,也就是说它会在notFull这个condition上阻塞。notFull从字面上理解就是队列未满,而put方法是要添加元素,所以它就应该阻塞在队列未满的条件上。
如果队列未满,它也会执行enqueue方法:
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
这个方法我们需要重点理解。它会将数组的putIndex位置上的元素设置为要添加的元素,然后增加putIndex和count的值,注意if里面的判断,它的意思是如果添加完当前元素之后队列就满了,putIndex就设置为零。它最后还执行了notEmpty.signal ()方法,notEmpty是代表队列非空的condition,enqueue添加了元素,队列肯定非空了。那我们就能想到,这里应该是用来唤醒所有阻塞在队列非空条件上的线程,这些线程应该是执行阻塞地删除元素的方法例如take、带有时间限制的poll方法。
上面我们说的offer方法不会阻塞,所以当队列满时,它没有调用notFull.await方法而是直接返回false。
take方法
take方法用来删除队首的元素,它与put方法对应,也是阻塞的。当队列为空时,它会阻塞在notEmpty这个condition上。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
如果队列未满,它会调用dequeue方法,我们大概猜测一下,dequeue方法应该与enqueue方法执行相反的逻辑,它会减少count和takeIndex的值,然后会执行notFull.await方法来唤醒所有阻塞在队列未满条件上进行阻塞式元素添加的线程。
我们来验证一下:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
猜测的大致正确,它同时对itrs进行了操作,这个涉及到的是队列的迭代器,我们不去重点分析。
poll方法
poll方法是与offer方法对应的删除元素的方法,它也不会阻塞,所以跟take方法相比,它不会执行notEmpty.await来阻塞线程而是直接返回null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
除了上面的四个方法,还有两个带有时间参数的方法:
public boolean offer(E e, long timeout, TimeUnit unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
它们同样是阻塞的,但是会超时退出。这里不再分析。
大致总结一下,ArrayBlockingQueue中lock和两个condition的使用规则:首先,只要涉及到队列的修改不管是增加元素还是删除元素,都会进行lock操作,所以整个阻塞队列是并发安全的;其次,增加元素需要保证队列未满,删除元素需要保证队列非空;最后,阻塞地增加和删除元素操作(例如put和take)会在对应的条件上进行阻塞直到条件满足或者超时(带有时间参数的offer和poll方法)。
小结
ArrayBlockingQueue利用数组实现了有界队列,通过lock和condition实现了对队列访问的并发控制,保证了增加元素和删除元素的正确性。队列已满不能增加元素,队列为空不能删除元素,阻塞队列的这个特性使得它非常适合实现生产者和消费者,下一篇文章,我们来看使用阻塞队列实现生产者和消费者的示例。