前言
PriorityBlockingQueue 是 Java 里优先级队列的实现,其设计思想与 PriorityQueue 区别不大。
PriorityBlockingQueue 底层的数据结构是数组,同时它也是一个小顶堆(升序排列),通过一棵完全二叉树来表示。
源码分析
成员变量
/*** 默认数组大小*/private static final int DEFAULT_INITIAL_CAPACITY = 11;/*** 最大数组容量*/private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;/**** Priority queue 由一棵完全二叉树表示。如果父节点的索引是 n,那么左子节点的索引是 2*n+1,* 右子节点的索引是 2*(n+1)。priority queue 要么是由传入的 comparator 排序,* 要么是由元素重写的 comparate 方法排序。*/private transient Object[] queue;/*** 数组元素大小*/private transient int size;/*** 排序器*/private transient Comparator<? super E> comparator;/*** 锁住一些被 public 修饰的方法*/private final ReentrantLock lock;/*** 当数组为空时阻塞*/private final Condition notEmpty;/*** 扩容时的自旋锁标识,扩容时修改为 1,扩容完成后再改回 0*/private transient volatile int allocationSpinLock;/*** 序列化使用*/private PriorityQueue<E> q;
构造函数
public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null);}// 传入初数组容量public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);}// 传入 Comparatorpublic PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) {if (initialCapacity < 1)throw new IllegalArgumentException();this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];}
offer 方法
PriorityBlockingQueue 实现了 Queue 接口,不允许放入 null 元素。
public boolean offer(E e) {// 队列所有的元素不允许为 nullif (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;// 加锁lock.lock();int n, cap;Object[] array;// 判断是否需要扩容while ((n = size) >= (cap = (array = queue).length))// 扩容操作tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;// 如果不自定义比较器,则默认为一个小顶堆,从下往上判断进行调整if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;// 唤醒非空条件对象notEmpty.signal();} finally {// 释放锁lock.unlock();}return true;}
扩容操作
这个方法比较特殊的地方在于先释放了锁,然后通过 CAS 操作判断是否需要初始化新数组,尝试 CAS 失败的线程,会做出一个让步,放弃 CPU 时间片,然后与其他线程一同竞争。这个过程我们可以思考以下几个问题:
- 为什么不直接加锁而是通过 CAS 加判断操作完成扩容步骤;
 - 为什么尝试 CAS 失败的线程需要让步;
 - 在多线程情况下可能会有多个线程初始化新数组,那如何保证操作一致性
/*** Q:扩容操作为什么要允许多个线程进来呢?* A:如果整个扩容过程还加锁的话,其他线程是不能修改队列的,* 只能等待扩容完后才能继续执行,并发效率比较低*/private void tryGrow(Object[] array, int oldCap) {// 释放锁lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;/*** compareAndSwapInt:** this:当前对象的引用* allocationSpinLockOffset:allocationSpinLock 在内存中的偏移量* 0:allocationSpinLock 的预期值* 1:更新值*/if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {try {// 当容量小于 64 时容量为原来的两倍 + 2,如果大于等于 64 时扩容为原来的 1.5 倍// 与 PriorityQueue 一致int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}// 初始化新的数组if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {// allocationSpinLock 置 0// 因此后面的线程获取锁也可能会尝试 CAS 成功,然后初始化新数组allocationSpinLock = 0;}}/*** 如果当前线程尝试 CAS 失败,则尝试让步* Q:这里为什么要让步?* A:因为自己不是成功初始化新数组的线程,就算获取到了线程也不能正确扩容,* 因此让步尽量让成功扩容的线程获取锁*/if (newArray == null) // back off if another thread is allocatingThread.yield();/*** Q:在加锁之前,可能由多个数组尝试 CAS 成功,且成功的初始化了新的数组,* 那么是不是后面的新数组会覆盖前面的数组呢?* A:当然答案肯定是不会的,那么是如何保证正确性的呢?关键在于 queue == array 判断,* 因此只有第一个判断成功的线程能正确扩容,其他非第一个线程再进行判断的时候会返回 false,* 自然不会进行数组元素拷贝*/lock.lock();if (newArray != null && queue == array) {// 重置队列内部数组queue = newArray;// 元素拷贝,同 PriorityQueueSystem.arraycopy(array, 0, newArray, 0, oldCap);}}
调整堆结构
 
PriorityBlockingQueue 调整堆结构的方式与 PriorityQueue 一致,先看看 PriorityQueue 的实现。
上图中我们给每个元素按照层序遍历的方式进行了编号,如果你足够细心,会发现父节点和子节点的编号是有联系的,更确切的说父子节点的编号之间有如下关系:
leftNo = parentNo*2+1rightNo = parentNo*2+2parentNo = (nodeNo-1)/2
通过上述三个公式,可以轻易计算出某个节点的父节点以及子节点的下标。这也就是为什么可以直接用数组来存储堆的原因。
PriorityQueue 的 peek 和 element 操作是常数时间,add, offer, 无参数的 remove 以及 poll 方法的时间复杂度都是 log(N)。
向上调整示意图:

// k 表示插入索引,x 表示插入元素private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {// 计算父节点索引int parent = (k - 1) >>> 1;// 父节点元素Object e = array[parent];// 只要元素大于父节点元素就退出if (key.compareTo((T) e) >= 0)break;// 父节点元素下沉array[k] = e;// 更新父节点索引k = parent;}// 调整后的最后位置array[k] = key;}
take 方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 加锁(可响应中断)lock.lockInterruptibly();E result;try {// 如果队列为空,take 方法会阻塞出队线程while ( (result = dequeue()) == null)/*** 如果队列中没有元素,会阻塞后续调用 take 方法出队的线程* 直到队列添加了元素后唤醒 notEmpty,才可以继续执行*/notEmpty.await();} finally {// 释放锁lock.unlock();}return result;}
deque 方法
private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;// 堆顶的元素E result = (E) array[0];// 堆最底层的元素(最后一个)E x = (E) array[n];// 把最后一个元素置 null,因为要把它放到堆顶,向下逐步调整堆结构,与 PriorityQueue 一致array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}
向下调整
PriorityBlockingQueue 的 take 方法会把堆顶元素取出,并把堆中最后一个元素放在堆顶,然后逐步将这个元素向下调整。
更新后的堆顶元素向下调整,始终与较小的子节点做比较。如果发现子节点比父节点大,说明调整结束。
private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;// 叶子节点在最后一层int half = n >>> 1; // loop while a non-leafwhile (k < half) {// 假设左子节点是最小节点int child = (k << 1) + 1;Object c = array[child];int right = child + 1;// 左子节点和右子节点比较。如果左子节点大于右子节点,说明元素该与右子节点比较if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];//如果队尾元素比根元素孩子都要小,则不需"下移",结束if (key.compareTo((T) c) <= 0)break;array[k] = c;k = child;}array[k] = key;}}
