概述

BlockingQueue是Queue的子类,叫做阻塞队列,是一个经常用于并发编程中的并发容器。阻塞队列广泛地在最常用的“生产者——消费者模型”中用作数据容器。这样,可以对各个模块的业务功能进行解耦,生产者将“生产”出来的数据放置在数据容器中,而消费者仅仅只需要在“数据容器”中进行获取数据即可,两种线程只需专注于自己的业务。

BlockingQueue提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

image.png

方法

  1. public interface BlockingQueue<E> extends Queue<E> {
  2. //添加一个元素。如果队列满,则抛出IllegalStateException异常
  3. boolean add(E e);
  4. //添加一个元素并返回true。如果队列满,返回false
  5. boolean offer(E e);
  6. //添加一个元素,如果队列满,则阻塞
  7. void put(E e) throws InterruptedException;
  8. boolean offer(E e, long timeout, TimeUnit unit)
  9. throws InterruptedException;
  10. //移出并返回头元素,如果队列空,则阻塞
  11. E take() throws InterruptedException;
  12. //移除并返回队列的头元素,如果队列空,则返回null
  13. E poll(long timeout, TimeUnit unit)
  14. throws InterruptedException;
  15. //返回队列剩余容量
  16. int remainingCapacity();
  17. //溢出并返回头元素,如果队列空,则抛出NoSuchElementException异常
  18. boolean remove(Object o);
  19. public boolean contains(Object o);
  20. int drainTo(Collection<? super E> c);
  21. int drainTo(Collection<? super E> c, int maxElements);
  22. }

BlockingQueue继承于Queue接口,其(Queue)基本操作有

  1. 添加元素

add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常;
offer(E e):当往队列插入数据时,插入成功返回true,否则则返回false。当队列满时不会抛出异常;

  1. 删除元素

remove(Object o):从队列中删除数据,成功则返回true,否则为false
poll:删除数据,当队列为空时,返回null;

  1. 查看元素

element:获取队头元素,如果队列为空时则抛出NoSuchElementException异常;
peek:获取队头元素,如果队列为空则抛出NoSuchElementException异常

对于BlockingQueue

  1. 插入数据

put:添加元素,当队列满时,线程阻塞,直至队列有空余容量
offer(E e, long timeout, TimeUnit unit):添加元素,当队列满时,线程阻塞,直至队列有空余容量。与put方法不同的是,该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出;

  1. 删除数据

take():出队,当队列为空时,获取队头数据的线程会被阻塞;
poll(long timeout, TimeUnit unit):出队,当队列为空时,获取数据的线程会被阻塞,另外,如果被阻塞的线程超过了给定的时长,该线程会退出。

ArrayBlockingQueue

image.png

ArrayBlockingQueue是BlockingQueue的一个实现类,其使用的是数组来实现有界阻塞队列。

  1. /** 数组实现的队列 */
  2. final Object[] items;
  3. /** Number of elements in the queue */
  4. int count;
  5. /** Main lock guarding all access */
  6. final ReentrantLock lock;
  7. /** Condition for waiting takes */
  8. private final Condition notEmpty;
  9. /** Condition for waiting puts */
  10. private final Condition notFull;

从上面属性可以看出,ArrayBlockingQueue使用的数组来实现队列,另外采用的ReetrantLock锁来实现同步机制,采用Condition条件机制来实现线程通信。

此外,ArrayBlockingQueue的队列容量,即数组大小一旦设定便无法更改。

添加元素

主要留意offer()和put()两个添加元素的方法

  1. /**
  2. *添加元素,当队列满时,返回false,直至队列有空余容量。
  3. */
  4. public boolean offer(E e) {
  5. //检测元素非空
  6. checkNotNull(e);
  7. //创建一个lock
  8. final ReentrantLock lock = this.lock;
  9. //申请锁
  10. lock.lock();
  11. try {
  12. //如果当前队列内容已满,则直接返回false
  13. if (count == items.length)
  14. return false;
  15. else {
  16. //如果不为空,则执行入队操作,返回true
  17. enqueue(e);
  18. return true;
  19. }
  20. } finally {
  21. //释放锁
  22. lock.unlock();
  23. }
  24. }
  25. /**
  26. *添加元素,当队列满时,线程阻塞
  27. */
  28. public void put(E e) throws InterruptedException {
  29. //检测元素非空
  30. checkNotNull(e);
  31. final ReentrantLock lock = this.lock;
  32. //申请可中断锁
  33. lock.lockInterruptibly();
  34. try {
  35. //如果队列已满,调用await进入阻塞状态
  36. while (count == items.length)
  37. notFull.await();
  38. //未满,入队
  39. enqueue(e);
  40. } finally {
  41. //释放锁
  42. lock.unlock();
  43. }
  44. }
  45. /**
  46. *入对操作,只有持有锁时才能进入该方法
  47. */
  48. private void enqueue(E x) {
  49. // assert lock.getHoldCount() == 1;
  50. // assert items[putIndex] == null;
  51. final Object[] items = this.items;
  52. items[putIndex] = x;
  53. //队列的操作,如果入队指针指到数组最后一位,重新回到第一位
  54. if (++putIndex == items.length)
  55. putIndex = 0;
  56. count++;
  57. //通知其他线程,已入队,队列不为空
  58. notEmpty.signal();
  59. }

