Java 并发包里面 Queue 这类并发容器是最复杂的,可以从以下两个维度来分类。一是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。
Java 并发包里阻塞队列都用 Blocking 关键字标识,并且单端队列使用 Queue 标识,而双端队列使用 Deque 标识。BlockingQueue 支持阻塞式的插入和移除元素,常用于生产者和消费者的场景。BlockingQueue 有多种不同的方法用于插入和移除队列元素,如果请求的操作不能得到立即执行的话,每个方法的表现也会不同。
常用方法
在常规队列操作基础上,Blocking 意味着其提供了特定的等待性操作,获取时(take)等待元素进队,或者插入时(put)等待队列出现空位。
public interface BlockingQueue<E> extends Queue<E> {
// 立即将指定的元素插入此队列,成功则返回true
// 如果由于容量限制而无法添加元素,则抛出IllegalStateException
// 使用容量受限的队列时,通常最好使用offer方法
boolean add(E e);
// 立即将指定的元素插入此队列,成功则返回true
// 如果当前没有可用空间,则返回false
boolean offer(E e);
// 将指定元素插入此队列,如果当前没有可用空间,则阻塞等待直到有空间可用
// 如果在等待期间被中断,则抛出InterruptedException异常
void put(E e) throws InterruptedException;
// 将指定的元素插入此队列,如果当前没有可用空间则会等待直到超时或被中断
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 获取并删除此队列的头,如果当前队列中没有元素,则阻塞等待直到有队列元素可以出队
E take() throws InterruptedException;
// 获取并删除此队列的头,如果当前队列中没有元素,则等待指定时间
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// 从队列中删除指定元素的单个实例,如果存在则将其删除
// 如果此队列包含一个或多个此类元素,则删除其中一个元素
boolean remove(Object o);
......
}
ArrayBlockingQueue
ArrayBlockingQueue 是一个底层用数组实现的有界阻塞队列,其内部以 final 的数组来保存数据,数组的大小就决定了队列的边界,所以我们在创建 ArrayBlockingQueue 时都要指定容量。内部还使用 ReentrantLock 实现线程安全的入队和出队操作,使用 Condition 实现等待-通知机制。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 保存队列元素的数组
final Object[] items;
// items index for next take, poll, peek or remove
int takeIndex;
// items index for next put, offer, or add
int putIndex;
// 队列保存的元素个数
int count;
// 通过可重入锁实现线程安全的入队、出队和等待-通知机制
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
}
1. put
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已满,则让当前线程在notFull上等待
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 有元素入队,则进行一次notEmpty的通知,通知等待出队的线程
notEmpty.signal();
}
2. take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空,则让当前线程在notEmpty上等待
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 有元素出队,则进行一次notFull的通知,通知等待入队的线程
notFull.signal();
return x;
}
LinkedBlockingQueue
LinkedBlockingQueue 是一个底层用链表实现的无界阻塞队列,队列的最大长度默认为 Integer.MAX_VALUE,但我们可以在创建队列时指定容量。
其内部也是使用 ReentrantLock 实现线程安全的入队和出队操作,使用 Condition 实现等待-通知机制。但不同的是,其内部针对 take 和 put 操作使用了不同的锁,使得这两个操作之间并不冲突。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
// 当前元素的数量
private final AtomicInteger count = new AtomicInteger();
// 头、尾节点
transient Node<E> head;
private transient Node<E> last;
// take方法需要持有takeLock
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// put方法需要持有putLock
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
}
takeLock 和 putLock 分别在 take() 和 put() 方法中使用。因为 take 和 put 彼此相互独立,不存在锁竞争关系,因此只需要在 take 和 take 间、put 和 put 间分别对 takeLock 和 putLock 进行加锁即可。从而减少了锁竞争的可能性,提高了并发度。
1. put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
// 不能有两个线程同时存数据
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 如果队列已经满了,则等待
while (count.get() == capacity) {
notFull.await();
}
// 插入数据
enqueue(node);
// 数量加1,原子操作,因为可能会和take同时访问count
c = count.getAndIncrement();
// 有足够的空间,通知其他put线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 插入成功后,通知take()方法取数据
if (c == 0)
signalNotEmpty();
}
2. take
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 不能有两个线程同时取数据
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果当前没有可用数据,就一直等待
while (count.get() == 0) {
notEmpty.await();
}
// 取第一个数据
x = dequeue();
// 数量减1,原子操作,因为会和put同时访问count
c = count.getAndDecrement();
// 通知其他take方法操作
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
// capacity为队列容量,默认为Integer.MAX_VALUE。
// 如果在take前已达到了最大容量,则take后会有剩余空间,此时会通知put方法操作
signalNotFull();
return x;
}
PriorityBlockingQueue
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部采用平衡二叉树堆维护元素优先级,使用数组作为元素存储的数据结构。底层数组的最大长度默认为 Integer.MAX_VALUE - 8。
队列元素默认采取自然顺序升序排列,不保证同优先级元素的顺序。也可以自定义队列元素实现 Comparable 接口的 compareTo() 方法或在初始化队列时传入 Comparator 来对元素排序。
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 默认的队列初始化大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 队列最大容量,减8是兼容某些VM分配数组的实现
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 队列
private transient Object[] queue;
// 队列元素数量
private transient int size;
private final ReentrantLock lock;
private final Condition notEmpty;
}
DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列,其内部维护了一个 PriorityQueue 来实现延时。该队列中的保存的元素必须实现 Delayed 接口,通过该接口可以指定延迟时间。当从队列里获取元素时,如果元素没有到达延迟时间则阻塞当前线程,只有在延迟时间过后才能从队列中获取该元素。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
}
public interface Delayed extends Comparable<Delayed> {
// 该方法返回当前元素还需要延长多长时间
long getDelay(TimeUnit unit);
}
DelayQueue 非常有用,可以将 DelayQueue 运用在以下应用场景:
- 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
- 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就表示执行时间到了,可以开始执行了。
代码示例:
public class DelayQueueDemo {
public static void main(String[] args) {
DelayQueue<User> queue = new DelayQueue<>();
// 创建延迟任务
Random random = new Random();
for (int i = 0; i < 10; i++) {
User user = new User("task:" + i, random.nextInt(500));
queue.offer(user);
}
// 获取任务
User user;
try {
for (int i = 0; i < 10; i++) {
user = queue.take();
System.out.println(user);
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Getter
@Setter
private static class User implements Delayed {
private final String name;
// timestamp
private final long delayTime;
private final long expire;
public User(String name, long delay) {
this.name = name;
this.delayTime = delay;
this.expire = System.currentTimeMillis() + delay;
}
/**
* 剩余时间=到期时间-当前时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "User{name=" + name + ", delayTime=" + delayTime + ", expire=" + expire + "}";
}
}
}
SynchronousQueue
SynchronousQueue 是一个容量为 0 的阻塞队列,该队列本身并不存储元素,每一个 put 操作必须等待一个 take 操作,反之亦然。它内部维护了一个线程等待队列,保存等待线程及相关信息。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 线程等待队列
private transient volatile Transferer<E> transferer;
......
}
SynchronousQueue 将 put() 和 take() 这两种功能不同的方法抽象成一个通用方法 Transferer.transfer(),从字面上看,就是数据传递的意思。当参数 e 为非空时,表示当前操作传递给一个消费者,如果为空,则表示当前操作需要请求一个数据。如果返回值不为空,则表示数据己经接受或者正常提供;如果为空则表示失败。
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
Transferer 是一个抽象类,有两个子类的实现,分别是 TransferQueue 和 TransferStack。前者是一个 FIFO 队列,维护了队列头和队列尾两个指针;而后者只维护了栈顶指针。这两个子类分别代表了公平和非公平模式,通过构造函数来进行指定。
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue 的特性决定了它非常适合用于传递性的场景,在 Java 6 中,其实现发生了非常大的变化,利用 CAS 替换掉了原本基于锁的逻辑,同步开销比较小。因此它的吞吐量要高于 LinkedBlockingQueue 和 ArrayBlockingQueue。尤其是在队列元素较小的场景,它的性能表现往往大大超过其他实现。
LinkedTransferQueue
LinkedTransferQueue 是一个底层由链表构成的无界阻塞队列,它相对于其他阻塞队列多了 transfer 和 tryTransfer 方法。
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {
transient volatile Node head;
private transient volatile Node tail;
}
transfer 方法:
如果当前有消费者正在等待接收元素(消费者使用 take() 方法或带时间限制的 poll() 方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。
tryTransfer 方法:
tryTransfer 方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收都会立即返回,而 transfer 方法则必须等到消费者消费了才返回。