Java 并发包里面 Queue 这类并发容器是最复杂的,可以从以下两个维度来分类。一是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。
Java 并发包里阻塞队列都用 Blocking 关键字标识,并且单端队列使用 Queue 标识,而双端队列使用 Deque 标识。BlockingQueue 支持阻塞式的插入和移除元素,常用于生产者和消费者的场景。BlockingQueue 有多种不同的方法用于插入和移除队列元素,如果请求的操作不能得到立即执行的话,每个方法的表现也会不同。
常用方法
在常规队列操作基础上,Blocking 意味着其提供了特定的等待性操作,获取时(take)等待元素进队,或者插入时(put)等待队列出现空位。
public interface BlockingQueue<E> extends Queue<E> {// 立即将指定的元素插入此队列,成功则返回true// 如果由于容量限制而无法添加元素,则抛出IllegalStateException// 使用容量受限的队列时,通常最好使用offer方法boolean add(E e);// 立即将指定的元素插入此队列,成功则返回true// 如果当前没有可用空间,则返回falseboolean offer(E e);// 将指定元素插入此队列,如果当前没有可用空间,则阻塞等待直到有空间可用// 如果在等待期间被中断,则抛出InterruptedException异常void put(E e) throws InterruptedException;// 将指定的元素插入此队列,如果当前没有可用空间则会等待直到超时或被中断boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;// 获取并删除此队列的头,如果当前队列中没有元素,则阻塞等待直到有队列元素可以出队E take() throws InterruptedException;// 获取并删除此队列的头,如果当前队列中没有元素,则等待指定时间E poll(long timeout, TimeUnit unit) throws InterruptedException;// 从队列中删除指定元素的单个实例,如果存在则将其删除// 如果此队列包含一个或多个此类元素,则删除其中一个元素boolean remove(Object o);......}
ArrayBlockingQueue
ArrayBlockingQueue 是一个底层用数组实现的有界阻塞队列,其内部以 final 的数组来保存数据,数组的大小就决定了队列的边界,所以我们在创建 ArrayBlockingQueue 时都要指定容量。内部还使用 ReentrantLock 实现线程安全的入队和出队操作,使用 Condition 实现等待-通知机制。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 保存队列元素的数组final Object[] items;// items index for next take, poll, peek or removeint takeIndex;// items index for next put, offer, or addint putIndex;// 队列保存的元素个数int count;// 通过可重入锁实现线程安全的入队、出队和等待-通知机制final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;}
1. put
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 如果队列已满,则让当前线程在notFull上等待while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;// 有元素入队,则进行一次notEmpty的通知,通知等待出队的线程notEmpty.signal();}
2. take
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 如果队列为空,则让当前线程在notEmpty上等待while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}private E dequeue() {final Object[] items = this.items;E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();// 有元素出队,则进行一次notFull的通知,通知等待入队的线程notFull.signal();return x;}
LinkedBlockingQueue
LinkedBlockingQueue 是一个底层用链表实现的无界阻塞队列,队列的最大长度默认为 Integer.MAX_VALUE,但我们可以在创建队列时指定容量。
其内部也是使用 ReentrantLock 实现线程安全的入队和出队操作,使用 Condition 实现等待-通知机制。但不同的是,其内部针对 take 和 put 操作使用了不同的锁,使得这两个操作之间并不冲突。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}// 当前元素的数量private final AtomicInteger count = new AtomicInteger();// 头、尾节点transient Node<E> head;private transient Node<E> last;// take方法需要持有takeLockprivate final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();// put方法需要持有putLockprivate final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();}
takeLock 和 putLock 分别在 take() 和 put() 方法中使用。因为 take 和 put 彼此相互独立,不存在锁竞争关系,因此只需要在 take 和 take 间、put 和 put 间分别对 takeLock 和 putLock 进行加锁即可。从而减少了锁竞争的可能性,提高了并发度。
1. put
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);// 不能有两个线程同时存数据final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {// 如果队列已经满了,则等待while (count.get() == capacity) {notFull.await();}// 插入数据enqueue(node);// 数量加1,原子操作,因为可能会和take同时访问countc = count.getAndIncrement();// 有足够的空间,通知其他put线程if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}// 插入成功后,通知take()方法取数据if (c == 0)signalNotEmpty();}
2. take
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;// 不能有两个线程同时取数据final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {// 如果当前没有可用数据,就一直等待while (count.get() == 0) {notEmpty.await();}// 取第一个数据x = dequeue();// 数量减1,原子操作,因为会和put同时访问countc = count.getAndDecrement();// 通知其他take方法操作if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)// capacity为队列容量,默认为Integer.MAX_VALUE。// 如果在take前已达到了最大容量,则take后会有剩余空间,此时会通知put方法操作signalNotFull();return x;}
PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部采用平衡二叉树堆维护元素优先级,使用数组作为元素存储的数据结构。底层数组的最大长度默认为 Integer.MAX_VALUE - 8。
队列元素默认采取自然顺序升序排列,不保证同优先级元素的顺序。也可以自定义队列元素实现 Comparable 接口的 compareTo() 方法或在初始化队列时传入 Comparator 来对元素排序。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 默认的队列初始化大小private static final int DEFAULT_INITIAL_CAPACITY = 11;// 队列最大容量,减8是兼容某些VM分配数组的实现private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;// 队列private transient Object[] queue;// 队列元素数量private transient int size;private final ReentrantLock lock;private final Condition notEmpty;}
DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列,其内部维护了一个 PriorityQueue 来实现延时。该队列中的保存的元素必须实现 Delayed 接口,通过该接口可以指定延迟时间。当从队列里获取元素时,如果元素没有到达延迟时间则阻塞当前线程,只有在延迟时间过后才能从队列中获取该元素。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();private final Condition available = lock.newCondition();}public interface Delayed extends Comparable<Delayed> {// 该方法返回当前元素还需要延长多长时间long getDelay(TimeUnit unit);}
DelayQueue 非常有用,可以将 DelayQueue 运用在以下应用场景:
- 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
- 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就表示执行时间到了,可以开始执行了。
代码示例:
public class DelayQueueDemo {public static void main(String[] args) {DelayQueue<User> queue = new DelayQueue<>();// 创建延迟任务Random random = new Random();for (int i = 0; i < 10; i++) {User user = new User("task:" + i, random.nextInt(500));queue.offer(user);}// 获取任务User user;try {for (int i = 0; i < 10; i++) {user = queue.take();System.out.println(user);}} catch (Exception e) {e.printStackTrace();}}@Getter@Setterprivate static class User implements Delayed {private final String name;// timestampprivate final long delayTime;private final long expire;public User(String name, long delay) {this.name = name;this.delayTime = delay;this.expire = System.currentTimeMillis() + delay;}/*** 剩余时间=到期时间-当前时间*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {return "User{name=" + name + ", delayTime=" + delayTime + ", expire=" + expire + "}";}}}
SynchronousQueue
SynchronousQueue 是一个容量为 0 的阻塞队列,该队列本身并不存储元素,每一个 put 操作必须等待一个 take 操作,反之亦然。它内部维护了一个线程等待队列,保存等待线程及相关信息。
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 线程等待队列private transient volatile Transferer<E> transferer;......}
SynchronousQueue 将 put() 和 take() 这两种功能不同的方法抽象成一个通用方法 Transferer.transfer(),从字面上看,就是数据传递的意思。当参数 e 为非空时,表示当前操作传递给一个消费者,如果为空,则表示当前操作需要请求一个数据。如果返回值不为空,则表示数据己经接受或者正常提供;如果为空则表示失败。
abstract static class Transferer<E> {abstract E transfer(E e, boolean timed, long nanos);}
Transferer 是一个抽象类,有两个子类的实现,分别是 TransferQueue 和 TransferStack。前者是一个 FIFO 队列,维护了队列头和队列尾两个指针;而后者只维护了栈顶指针。这两个子类分别代表了公平和非公平模式,通过构造函数来进行指定。
public SynchronousQueue() {this(false);}public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}
SynchronousQueue 的特性决定了它非常适合用于传递性的场景,在 Java 6 中,其实现发生了非常大的变化,利用 CAS 替换掉了原本基于锁的逻辑,同步开销比较小。因此它的吞吐量要高于 LinkedBlockingQueue 和 ArrayBlockingQueue。尤其是在队列元素较小的场景,它的性能表现往往大大超过其他实现。
LinkedTransferQueue
LinkedTransferQueue 是一个底层由链表构成的无界阻塞队列,它相对于其他阻塞队列多了 transfer 和 tryTransfer 方法。
public class LinkedTransferQueue<E> extends AbstractQueue<E>implements TransferQueue<E>, java.io.Serializable {transient volatile Node head;private transient volatile Node tail;}
transfer 方法:
如果当前有消费者正在等待接收元素(消费者使用 take() 方法或带时间限制的 poll() 方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。
tryTransfer 方法:
tryTransfer 方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收都会立即返回,而 transfer 方法则必须等到消费者消费了才返回。