删除元素

  1. /**
  2. *删除元素,如果队列为空,则阻塞
  3. */
  4. public E take() throws InterruptedException {
  5. final ReentrantLock lock = this.lock;
  6. //申请可中断锁
  7. lock.lockInterruptibly();
  8. try {
  9. //若队列为空,则await,线程阻塞
  10. while (count == 0)
  11. notEmpty.await();
  12. //出队,返回出队元素
  13. return dequeue();
  14. } finally {
  15. //释放锁
  16. lock.unlock();
  17. }
  18. }
  19. /**
  20. *删除元素,如果队列为空,返回null
  21. */
  22. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  23. long nanos = unit.toNanos(timeout);
  24. final ReentrantLock lock = this.lock;
  25. //申请可中断锁
  26. lock.lockInterruptibly();
  27. try {
  28. //如果队列为空,且没有设置超时时间,直接返回null
  29. while (count == 0) {
  30. if (nanos <= 0)
  31. return null;
  32. //否则,阻塞nanos秒
  33. nanos = notEmpty.awaitNanos(nanos);
  34. }
  35. //队列不空,出队
  36. return dequeue();
  37. } finally {
  38. //释放锁
  39. lock.unlock();
  40. }
  41. }
  42. private E dequeue() {
  43. // assert lock.getHoldCount() == 1;
  44. // assert items[takeIndex] != null;
  45. final Object[] items = this.items;
  46. @SuppressWarnings("unchecked")
  47. E x = (E) items[takeIndex];
  48. items[takeIndex] = null;
  49. //队列性质
  50. if (++takeIndex == items.length)
  51. takeIndex = 0;
  52. count--;
  53. if (itrs != null)
  54. itrs.elementDequeued();
  55. //通知其他线程,已出队,队列不满
  56. notFull.signal();
  57. return x;
  58. }

ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到ArrayBlockingQueue。而非公平性则是指访问ArrayBlockingQueue的顺序不是遵守严格的时间顺序,有可能存在,一旦ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量

其他实现类

LinkedBlockingQueue

LinkedBlockingQueue是用链表实现的有界阻塞队列,同样满足FIFO的特性,与ArrayBlockingQueue相比起来具有更高的吞吐量,为了防止LinkedBlockingQueue容量迅速增,损耗大量内存。通常在创建LinkedBlockingQueue对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化时通过构造器参数Comparator来指定排序规则。

SynchronousQueue

SynchronousQueue每个插入操作必须等待另一个线程进行相应的删除操作,因此,SynchronousQueue实际上没有存储任何数据元素,因为只有线程在删除数据时,其他线程才能插入数据,同样的,如果当前有线程在插入数据时,线程才能删除数据。SynchronousQueue也可以通过构造器参数来为其指定公平性。

LinkedTransferQueue

LinkedTransferQueue是一个由链表数据结构构成的无界阻塞队列,由于该队列实现了TransferQueue接口,与其他阻塞队列相比主要有以下不同的方法:
transfer(E e)
如果当前有线程(消费者)正在调用take()方法或者可延时的poll()方法进行消费数据时,生产者线程可以调用transfer方法将数据传递给消费者线程。如果当前没有消费者线程的话,生产者线程就会将数据插入到队尾,直到有消费者能够进行消费才能退出;
tryTransfer(E e)
tryTransfer方法如果当前有消费者线程(调用take方法或者具有超时特性的poll方法)正在消费数据的话,该方法可以将数据立即传送给消费者线程,如果当前没有消费者线程消费数据的话,就立即返回false。因此,与transfer方法相比,transfer方法是必须等到有消费者线程消费数据时,生产者线程才能够返回。而tryTransfer方法能够立即返回结果退出。
tryTransfer(E e,long timeout,imeUnit unit)
与transfer基本功能一样,只是增加了超时特性,如果数据才规定的超时时间内没有消费者进行消费的话,就返回false。

LinkedBlockingDeque

LinkedBlockingDeque是基于链表数据结构的有界阻塞双端队列,如果在创建对象时为指定大小时,其默认大小为Integer.MAX_VALUE。与LinkedBlockingQueue相比,主要的不同点在于,LinkedBlockingDeque具有双端队列的特性。

DelayQueue

DelayQueue是一个存放实现Delayed接口的数据的无界阻塞队列,只有当数据对象的延时时间达到时才能插入到队列进行存储。如果当前所有的数据都还没有达到创建时所指定的延时期,则队列没有队头,并且线程通过poll等方法获取数据元素则返回null。所谓数据延时期满时,则是通过Delayed接口的getDelay(TimeUnit.NANOSECONDS)来进行判定,如果该方法返回的是小于等于0则说明该数据元素的延时期已满。