上节介绍了使用有界链表方式实现的阻塞队列 LinkedBlockingQueue,本节来研究使用有界数组方式实现的阻塞队列 ArrayBlockingQueue 的原理。
类图结构
同样,为了能从全局一览 ArrayBlockingQueue 的内部构造,先来看它的类图,如图 7-30 所示。
由该图可以看出,ArrayBlockingQueue 的内部有一个数组 items,用来存放队列元素,putindex 变量表示入队元素下标,takeIndex 是出队下标,count 统计队列元素个数。从定义可知,这些变量并没有使用 volatile 修饰,这是因为访问这些变量都是在锁块内,而加锁已经保证了锁块内变量的内存可见性了。另外有个独占锁 lock 用来保证出、入队操作的原子性,这保证了同时只有一个线程可以进行入队、出队操作。另外,notEmpty、notFull 条件变量用来进行出、入队的同步。
另外,由于 ArrayBlockingQueue 是有界队列,所以构造函数必须传入队列大小参数。构造函数的代码如下。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
由以上代码可知,在默认情况下使用 ReentrantLock 提供的非公平独占锁进行出、入队操作的同步。
ArrayBlockingQueue 原理介绍
本节主要讲解下面几个函数的原理,研究过 LinkedBlockingQueue 的实现后再看 ArrayBlockingQueue 的实现会感觉后者简单了很多。
1.offer 操作
向队列尾部插入一个元素,如果队列有空闲空间则插入成功后返回 true,如果队列已满则丢弃当前元素然后返回 false。如果 e 元素为 null 则抛出 NullPointerException 异常。另外,该方法是不阻塞的。
public boolean offer(E e) {
//(1)e 为 null,则抛出 NullPointerException 异常
checkNotNull(e);
//(2)获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(3)如果队列满则返回 false
if (count == items.length)
return false;
else {
//(4)否则插入元素
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
代码(2)获取独占锁,当前线程获取该锁后,其他入队和出队操作的线程都会被阻塞挂起而后被放入 lock 锁的 AQS 阻塞队列。
代码(3)判断如果队列满则直接返回 false,否则调用 enqueue 方法后返回 true,enqueue 的代码如下。
private void enqueue(E x) {
//(6)元素入队
final Object[] items = this.items;
items[putIndex] = x;
//(7)计算下一个元素应该存放的下标位置
if (++putIndex == items.length)
putIndex = 0;
count++;
//(8)
notEmpty.signal();
}
如上代码首先把当前元素放入 items 数组,然后计算下一个元素应该存放的下标位置,并递增元素个数计数器,最后激活 notEmpty 的条件队列中因为调用 take 操作而被阻塞的一个线程。这里由于在操作共享变量 count 前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是从 CPU 缓存或者寄存器获取。
代码(5)释放锁,然后会把修改的共享变量值(比如 count 的值)刷新回主内存中,这样其他线程通过加锁再次读取这些共享变量时,就可以看到最新的值。
2.put 操作
向队列尾部插入一个元素,如果队列有空闲则插入后直接返回 true,如果队列已满则阻塞当前线程直到队列有空闲并插入成功后返回 true,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。另外,如果 e 元素为 null 则抛出 NullPointerException 异常。
public void put(E e) throws InterruptedException {
//(1)
checkNotNull(e);
final ReentrantLock lock = this.lock;
//(2)获取锁(可被中断)
lock.lockInterruptibly();
try {
//(3)如果队列满,则把当前线程放入 notFull 管理的条件队列
while (count == items.length)
notFull.await();
//(4)插入元素
enqueue(e);
} finally {
//(5)
lock.unlock();
}
}
在代码(2)中,在获取锁的过程中当前线程被其他线程中断了,则当前线程会抛出 InterruptedException 异常而退出。
代码(3)判断如果当前队列已满,则把当前线程阻塞挂起后放入 notFull 的条件队列,注意这里也是使用了 while 循环而不是 if 语句。
代码(4)判断如果队列不满则插入当前元素,此处不再赘述。
3.poll 操作
从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。
public E poll() {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)当前队列为空则返回 null,否则调用 dequeue()获取
return (count == 0) ? null : dequeue();
} finally {
//(3)释放锁
lock.unlock();
}
}
代码(1)获取独占锁。
代码(2)判断如果队列为空则返回 null,否则调用 dequeue()方法。dequeue 方法的代码如下。
private E dequeue() {
final Object[] items = this.items;
//(4)获取元素值
@SuppressWarnings(「unchecked」)
E x = (E) items[takeIndex];
//(5)数组中的值为 null
items[takeIndex] = null;
//(6)队头指针计算,队列元素个数减 1
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//(7)发送信号激活 notFull 条件队列里面的一个线程
notFull.signal();
return x;
}
由以上代码可知,首先获取当前队头元素并将其保存到局部变量,然后重置队头元素为 null,并重新设置队头下标,递减元素计数器,最后发送信号激活 notFull 的条件队列里面一个因为调用 put 方法而被阻塞的线程。
4.take 操作
获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。
public E take() throws InterruptedException {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//(2)队列为空,则等待,直到队列中有元素
while (count == 0)
notEmpty.await();
//(3)获取队头元素
return dequeue();
} finally {
//(4) 释放锁
lock.unlock();
}
}
take 操作的代码也比较简单,与 poll 相比只是代码(2)不同。在这里,如果队列为空则把当前线程挂起后放入 notEmpty 的条件队列,等其他线程调用 notEmpty.signal()方法后再返回。需要注意的是,这里也是使用 while 循环进行检测并等待而不是使用 if 语句。
5.peek 操作
获取队列头部元素但是不从队列里面移除它,如果队列为空则返回 null,该方法是不阻塞的。
public E peek() {
//(1)获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)
return itemAt(takeIndex);
} finally {
//(3)
lock.unlock();
}
}
@SuppressWarnings(「unchecked」)
final E itemAt(int i) {
return (E) items[i];
}
peek 的实现更简单,首先获取独占锁,然后从数组 items 中获取当前队头下标的值并返回,在返回前释放获取的锁。
6.size 操作
计算当前队列元素个数。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
size 操作比较简单,获取锁后直接返回 count,并在返回前释放锁。也许你会问,这里又没有修改 count 的值,只是简单地获取,为何要加锁呢?其实如果 count 被声明为 volatile 的这里就不需要加锁了,因为 volatile 类型的变量保证了内存的可见性,而 ArrayBlockingQueue 中的 count 并没有被声明为 volatile 的,这是因为 count 操作都是在获取锁后进行的,而获取锁的语义之一是,获取锁后访问的变量都是从主内存获取的,这保证了变量的内存可见性。
小结
如图 7-31 所示,ArrayBlockingQueue 通过使用全局独占锁实现了同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似于在方法上添加 synchronized 的意思。其中 offer 和 poll 操作通过简单的加锁进行入队、出队操作,而 put、take 操作则使用条件变量实现了,如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。另外,相比 LinkedBlockingQueue,ArrayBlockingQueue 的 size 操作的结果是精确的,因为计算前加了全局锁。