一、阻塞队列的常见方法
就拿常见的ArrayBlockingQueue来说明:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
移除 | remove(e) | poll(e) | take(e) | poll(e, timeout, unit) |
检查 | element(e) | peek(e) | / | / |
可以看到仅仅提供的插入方式就有三个,他们的区别是:
1、offer(e) 方法,有返回值,但是不抛出异常。当队列没有满的时候,成功插入元素后,直接返回true;否则,当队列已满的时候,插入元素失败,直接返回false。此方法不会阻塞线程。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
2、add(e) 方法,有返回值,当队列没有满的时候,成功插入元素后,直接返回true;否则,当队列满的时候,插入元素失败,并不是返回false,而是抛出IllegalStateException。此方法不会阻塞线程。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
3、put(e) 方法,有返回值,当队列没有满的时候,成功插入元素后,直接返回false;否则,当队列满的时候,一直阻塞到能够成功插入元素为止。此方法会阻塞线程,并且会响应线程中断。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
它是如何实现阻塞线程的呢?可以看到是通过Condition的await方法的,底层还是通过LockSupport.park实现的。
二、阻塞队列的实现目标
1、插入元素,从队列尾部插入元素。
a、成功插入元素后,需要通知因为队列为空导致阻塞的线程(通过take移除数据)。
b、当队列已满时,没有空间可以存储新的元素,需要阻塞线程,直到有空间能够成功插入元素为止。
2、移除元素,从队列头部移除元素。
a、成功移除元素后,需要通知因为队列满了导致插入阻塞的线程(通过put插入元素)。
b、当队列为空时,没有数据可以移除,这个时候需要阻塞线程,直到其他线程成功插入元素为止。
思考:
1、当队列已满,如果这时还有线程继续往队列插入元素,那么这些线程将会阻塞,是不是需要一个数据结构将这些线程保存起来?当有其他线程将队列中元素成功移除,再尝试通知数据结构中保存的线程?
2、当队列为空,如果这时还有线程继续移除线程中的元素,那么这些线程将会阻塞,是不是也同样需要一个数据结构将这些线程保存起来?当有其他线程成功插入元素时,再尝试通知数据结构中保存的线程?
3、阻塞或者通知线程的方式有哪些?wait/notify,LockSupport.park/unpark,还有Condition的await/signal。
三、阻塞队列的实现
其实,二中分析出的数据结构可以使用数组或者单向链表来实现,并且需要两个,大概率可能通过Condition构造的数据结构实现。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
}
可以看出:
a、底层是通过一个数组结构来实现队列的。
b、默认使用的非公平锁ReentrantLock来保证线程安全的。
c、构造了两个Condition,即两个队列。
1、分析put过程,每次put都会尝试通知notEmpty。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果数组满了,则线程进入notFull,等待数组不满时系统的通知
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
注意在while循环中挂起,可以防止伪唤醒,继续看enqueue(e)。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 在putIndex处插入元素
items[putIndex] = x;
// 插入元素后,putIndex加1,并且当putIndex等于数组长度的时候,重制putIndex
if (++putIndex == items.length)
putIndex = 0;
count++;
// 尝试唤醒notEmpty中的线程,这些线程就会进入AQS队列,并等待系统唤醒
notEmpty.signal();
}
2、分析take过程,每次take元素都会通知notFull。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果数组为空的,则线程进入notEmpty队列,等待数组不为空时系统的唤醒
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
注意在while循环中阻塞,会防止伪唤醒,继续看dequeue(e)。
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 暂存takeIndex处的元素
E x = (E) items[takeIndex];
//并将takeIndex处元素置空
items[takeIndex] = null;
//taskIndex加1,并且takeIndex等于数组长度,重制takeIndex
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 通知notFull对应的线程,这些线程会进入AQS队列,等待系统的唤醒
notFull.signal();
return x;
}
四、用图说明
1、插入元素的时候,是在队列尾部插入的。
2、移除元素的时候,是从队列的头部取出元素的。