ArrayBlockingQueue 由数组实现的有界队列,是线程安全的,容量大小在创建对象时已定义好
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {
实现了BlockingQueue接口,该接口定义了队列的入队出队的方法
继承AbstractQueue父类,实现入队出队的基本操作
构造方法
创建ArrayBlockingQueue一定要指定容量
//必须传入容量,可以控制重入锁是公平还是非公平public ArrayBlockingQueue(int capacity) {this(capacity, false);}
还可以设置锁是公平还是非公平,默认是非公平
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();//创建数组this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}
还可以将已有的集合转化为ArrayBlockingQueue
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {//创建ArrayBlockingQueuethis(capacity, fair);final ReentrantLock lock = this.lock;lock.lock(); //创建ArrayBlockingQueue没有并发问题,加锁是为了保证可见性,防止指令重排try {int i = 0;try {for (E e : c) {checkNotNull(e);items[i++] = e;}//如果超出创建的ArrayBlockingQueue长度会抛异常} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}count = i;putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();}}
核心成员变量
由于ArrayBlockingQueue是基于数组实现的,为了保证Queue先进先出的特点,以及避免数组由于元素的出队和入队频繁地进行移位操作,ArrayBlockingQueue通过 count 属性记录当前数组元素个数,通过 putIndex 和 takeIndex 来往数组添加、取出元素,每次入队出队操作不用再移动元素位置。
//存放元素的数组final Object[] items;//取指针,记录下一次取操作的位置int takeIndex;//放指针,记录下一次放操作的未知int putIndex;//数组元素个数int count;//锁final ReentrantLock lock;//等待出队的条件(队列不为空)private final Condition notEmpty;//等待入队的条件(队列未满)private final Condition notFull;
每当插入一个元素,putIndex自增1
每当取出一个元素,takeIndex自增1
如果 takeIndex 和 putIndex 重叠,说明队列为空
生产者-消费者模型
我们创建一个阻塞队列,生产者调用 put() 方法负责往队列中添加元素,消费者调用 take() 方法负责往队列中取出元素。阻塞队列是线程安全的,只有抢到锁的线程才能操作队列。所以同一时间内,生产者和消费者只能有一个可以往队列添加元素或删除元素。
如果出现这种情况,当生产者(消费者)获取到了锁,往队列中放(取)元素时,而此时队列放满了(为空),此时生产者(消费者)应该释放锁,让消费者(生产者)抢锁并往队列里面取(放)元素,从而使队列不为满(空),后续可以进行对应操作。基于AQS的特性,我们很容易可以想到,可以直接将生产者(消费者)线程放入CLH队列中并阻塞,等待下一次抢锁。但是如果后续一直是生产者(消费者)来抢锁,抢到锁后情况和之前一样,又重新释放锁。
为了解决上面这中情况,利用到了 Array BlockingQueue 中的两个 Condition 属性(notEmpty 和 notFull)和 AQS 中的条件等待队列。
加入条件等待队列
通过调用 Condition.await() 方法加入条件等待队列,主要干三件事
- 将当前线程加入条件等待队列
- 释放当前线程获得的锁,并唤醒CLH队头节点
阻塞当前线程
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//创建节点并加入条件等待队列Node node = addConditionWaiter();//释放锁int savedState = fullyRelease(node);int interruptMode = 0;//如果当前节点不再CLH队列中while (!isOnSyncQueue(node)) {//将当前线程阻塞LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}//被唤醒后调用acquireQueued()继续尝试获取锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
创建节点并加入条件等待队列
private Node addConditionWaiter() {//获取尾节点Node t = lastWaiter;//如果尾节点的状态不为CONDITIONif (t != null && t.waitStatus != Node.CONDITION) {//清除条件队列中所有条件不为CONDITION的节点unlinkCancelledWaiters();t = lastWaiter;}//创建Node节点,并将节点状态设置为CONDITIONNode node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;//尾节点指向新加入的节点lastWaiter = node;return node;}

释放锁
final int fullyRelease(Node node) {boolean failed = true;try {//获取当前state的值int savedState = getState();//释放锁,就算是重入锁也是直接释放(state - state = 0)if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}-------------------------------------------public final boolean release(int arg) {//释放锁if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)//唤醒CLH队头节点unparkSuccessor(h);return true;}return false;}-----------------------------------------------protected final boolean tryRelease(int releases) {//重新将state设置为0int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;//将当前获取锁线程设置为nullsetExclusiveOwnerThread(null);}setState(c);return free;}
从条件等待队列出队
将条件等待队列中的头节点移到CLH队列中,后续可以争夺锁
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();//获取条件等待队列头节点Node first = firstWaiter;if (first != null)doSignal(first);}-----------------------------------------private void doSignal(Node first) {do {//firstWaiter指针指向头结点的下一个节点if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;//头节点的nextWaiter指针设置为nullfirst.nextWaiter = null;//通过while循环,将条件等待队列的头节点移到CLH队列} while (!transferForSignal(first) &&(first = firstWaiter) != null);}-------------------------------------------------

final boolean transferForSignal(Node node) {//将节点的状态由CONDITION设置为0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//设置失败直接return falsereturn false;//将节点加入CLH队列,返回旧的tail节点Node p = enq(node);int ws = p.waitStatus;//如果节点时CANCELLED状态或者旧的tail节点waitStatus设置失败,则直接唤醒线程(做异常处理)if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
入队
入队通过 putIndex 获取当前插入的位置
private void enqueue(E x) {//获取存放元素的数组final Object[] items = this.items;//通过putIndex添加元素items[putIndex] = x;//如果putIndex等于数组最后一个元素下标,putIndex重置为0if (++putIndex == items.length)putIndex = 0;count++;//将notEmpty条件等待队列中的队头元素放入CLH队列notEmpty.signal();}
add()
如果队列满了,调用add()方法插入元素会抛异常(调用父类AbstractQueue的add方法)
public boolean add(E e) {//调用了父类return super.add(e);}-------------------------------//最终调用ArrayBlockingQueue的offer()方法,如果失败则抛异常public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
offer()
入队成功返回true,入队失败返回false
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;//加锁lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}}
offer(e, time, unit)
如果队列满了,则阻塞 timeout 时长后再次尝试,如果队列还是已满,则返回false
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;//如果队列已满,则阻塞timeout时长,该方法返回 timeout-实际阻塞时间//阻塞timeout时长后唤醒,while循环重新判断队列是否已满,是则返回false,入队失败nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();}}
put()
如果队列已满,则加入条件等待队列,直到出队操作调用 signal() 方法将节点加入CLH队列,后续争夺锁完成入队操作。
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)//如果队列已满,则加入条件等待队列中notFull.await();enqueue(e);} finally {lock.unlock();}}
出队
通过 takeIndex 获取当前出队元素的位置
private E dequeue() {final Object[] items = this.items;E x = (E) items[takeIndex];items[takeIndex] = null;//如果takeIndex等于数组最后一个元素下标,putIndex重置为0if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}
出队也有四种方法,每种方式表现都不一样
take()
如果队列为空,则加入条件等待队列,直到入队操作调用 signal() 方法将节点加入CLH队列,后续争夺锁完成入队操作。
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)//如果队列为空,则加入条件等待队列阻塞notEmpty.await();return dequeue();} finally {lock.unlock();}}
poll()
如果队列为空,则返回null
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {//如果队列为空,则返回nullreturn (count == 0) ? null : dequeue();} finally {lock.unlock();}}
poll(time, unit)
如果队列为空,阻塞timeout时长,再次判断是否为空,是则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;//如果队列为空,则阻塞timeout时长//阻塞timeout时长后唤醒,while循环重新判断队列是否为空,是则返回nullnanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}
remove()
移除指定元素,通过 takeIndex 开始遍历,直到找到指定元素
public boolean remove(Object o) {if (o == null) return false;final Object[] items = this.items;final ReentrantLock lock = this.lock;lock.lock();try {if (count > 0) {//获取putIndexfinal int putIndex = this.putIndex;//获取takeIndexint i = takeIndex;do {//找到指定元素if (o.equals(items[i])) {//移除该元素removeAt(i);return true;}if (++i == items.length)i = 0;} while (i != putIndex);}return false;} finally {lock.unlock();}}
删除前还要重新调整 putIndex 位置
- 如果删除元素的下标等于 takeIndex,只需将takeIndex自增1
如果删除元素的下标不等于 takeIndex,则将 removeIndex 和 putIndex之间的元素相继往后移动一个单位,最后将 item[putIndex-1]的元素置为null,putIndex也向后移动一个单位
void removeAt(final int removeIndex) {final Object[] items = this.items;//如果要删除元素的下标等于 takeIndexif (removeIndex == takeIndex) {items[takeIndex] = null;//takeIndex自增1,如果自增后等于数组长度,重置为0if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();} else {//获取putIndexfinal int putIndex = this.putIndex;//将 removeIndex 和 putIndex-1之间的元素往后移一位for (int i = removeIndex;;) {int next = i + 1;if (next == items.length)next = 0;if (next != putIndex) {items[i] = items[next];i = next;} else {//将对应数组下标元素置为nullitems[i] = null;//putIndex后移一位this.putIndex = i;break;}}count--;if (itrs != null)itrs.removedAt(removeIndex);}notFull.signal();}

