Java 并发包里面 Queue 这类并发容器是最复杂的,可以从以下两个维度来分类。一是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。

Java 并发包里阻塞队列都用 Blocking 关键字标识,并且单端队列使用 Queue 标识,而双端队列使用 Deque 标识。BlockingQueue 支持阻塞式的插入和移除元素,常用于生产者和消费者的场景。BlockingQueue 有多种不同的方法用于插入和移除队列元素,如果请求的操作不能得到立即执行的话,每个方法的表现也会不同。
BlockingQueue - 图1

常用方法

在常规队列操作基础上,Blocking 意味着其提供了特定的等待性操作,获取时(take)等待元素进队,或者插入时(put)等待队列出现空位。

  1. public interface BlockingQueue<E> extends Queue<E> {
  2. // 立即将指定的元素插入此队列,成功则返回true
  3. // 如果由于容量限制而无法添加元素,则抛出IllegalStateException
  4. // 使用容量受限的队列时,通常最好使用offer方法
  5. boolean add(E e);
  6. // 立即将指定的元素插入此队列,成功则返回true
  7. // 如果当前没有可用空间,则返回false
  8. boolean offer(E e);
  9. // 将指定元素插入此队列,如果当前没有可用空间,则阻塞等待直到有空间可用
  10. // 如果在等待期间被中断,则抛出InterruptedException异常
  11. void put(E e) throws InterruptedException;
  12. // 将指定的元素插入此队列,如果当前没有可用空间则会等待直到超时或被中断
  13. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
  14. // 获取并删除此队列的头,如果当前队列中没有元素,则阻塞等待直到有队列元素可以出队
  15. E take() throws InterruptedException;
  16. // 获取并删除此队列的头,如果当前队列中没有元素,则等待指定时间
  17. E poll(long timeout, TimeUnit unit) throws InterruptedException;
  18. // 从队列中删除指定元素的单个实例,如果存在则将其删除
  19. // 如果此队列包含一个或多个此类元素,则删除其中一个元素
  20. boolean remove(Object o);
  21. ......
  22. }

ArrayBlockingQueue

ArrayBlockingQueue 是一个底层用数组实现的有界阻塞队列,其内部以 final 的数组来保存数据,数组的大小就决定了队列的边界,所以我们在创建 ArrayBlockingQueue 时都要指定容量。内部还使用 ReentrantLock 实现线程安全的入队和出队操作,使用 Condition 实现等待-通知机制。

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. // 保存队列元素的数组
  4. final Object[] items;
  5. // items index for next take, poll, peek or remove
  6. int takeIndex;
  7. // items index for next put, offer, or add
  8. int putIndex;
  9. // 队列保存的元素个数
  10. int count;
  11. // 通过可重入锁实现线程安全的入队、出队和等待-通知机制
  12. final ReentrantLock lock;
  13. private final Condition notEmpty;
  14. private final Condition notFull;
  15. }

1. put

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. // 如果队列已满,则让当前线程在notFull上等待
  7. while (count == items.length)
  8. notFull.await();
  9. enqueue(e);
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. private void enqueue(E x) {
  15. final Object[] items = this.items;
  16. items[putIndex] = x;
  17. if (++putIndex == items.length)
  18. putIndex = 0;
  19. count++;
  20. // 有元素入队,则进行一次notEmpty的通知,通知等待出队的线程
  21. notEmpty.signal();
  22. }

2. take

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. // 如果队列为空,则让当前线程在notEmpty上等待
  6. while (count == 0)
  7. notEmpty.await();
  8. return dequeue();
  9. } finally {
  10. lock.unlock();
  11. }
  12. }
  13. private E dequeue() {
  14. final Object[] items = this.items;
  15. E x = (E) items[takeIndex];
  16. items[takeIndex] = null;
  17. if (++takeIndex == items.length)
  18. takeIndex = 0;
  19. count--;
  20. if (itrs != null)
  21. itrs.elementDequeued();
  22. // 有元素出队,则进行一次notFull的通知,通知等待入队的线程
  23. notFull.signal();
  24. return x;
  25. }

LinkedBlockingQueue

LinkedBlockingQueue 是一个底层用链表实现的无界阻塞队列,队列的最大长度默认为 Integer.MAX_VALUE,但我们可以在创建队列时指定容量。

