一、阻塞队列的常见方法

就拿常见的ArrayBlockingQueue来说明:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e, timeout, unit)
移除 remove(e) poll(e) take(e) poll(e, timeout, unit)
检查 element(e) peek(e) / /

可以看到仅仅提供的插入方式就有三个,他们的区别是:
1、offer(e) 方法,有返回值,但是不抛出异常。当队列没有满的时候,成功插入元素后,直接返回true;否则,当队列已满的时候,插入元素失败,直接返回false。此方法不会阻塞线程。

  1. public boolean offer(E e) {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. if (count == items.length)
  7. return false;
  8. else {
  9. enqueue(e);
  10. return true;
  11. }
  12. } finally {
  13. lock.unlock();
  14. }
  15. }

2、add(e) 方法,有返回值,当队列没有满的时候,成功插入元素后,直接返回true;否则,当队列满的时候,插入元素失败,并不是返回false,而是抛出IllegalStateException。此方法不会阻塞线程。

  1. public boolean add(E e) {
  2. if (offer(e))
  3. return true;
  4. else
  5. throw new IllegalStateException("Queue full");
  6. }

3、put(e) 方法,有返回值,当队列没有满的时候,成功插入元素后,直接返回false;否则,当队列满的时候,一直阻塞到能够成功插入元素为止。此方法会阻塞线程,并且会响应线程中断。

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. while (count == items.length)
  7. notFull.await();
  8. enqueue(e);
  9. } finally {
  10. lock.unlock();
  11. }
  12. }

它是如何实现阻塞线程的呢?可以看到是通过Condition的await方法的,底层还是通过LockSupport.park实现的。

二、阻塞队列的实现目标

1、插入元素,从队列尾部插入元素。
a、成功插入元素后,需要通知因为队列为空导致阻塞的线程(通过take移除数据)。
b、当队列已满时,没有空间可以存储新的元素,需要阻塞线程,直到有空间能够成功插入元素为止。
2、移除元素,从队列头部移除元素。
a、成功移除元素后,需要通知因为队列满了导致插入阻塞的线程(通过put插入元素)。
b、当队列为空时,没有数据可以移除,这个时候需要阻塞线程,直到其他线程成功插入元素为止。

思考:
1、当队列已满,如果这时还有线程继续往队列插入元素,那么这些线程将会阻塞,是不是需要一个数据结构将这些线程保存起来?当有其他线程将队列中元素成功移除,再尝试通知数据结构中保存的线程?
2、当队列为空,如果这时还有线程继续移除线程中的元素,那么这些线程将会阻塞,是不是也同样需要一个数据结构将这些线程保存起来?当有其他线程成功插入元素时,再尝试通知数据结构中保存的线程?
3、阻塞或者通知线程的方式有哪些?wait/notify,LockSupport.park/unpark,还有Condition的await/signal。

三、阻塞队列的实现

其实,二中分析出的数据结构可以使用数组或者单向链表来实现,并且需要两个,大概率可能通过Condition构造的数据结构实现。

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. /** items index for next take, poll, peek or remove */
  4. int takeIndex;
  5. /** items index for next put, offer, or add */
  6. int putIndex;
  7. public ArrayBlockingQueue(int capacity, boolean fair) {
  8. if (capacity <= 0)
  9. throw new IllegalArgumentException();
  10. this.items = new Object[capacity];
  11. lock = new ReentrantLock(fair);
  12. notEmpty = lock.newCondition();
  13. notFull = lock.newCondition();
  14. }
  15. }

可以看出:
a、底层是通过一个数组结构来实现队列的。
b、默认使用的非公平锁ReentrantLock来保证线程安全的。
c、构造了两个Condition,即两个队列。

1、分析put过程,每次put都会尝试通知notEmpty。

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. // 如果数组满了,则线程进入notFull,等待数组不满时系统的通知
  7. while (count == items.length)
  8. notFull.await();
  9. enqueue(e);
  10. } finally {
  11. lock.unlock();
  12. }
  13. }

注意在while循环中挂起,可以防止伪唤醒,继续看enqueue(e)。

  1. private void enqueue(E x) {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[putIndex] == null;
  4. final Object[] items = this.items;
  5. // 在putIndex处插入元素
  6. items[putIndex] = x;
  7. // 插入元素后,putIndex加1,并且当putIndex等于数组长度的时候,重制putIndex
  8. if (++putIndex == items.length)
  9. putIndex = 0;
  10. count++;
  11. // 尝试唤醒notEmpty中的线程,这些线程就会进入AQS队列,并等待系统唤醒
  12. notEmpty.signal();
  13. }

2、分析take过程,每次take元素都会通知notFull。

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. // 如果数组为空的,则线程进入notEmpty队列,等待数组不为空时系统的唤醒
  6. while (count == 0)
  7. notEmpty.await();
  8. return dequeue();
  9. } finally {
  10. lock.unlock();
  11. }
  12. }

注意在while循环中阻塞,会防止伪唤醒,继续看dequeue(e)。

  1. private E dequeue() {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[takeIndex] != null;
  4. final Object[] items = this.items;
  5. @SuppressWarnings("unchecked")
  6. // 暂存takeIndex处的元素
  7. E x = (E) items[takeIndex];
  8. //并将takeIndex处元素置空
  9. items[takeIndex] = null;
  10. //taskIndex加1,并且takeIndex等于数组长度,重制takeIndex
  11. if (++takeIndex == items.length)
  12. takeIndex = 0;
  13. count--;
  14. if (itrs != null)
  15. itrs.elementDequeued();
  16. // 通知notFull对应的线程,这些线程会进入AQS队列,等待系统的唤醒
  17. notFull.signal();
  18. return x;
  19. }

四、用图说明

1、插入元素的时候,是在队列尾部插入的。
ArrayBlockingQueue#put.png

2、移除元素的时候,是从队列的头部取出元素的。
ArrayBlockingQueue#take.png