一、阻塞队列的常见方法
就拿常见的ArrayBlockingQueue来说明:
| 方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
| 移除 | remove(e) | poll(e) | take(e) | poll(e, timeout, unit) |
| 检查 | element(e) | peek(e) | / | / |
可以看到仅仅提供的插入方式就有三个,他们的区别是:
1、offer(e) 方法,有返回值,但是不抛出异常。当队列没有满的时候,成功插入元素后,直接返回true;否则,当队列已满的时候,插入元素失败,直接返回false。此方法不会阻塞线程。
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}}
2、add(e) 方法,有返回值,当队列没有满的时候,成功插入元素后,直接返回true;否则,当队列满的时候,插入元素失败,并不是返回false,而是抛出IllegalStateException。此方法不会阻塞线程。
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
3、put(e) 方法,有返回值,当队列没有满的时候,成功插入元素后,直接返回false;否则,当队列满的时候,一直阻塞到能够成功插入元素为止。此方法会阻塞线程,并且会响应线程中断。
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}
它是如何实现阻塞线程的呢?可以看到是通过Condition的await方法的,底层还是通过LockSupport.park实现的。
二、阻塞队列的实现目标
1、插入元素,从队列尾部插入元素。
a、成功插入元素后,需要通知因为队列为空导致阻塞的线程(通过take移除数据)。
b、当队列已满时,没有空间可以存储新的元素,需要阻塞线程,直到有空间能够成功插入元素为止。
2、移除元素,从队列头部移除元素。
a、成功移除元素后,需要通知因为队列满了导致插入阻塞的线程(通过put插入元素)。
b、当队列为空时,没有数据可以移除,这个时候需要阻塞线程,直到其他线程成功插入元素为止。
思考:
1、当队列已满,如果这时还有线程继续往队列插入元素,那么这些线程将会阻塞,是不是需要一个数据结构将这些线程保存起来?当有其他线程将队列中元素成功移除,再尝试通知数据结构中保存的线程?
2、当队列为空,如果这时还有线程继续移除线程中的元素,那么这些线程将会阻塞,是不是也同样需要一个数据结构将这些线程保存起来?当有其他线程成功插入元素时,再尝试通知数据结构中保存的线程?
3、阻塞或者通知线程的方式有哪些?wait/notify,LockSupport.park/unpark,还有Condition的await/signal。
三、阻塞队列的实现
其实,二中分析出的数据结构可以使用数组或者单向链表来实现,并且需要两个,大概率可能通过Condition构造的数据结构实现。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** items index for next take, poll, peek or remove */int takeIndex;/** items index for next put, offer, or add */int putIndex;public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}}
可以看出:
a、底层是通过一个数组结构来实现队列的。
b、默认使用的非公平锁ReentrantLock来保证线程安全的。
c、构造了两个Condition,即两个队列。
1、分析put过程,每次put都会尝试通知notEmpty。
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 如果数组满了,则线程进入notFull,等待数组不满时系统的通知while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}
注意在while循环中挂起,可以防止伪唤醒,继续看enqueue(e)。
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;// 在putIndex处插入元素items[putIndex] = x;// 插入元素后,putIndex加1,并且当putIndex等于数组长度的时候,重制putIndexif (++putIndex == items.length)putIndex = 0;count++;// 尝试唤醒notEmpty中的线程,这些线程就会进入AQS队列,并等待系统唤醒notEmpty.signal();}
2、分析take过程,每次take元素都会通知notFull。
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 如果数组为空的,则线程进入notEmpty队列,等待数组不为空时系统的唤醒while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}
注意在while循环中阻塞,会防止伪唤醒,继续看dequeue(e)。
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")// 暂存takeIndex处的元素E x = (E) items[takeIndex];//并将takeIndex处元素置空items[takeIndex] = null;//taskIndex加1,并且takeIndex等于数组长度,重制takeIndexif (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();// 通知notFull对应的线程,这些线程会进入AQS队列,等待系统的唤醒notFull.signal();return x;}
四、用图说明
1、插入元素的时候,是在队列尾部插入的。
2、移除元素的时候,是从队列的头部取出元素的。