其内部也是使用 ReentrantLock 实现线程安全的入队和出队操作,使用 Condition 实现等待-通知机制。但不同的是,其内部针对 takeput 操作使用了不同的锁,使得这两个操作之间并不冲突。

  1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. static class Node<E> {
  4. E item;
  5. Node<E> next;
  6. Node(E x) { item = x; }
  7. }
  8. // 当前元素的数量
  9. private final AtomicInteger count = new AtomicInteger();
  10. // 头、尾节点
  11. transient Node<E> head;
  12. private transient Node<E> last;
  13. // take方法需要持有takeLock
  14. private final ReentrantLock takeLock = new ReentrantLock();
  15. private final Condition notEmpty = takeLock.newCondition();
  16. // put方法需要持有putLock
  17. private final ReentrantLock putLock = new ReentrantLock();
  18. private final Condition notFull = putLock.newCondition();
  19. }

takeLock 和 putLock 分别在 take() 和 put() 方法中使用。因为 take 和 put 彼此相互独立,不存在锁竞争关系,因此只需要在 take 和 take 间、put 和 put 间分别对 takeLock 和 putLock 进行加锁即可。从而减少了锁竞争的可能性,提高了并发度。

1. put

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. int c = -1;
  4. Node<E> node = new Node<E>(e);
  5. // 不能有两个线程同时存数据
  6. final ReentrantLock putLock = this.putLock;
  7. final AtomicInteger count = this.count;
  8. putLock.lockInterruptibly();
  9. try {
  10. // 如果队列已经满了,则等待
  11. while (count.get() == capacity) {
  12. notFull.await();
  13. }
  14. // 插入数据
  15. enqueue(node);
  16. // 数量加1,原子操作,因为可能会和take同时访问count
  17. c = count.getAndIncrement();
  18. // 有足够的空间,通知其他put线程
  19. if (c + 1 < capacity)
  20. notFull.signal();
  21. } finally {
  22. putLock.unlock();
  23. }
  24. // 插入成功后,通知take()方法取数据
  25. if (c == 0)
  26. signalNotEmpty();
  27. }

2. take

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. final AtomicInteger count = this.count;
  5. // 不能有两个线程同时取数据
  6. final ReentrantLock takeLock = this.takeLock;
  7. takeLock.lockInterruptibly();
  8. try {
  9. // 如果当前没有可用数据,就一直等待
  10. while (count.get() == 0) {
  11. notEmpty.await();
  12. }
  13. // 取第一个数据
  14. x = dequeue();
  15. // 数量减1,原子操作,因为会和put同时访问count
  16. c = count.getAndDecrement();
  17. // 通知其他take方法操作
  18. if (c > 1)
  19. notEmpty.signal();
  20. } finally {
  21. takeLock.unlock();
  22. }
  23. if (c == capacity)
  24. // capacity为队列容量,默认为Integer.MAX_VALUE。
  25. // 如果在take前已达到了最大容量,则take后会有剩余空间,此时会通知put方法操作
  26. signalNotFull();
  27. return x;
  28. }

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部采用平衡二叉树堆维护元素优先级,使用数组作为元素存储的数据结构。底层数组的最大长度默认为 Integer.MAX_VALUE - 8

队列元素默认采取自然顺序升序排列,不保证同优先级元素的顺序。也可以自定义队列元素实现 Comparable 接口的 compareTo() 方法或在初始化队列时传入 Comparator 来对元素排序。

  1. public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. // 默认的队列初始化大小
  4. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  5. // 队列最大容量,减8是兼容某些VM分配数组的实现
  6. private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  7. // 队列
  8. private transient Object[] queue;
  9. // 队列元素数量
  10. private transient int size;
  11. private final ReentrantLock lock;
  12. private final Condition notEmpty;
  13. }

DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列,其内部维护了一个 PriorityQueue 来实现延时。该队列中的保存的元素必须实现 Delayed 接口,通过该接口可以指定延迟时间。当从队列里获取元素时,如果元素没有到达延迟时间则阻塞当前线程,只有在延迟时间过后才能从队列中获取该元素。

  1. public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
  2. implements BlockingQueue<E> {
  3. private final transient ReentrantLock lock = new ReentrantLock();
  4. private final PriorityQueue<E> q = new PriorityQueue<E>();
  5. private final Condition available = lock.newCondition();
  6. }
  7. public interface Delayed extends Comparable<Delayed> {
  8. // 该方法返回当前元素还需要延长多长时间
  9. long getDelay(TimeUnit unit);
  10. }

