一 特性
- 数组支持的有界队列
- 按照FIFO(先进先出) 对元素进行排序
- 一旦创建则不能增加其容量
- 队列满了之后再添加会导致操作阻塞,向空队列提取元素也会被阻塞
- 支持对等待的生产者线程和使用者线程进行排序的可选公平策略
- 采用ReentrantLock + Condition实现线程安全和阻塞
- 生产者和消费者公用一把锁
- 支持阻塞方法和 非阻塞方法
-
二 属性
/** The queued items 存放元素的数组 */
final Object[] items;
/** items index for next take, poll, peek or remove 出队索引*/
int takeIndex;
/** items index for next put, offer, or add 入队索引*/
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;
三 构造方法
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 这里是是否按照FIFO的策略进行访问 底层ReentrantLock 的实现则为 如果是true 则当前线程加入到等待队列中 否则直接CAS设置
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
四 添加
```java //调用offer
public boolean add(E e) {return super.add(e);
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列满了则插入失败
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//真正的插入元素 private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
//这里到队尾之后再从数组头开始 外边进来已经进行了大小判断 超过数组大小不会再进来了
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
//这个当队列满的时候会进行等待 而不是直接返回 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
//插入的时候添加等待时间 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
<a name="kNREb"></a>
# 五 获取
```java
//这个区别于下边的是会直接返回的
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//这个是当队列为空的时候会进入等待阶段
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//一直等待下去直到有元素位置
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
// 这个会等待指定的时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
//真正的出队操作
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
六 删除
//删除的话会循环数组 找到相等的元素然后删除
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
//如果删除的是头结点则直接删除
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
//元素从删除的位置开始 向putIndex 挨个前移
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}