BlockingQueue 是一种具有阻塞功能的容器。当入队时,若队列已满,则阻塞调用者;出队时,若队列已空,则阻塞调用者。

BlockingQueue 的特点

  1. 阻塞添加

阻塞添加是指当阻塞队列元素已满时,队列会阻塞添加元素的线程,直到队列元素不满时,才重新唤醒线程执行元素添加操作。

  1. 阻塞删除

阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空时,才重新唤醒删除线程,再执行删除操作。

BlockingQueue 主要方法

  1. public interface BlockingQueue<E> extends Queue<E> {
  2. /**
  3. * 添加元素到队列的尾部,成功则返回true,若队列已满,抛出IllegalArgumentException异常
  4. */
  5. boolean add(E e);
  6. /**
  7. * 添加元素到队列的尾部,成功则返回true,若队列已满,则返回false
  8. */
  9. boolean offer(E e);
  10. /**
  11. * 限时阻塞式添加:添加元素到队列尾部,如果队列已满,则一直等待,直到超时返回false
  12. *
  13. * 该方法可中断
  14. */
  15. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
  16. /**
  17. * 阻塞式添加:添加元素到队列尾部,如果队列已满,则一直等待
  18. */
  19. void put(E e) throws InterruptedException;
  20. /**
  21. * 阻塞式删除:移除队列头部元素,如果队列已空,则一直等待
  22. *
  23. * 该方法可中断
  24. */
  25. E take() throws InterruptedException;
  26. /**
  27. * 非阻塞式删除:移除队列头部元素,如果没有获取到元素,则返回null
  28. *
  29. * 该方法可中断
  30. */
  31. E poll();
  32. /**
  33. * 非阻塞式删除:移除队列头部元素,如在指定时间内没有获取到元素,则返回null
  34. *
  35. * 该方法可中断
  36. */
  37. E poll(long timeout, TimeUnit unit) throws InterruptedException;
  38. /**
  39. * 获取但不移除队列头部元素,没有则抛出NoSuchElementException异常
  40. */
  41. E element();
  42. /**
  43. * 获取但不移除队列头部元素,没有则返回null
  44. */
  45. E peek();
  46. /**
  47. * 获取并移除队列头部元素,没有则抛出NoSuchElementException异常
  48. */
  49. E remove();
  50. /**
  51. * 返回此队列在理想情况下(在没有内存或资源限制的情况下)可以在不阻塞的情况下接受的附加元素
  52. * 的数量,如果没有内在限制,则返回 {@code Integer.MAX_VALUE}。
  53. *
  54. * 注意,不能总是通过检查 {@code remainingCapacity} 来判断插入元素的尝试是否会成功,因为可
  55. * 能是另一个线程即将插入或删除元素的情况。
  56. */
  57. int remainingCapacity();
  58. /**
  59. * 移除队列头部元素,返回值表示是否移除成功
  60. */
  61. boolean remove(Object o);
  62. /**
  63. * 队列是否包含此元素
  64. */
  65. public boolean contains(Object o);
  66. }

添加类方法

boolean add(E e) 添加成功则返回 true,失败就抛出 IllegalStateException 异常
boolean offer(E e) 成功则返回 true,如果此队列已满,就返回 false
void put(E e) throws InterruptedException 将元素添加至此队列的尾部,如果该队列已满,就一直阻塞

删除类方法

E poll() 获取并移除此队列的头元素,若队列为空,则返回null
E take() throws InterruptedException 获取并移除此队列的头元素,若没有元素,则一直阻塞
E remove() 获取并移除队列头部元素,没有则抛出NoSuchElementException
boolean remove(Object o) 移除指定元素,成功则返回true,失败则返回false

获取类方法

E element() 获取但不移除此队列的头元素,没有元素则抛出NoSuchElementException
E peek() 获取但不移除此队列的头元素,若队列为空,则返回null

BlockingQueue常用实现类

ArrayBlockingQueue

ArrayBlockingQueue 是一个常用的阻塞队列,是基于数组实现的,其内部使用一个定长数组来存储元素。除了一个定长数组外,ArrayBlockingQueue 内部还保存着两个整型变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue 的添加和删除操作共用同一个锁对象,由此意味着添加和删除无法并行运行,这点不同于 LinkedBlockingQueue。

ArrayBlockingQueue在添加或删除元素时不会产生或销毁任何额外的 Node 实例,而 LinkedBlockingQueue 会生成一个额外的 Node 实例。在长时间、高并发处理大批量数据的场景中, LinkedBlockingQueue 产生的额外 Node 实例会加大系统的 GC 压力。

LinkedBlockingQueue

LinkedBlockingQueue 是基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)。LinkedBlockingQueue 对于添加和删除元素分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下,生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

注意:LinkedBlockingQueue 的默认容量为 Integer.MAX_VALUE,意味着如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就已经被消耗殆尽了。

DelayQueue

DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中添加数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

PriorityBlockingQueue

基于优先级的阻塞队列和 DelayQueue 类似,PriorityBlockingQueue 并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。在使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

SynchronousQueue

一种无缓冲的等待队列,相对于有缓冲的阻塞队列(如 LinkedBlockingQueue)来说,SynchronousQueue 少了中间缓冲区的环节。所以对单个消息的响应要求高的场景可以使用 SynchronousQueue。

SynchronousQueue 有公平和非公平两种声明方式。公平模式的 SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体现出整体的公平特征。非公平模式(默认情况)的 SynchronousQueue 采用非公平锁,同时配合一个 LIFO 堆栈(TransferStack 内部实例)来管理多余的生产者和消费者。

注意:对于非公平模式来说,如果生产者的速度远大于消费者处理速度,就很容易出现线程饥渴的情况,即可能出现某些生产者或者消费者的数据永远都得不到处理。