DelayQueue 非常有用,可以将 DelayQueue 运用在以下应用场景:

  • 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。


  • 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就表示执行时间到了,可以开始执行了。

代码示例:

  1. public class DelayQueueDemo {
  2. public static void main(String[] args) {
  3. DelayQueue<User> queue = new DelayQueue<>();
  4. // 创建延迟任务
  5. Random random = new Random();
  6. for (int i = 0; i < 10; i++) {
  7. User user = new User("task:" + i, random.nextInt(500));
  8. queue.offer(user);
  9. }
  10. // 获取任务
  11. User user;
  12. try {
  13. for (int i = 0; i < 10; i++) {
  14. user = queue.take();
  15. System.out.println(user);
  16. }
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. @Getter
  22. @Setter
  23. private static class User implements Delayed {
  24. private final String name;
  25. // timestamp
  26. private final long delayTime;
  27. private final long expire;
  28. public User(String name, long delay) {
  29. this.name = name;
  30. this.delayTime = delay;
  31. this.expire = System.currentTimeMillis() + delay;
  32. }
  33. /**
  34. * 剩余时间=到期时间-当前时间
  35. */
  36. @Override
  37. public long getDelay(TimeUnit unit) {
  38. return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  39. }
  40. @Override
  41. public int compareTo(Delayed o) {
  42. return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
  43. }
  44. @Override
  45. public String toString() {
  46. return "User{name=" + name + ", delayTime=" + delayTime + ", expire=" + expire + "}";
  47. }
  48. }
  49. }

SynchronousQueue

SynchronousQueue 是一个容量为 0 的阻塞队列,该队列本身并不存储元素,每一个 put 操作必须等待一个 take 操作,反之亦然。它内部维护了一个线程等待队列,保存等待线程及相关信息。

  1. public class SynchronousQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. // 线程等待队列
  4. private transient volatile Transferer<E> transferer;
  5. ......
  6. }

SynchronousQueue 将 put()take() 这两种功能不同的方法抽象成一个通用方法 Transferer.transfer(),从字面上看,就是数据传递的意思。当参数 e 为非空时,表示当前操作传递给一个消费者,如果为空,则表示当前操作需要请求一个数据。如果返回值不为空,则表示数据己经接受或者正常提供;如果为空则表示失败。

  1. abstract static class Transferer<E> {
  2. abstract E transfer(E e, boolean timed, long nanos);
  3. }

Transferer 是一个抽象类,有两个子类的实现,分别是 TransferQueue 和 TransferStack。前者是一个 FIFO 队列,维护了队列头和队列尾两个指针;而后者只维护了栈顶指针。这两个子类分别代表了公平和非公平模式,通过构造函数来进行指定。

  1. public SynchronousQueue() {
  2. this(false);
  3. }
  4. public SynchronousQueue(boolean fair) {
  5. transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
  6. }

SynchronousQueue 的特性决定了它非常适合用于传递性的场景,在 Java 6 中,其实现发生了非常大的变化,利用 CAS 替换掉了原本基于锁的逻辑,同步开销比较小。因此它的吞吐量要高于 LinkedBlockingQueue 和 ArrayBlockingQueue。尤其是在队列元素较小的场景,它的性能表现往往大大超过其他实现。

LinkedTransferQueue

LinkedTransferQueue 是一个底层由链表构成的无界阻塞队列,它相对于其他阻塞队列多了 transfer 和 tryTransfer 方法。

  1. public class LinkedTransferQueue<E> extends AbstractQueue<E>
  2. implements TransferQueue<E>, java.io.Serializable {
  3. transient volatile Node head;
  4. private transient volatile Node tail;
  5. }

transfer 方法:
如果当前有消费者正在等待接收元素(消费者使用 take() 方法或带时间限制的 poll() 方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。

tryTransfer 方法:
tryTransfer 方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收都会立即返回,而 transfer 方法则必须等到消费者消费了才返回。