继承体系
数据结构
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {final Object[] items;// 数组实现int takeIndex;// 即将拿出的元素索引int putIndex;// 即将放入的元素索引// 并发控制的锁final ReentrantLock lock;}
put() & offer()
put加入队列,如果队列满了则等待;offer则立即返回错误
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 数组满了则等待while (count == items.length)notFull.await();// 入队enqueue(e);} finally {lock.unlock();}}public boolean offer(E e) {// 如果传的是null 直接抛空指针异常checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {// 如果已经满了,返回falseif (count == items.length)return false;else {// 调用 enqueue 插入数据enqueue(e);return true;}} finally {lock.unlock();}}private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;// 入队成功后putIndex++// 数组满了putIndex重置为0(这里可以理解为0并不是队列的尾,队列的尾是在移动的类似于一个环,队列尾是putIndex++)if (++putIndex == items.length)putIndex = 0;count++;// 已经添加数据,唤醒not EmptynotEmpty.signal();}
take() & poll()
take如果队列为空则阻塞;poll为空则返回null
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 可响应中断
lock.lockInterruptibly();
try {
// 队列已空
while (count == 0)
// 等待 ”队列不为空“ Condition
notEmpty.await();
// 调用 dequeue 取元素
return dequeue();
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 如果取完就重置 takeIndex,继续从0开始取
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 走到这里说明已经取一条了
// 这个时候需要唤醒 "队列没有满" Condition
notFull.signal();
return x;
}
