一 特性
- 数组支持的有界队列
- 按照FIFO(先进先出) 对元素进行排序
- 一旦创建则不能增加其容量
- 队列满了之后再添加会导致操作阻塞,向空队列提取元素也会被阻塞
- 支持对等待的生产者线程和使用者线程进行排序的可选公平策略
- 采用ReentrantLock + Condition实现线程安全和阻塞
- 生产者和消费者公用一把锁
- 支持阻塞方法和 非阻塞方法
-
二 属性
/** The queued items 存放元素的数组 */final Object[] items;/** items index for next take, poll, peek or remove 出队索引*/int takeIndex;/** items index for next put, offer, or add 入队索引*/int putIndex;/** Number of elements in the queue */int count;/** Concurrency control uses the classic two-condition algorithm* found in any textbook.*//** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;/*** Shared state for currently active iterators, or null if there* are known not to be any. Allows queue operations to update* iterator state.*/transient Itrs itrs = null;
三 构造方法
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];// 这里是是否按照FIFO的策略进行访问 底层ReentrantLock 的实现则为 如果是true 则当前线程加入到等待队列中 否则直接CAS设置lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}
四 添加
```java //调用offer
public boolean add(E e) {return super.add(e);
}
public boolean offer(E e) {
checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {//队列满了则插入失败if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}
}
//真正的插入元素 private void enqueue(E x) {
// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;//这里到队尾之后再从数组头开始 外边进来已经进行了大小判断 超过数组大小不会再进来了if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();
}
//这个当队列满的时候会进行等待 而不是直接返回 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
//插入的时候添加等待时间 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();}}
<a name="kNREb"></a># 五 获取```java//这个区别于下边的是会直接返回的public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}//这个是当队列为空的时候会进入等待阶段public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}//一直等待下去直到有元素位置public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}// 这个会等待指定的时间public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}//真正的出队操作private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}
六 删除
//删除的话会循环数组 找到相等的元素然后删除public boolean remove(Object o) {if (o == null) return false;final Object[] items = this.items;final ReentrantLock lock = this.lock;lock.lock();try {if (count > 0) {final int putIndex = this.putIndex;int i = takeIndex;do {if (o.equals(items[i])) {removeAt(i);return true;}if (++i == items.length)i = 0;} while (i != putIndex);}return false;} finally {lock.unlock();}}void removeAt(final int removeIndex) {// assert lock.getHoldCount() == 1;// assert items[removeIndex] != null;// assert removeIndex >= 0 && removeIndex < items.length;final Object[] items = this.items;//如果删除的是头结点则直接删除if (removeIndex == takeIndex) {// removing front item; just advanceitems[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();} else {// an "interior" remove// slide over all others up through putIndex.final int putIndex = this.putIndex;//元素从删除的位置开始 向putIndex 挨个前移for (int i = removeIndex;;) {int next = i + 1;if (next == items.length)next = 0;if (next != putIndex) {items[i] = items[next];i = next;} else {items[i] = null;this.putIndex = i;break;}}count--;if (itrs != null)itrs.removedAt(removeIndex);}notFull.signal();}
