介绍

PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。其内部是使用平衡二叉树堆实现的,所以直接遍历队列元素不保证有序。默认使用对象的 compareTo 方法提供比较规则,如果你需要自定义比较规则则可以自定义 comparators

PriorityBlockingQueue 类图结构

下面首先通过类图结构(见图 7-32)来从全局了解 PriorityBlockingQueue 的原理。

由图 7-32 可知,PriorityBlockingQueue 内部有一个数组 queue,用来存放队列元素,size 用来存放队列元素个数。allocationSpinLock 是个自旋锁,其使用 CAS 操作来保证同时只有一个线程可以扩容队列,状态为 0 或者 1,其中 0 表示当前没有进行扩容,1 表示当前正在扩容。

由于这是一个优先级队列,所以有一个比较器 comparator 用来比较元素大小。lock 独占锁对象用来控制同时只能有一个线程可以进行入队、出队操作。notEmpty 条件变量用来实现 take 方法阻塞模式。这里没有 notFull 条件变量是因为这里的 put 操作是非阻塞的,为啥要设计为非阻塞的,是因为这是无界队列

在如下构造函数中,默认队列容量为 11,默认比较器为 null,也就是使用元素的 compareTo 方法进行比较来确定元素的优先级,这意味着队列元素必须实现了 Comparable 接口。

  1. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  2. public PriorityBlockingQueue() {
  3. this(DEFAULT_INITIAL_CAPACITY, null);
  4. }
  5. public PriorityBlockingQueue(int initialCapacity) {
  6. this(initialCapacity, null);
  7. }
  8. public PriorityBlockingQueue(int initialCapacity,
  9. Comparator<? super E> comparator) {
  10. if (initialCapacity < 1)
  11. throw new IllegalArgumentException();
  12. this.lock = new ReentrantLock();
  13. this.notEmpty = lock.newCondition();
  14. this.comparator = comparator;
  15. this.queue = new Object[initialCapacity];
  16. }

PriorityBlockingQueue 原理探究 - 图1

原理介绍

1.offer 操作

offer 操作的作用是在队列中插入一个元素,由于是无界队列,所以一直返回 true。如下是 offer 函数的代码。

  1. public boolean offerE e {
  2. if e == null
  3. throw new NullPointerException();
  4. //获取独占锁
  5. final ReentrantLock lock = this.lock
  6. lock.lock();
  7. int n cap
  8. Object[] array
  9. //(1)如果当前元素个数 >= 队列容量,则扩容
  10. while ((n = size) >= (cap = (array = queue).length))
  11. tryGrowarray, cap);
  12. try {
  13. Comparator<? super E> cmp = comparator
  14. //(2)默认比较器为 null
  15. if cmp == null
  16. siftUpComparablen, e, array);
  17. else
  18. //(3)自定义比较器
  19. siftUpUsingComparatorn, e, array, cmp);
  20. //(9)将队列元素数增加 1,并且激活 notEmpty 的条件队列里面的一个阻塞线程
  21. size = n + 1
  22. notEmpty.signal(); //激活因调用 take()方法被阻塞的线程
  23. } finally {
  24. //释放独占锁
  25. lock.unlock();
  26. }
  27. return true
  28. }

如上代码的主流程比较简单,下面主要看看如何进行扩容和在内部建堆。首先看下面的扩容逻辑。

  1. private void tryGrowObject[] array, int oldCap {
  2. lock.unlock(); //释放获取的锁
  3. Object[] newArray = null
  4. //(4)CAS 成功则扩容
  5. if allocationSpinLock == 0 &&
  6. UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
  7. 0, 1)) {
  8. try {
  9. //oldGap<64 则扩容,执行 oldcap+2,否则扩容 50%,并且最大为 MAX_ARRAY_SIZE
  10. int newCap = oldCap + ((oldCap < 64) ?
  11. (oldCap + 2) : // grow faster if small
  12. (oldCap >> 1));
  13. if newCap - MAX_ARRAY_SIZE > 0 { // possible overflow
  14. int minCap = oldCap + 1
  15. if minCap < 0 || minCap > MAX_ARRAY_SIZE
  16. throw new OutOfMemoryError();
  17. newCap = MAX_ARRAY_SIZE
  18. }
  19. if newCap > oldCap && queue == array
  20. newArray = new Object[newCap];
  21. } finally {
  22. allocationSpinLock = 0
  23. }
  24. }
  25. //(5)第一个线程 CAS 成功后,第二个线程会进入这段代码,然后第二个线程让出 CPU,尽量让第一个线程获取锁,但是这得不到保证。
  26. if newArray == null // back off if another thread is allocating
  27. Thread.yield();
  28. lock.lock(); //(6)
  29. if newArray ! = null && queue == array {
  30. queue = newArray
  31. System.arraycopyarray, 0, newArray, 0, oldCap);
  32. }
  33. }

tryGrow 的作用是扩容。这里为啥在扩容前要先释放锁,然后使用 CAS 控制只有一个线程可以扩容成功?其实这里不先释放锁,也是可行的,也就是在整个扩容期间一直持有锁,但是扩容是需要花时间的,如果扩容时还占用锁那么其他线程在这个时候是不能进行出队和入队操作的,这大大降低了并发性。所以为了提高性能,使用 CAS 控制只有一个线程可以进行扩容,并且在扩容前释放锁,让其他线程可以进行入队和出队操作。

spinlock 锁使用 CAS 控制只有一个线程可以进行扩容,CAS 失败的线程会调用 Thread.yield()让出 CPU,目的是让扩容线程扩容后优先调用 lock.lock 重新获取锁,但是这得不到保证。有可能 yield 的线程在扩容线程扩容完成前已经退出,并执行代码(6)获取到了锁,这时候获取到锁的线程发现 newArray 为 null 就会执行代码(1)。如果当前数组扩容还没完毕,当前线程会再次调用 tryGrow 方法,然后释放锁,这又给扩容线程获取锁提供了机会,如果这时候扩容线程还没扩容完毕,则当前线程释放锁后又调用 yield 方法让出 CPU。所以当扩容线程进行扩容时,其他线程原地自旋通过代码(1)检查当前扩容是否完毕,扩容完毕后才退出代码(1)的循环。

扩容线程扩容完毕后会重置自旋锁变量 allocationSpinLock 为 0,这里并没有使用 UNSAFE 方法的 CAS 进行设置是因为同时只可能有一个线程获取到该锁,并且 allocationSpinLock 被修饰为了 volatile 的。当扩容线程扩容完毕后会执行代码(6)获取锁,获取锁后复制当前 queue 里面的元素到新数组。

然后看下面的具体建堆算法。

  1. private static <T> void siftUpComparableint k, T x, Object[] array {
  2. Comparable<? super T> key = Comparable<? super T>) x
  3. //队列元素个数 >0 则判断插入位置,否则直接入队(7)
  4. while k > 0 {
  5. int parent = k -1 >>> 1
  6. Object e = array[parent];
  7. if key.compareTo((T) e) >= 0
  8. break
  9. array[k] = e
  10. k = parent
  11. }
  12. array[k] = key (8)
  13. }

下面用图来解释上面算法过程,假设队列初始化容量为 2,创建的优先级队列的泛型参数为 Integer。

I.首先调用队列的 offer(2)方法,希望向队列插入元素 2,插入前队列状态如下所示:

PriorityBlockingQueue 原理探究 - 图2

首先执行代码(1),从图中的变量值可知判断结果为 false,所以紧接着执行代码(2)。由于 k=n=size=0,所以代码(7)的判断结果为 false,因此会执行代码(8)直接把元素 2 入队。最后执行代码(9)将 size 的值加 1,这时候队列的状态如下所示:

PriorityBlockingQueue 原理探究 - 图3

II.第二次调用队列的 offer(4)时,首先执行代码(1),从图中的变量值可知判断结果为 false,所以执行代码(2)。由于 k=1,所以进入 while 循环,由于 parent=0;e=2;key=4;默认元素比较器使用元素的 compareTo 方法,可知 key>e,所以执行 break 退出 siftUpComparable 中的循环,然后把元素存到数组下标为 1 的地方。最后执行代码(9)将 size 的值加 1,这时候队列状态如下所示:

PriorityBlockingQueue 原理探究 - 图4

III.第三次调用队列的 offer(6)时,首先执行代码(1),从图中的变量值知道,这时候判断结果为 true,所以调用 tryGrow 进行数组扩容。由于 2<64,所以执行 newCap=2 +(2+2)=6,然后创建新数组并复制,之后调用 siftUpComparable 方法。由于 k=2>0,故进入 while 循环,由于 parent=0;e=2;key=6;key>e,所以执行 break 后退出 while 循环,并把元素 6 放入数组下标为 2 的地方。最后将 size 的值加 1,现在队列状态如下所示:

PriorityBlockingQueue 原理探究 - 图5

IV.第四次调用队列的 offer(1)时,首先执行代码(1),从图中的变量值知道,这次判断结果为 false,所以执行代码(2)。由于 k=3,所以进入 while 循环,由于 parent=1;e=4;key=1;key<e,所以把元素 4 复制到数组下标为 3 的地方。然后执行 k=1,再次循环,发现 e=2,key=1,key<e,所以复制元素 2 到数组下标 1 处,然后 k=0 退出循环。最后把元素 1 存放到下标为 0 的地方,现在的状态如下所示:

PriorityBlockingQueue 原理探究 - 图6

这时候二叉树堆的树形图如下所示:

PriorityBlockingQueue 原理探究 - 图7

由此可见,堆的根元素是 1,也就是这是一个最小堆,那么当调用这个优先级队列的 poll 方法时,会依次返回堆里面值最小的元素。

2.poll 操作

poll 操作的作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null。poll 函数的代码如下。

  1. public E poll() {
  2. final ReentrantLock lock = this.lock
  3. lock.lock(); //获取独占锁
  4. try {
  5. return dequeue();
  6. } finally {
  7. lock.unlock(); //释放独占锁
  8. }
  9. }

如以上代码所示,在进行出队操作时要先加锁,这意味着,当前线程在进行出队操作时,其他线程不能再进行入队和出队操作,但是前面在介绍 offer 函数时介绍过,这时候其他线程可以进行扩容。下面看下具体执行出队操作的 dequeue 方法的代码:

  1. private E dequeue() {
  2. //队列为空,则返回 null
  3. int n = size -1
  4. if n < 0
  5. return null
  6. else {
  7. //(1)获取队头元素
  8. Object[] array = queue
  9. E result = E array[0];
  10. //(2)获取队尾元素,并赋值为 null
  11. E x = E array[n];
  12. array[n] = null
  13. Comparator<? super E> cmp = comparator
  14. if cmp == null//(3)
  15. siftDownComparable0, x, array, n);
  16. else
  17. siftDownUsingComparator0, x, array, n, cmp);
  18. size = n //(4)
  19. return result
  20. }
  21. }

在如上代码中,如果队列为空则直接返回 null,否则执行代码(1)获取数组第一个元素作为返回值存放到变量 Result 中,这里需要注意,数组里面的第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。然后代码(2)获取队列尾部元素并存放到变量 x 中,且置空尾部节点,然后执行代码(3)将变量 x 插入到数组下标为 0 的位置,之后重新调整堆为最大或者最小堆,然后返回。这里重要的是,去掉堆的根节点后,如何使用剩下的节点重新调整一个最大或者最小堆。下面我们看下 siftDownComparable 的实现代码。

  1. private static <T> void siftDownComparable(int k, T x, Object[] array,
  2. int n) {
  3. if (n > 0) {
  4. Comparable<? super T> key = (Comparable<? super T>)x;
  5. int half = n >>> 1; // first non-leaf node
  6. while (k < half) { // loop while a non-leaf
  7. int child = (k << 1) + 1; // assume left child is least
  8. Object c = array[child];(5
  9. int right = child + 1;(6)
  10. if (right < n &&
  11. ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
  12. c = array[child = right];
  13. if (key.compareTo((T) c) <= 0)(8)
  14. break;
  15. array[k] = c;
  16. k = child;
  17. }
  18. array[k] = key; (9)
  19. }
  20. }

同样下面我们结合图来介绍上面调整堆的算法过程。接着上节队列的状态继续讲解,在上一节中队列元素序列为 1、2、6、4。

I.第一次调用队列的 poll()方法时,首先执行代码(1)和代码(2),这时候变量 size =4;n=3;result=1;x=4;此时队列状态如下所示。

PriorityBlockingQueue 原理探究 - 图8

然后执行代码(3)调整堆后队列状态为

PriorityBlockingQueue 原理探究 - 图9

II.第二次调用队列的 poll()方法时,首先执行代码(1)和代码(2),这时候变量 size =3;n=2;result=2;x=6;此时队列状态为

PriorityBlockingQueue 原理探究 - 图10

然后执行代码(3)调整堆后队列状态为

PriorityBlockingQueue 原理探究 - 图11

III.第三次调用队列的 poll()方法时,首先执行代码(1)和代码(2),这时候变量 size =2;n=1;result=4;x=6;此时队列状态为

PriorityBlockingQueue 原理探究 - 图12

然后执行代码(3)调整堆后队列状态为

PriorityBlockingQueue 原理探究 - 图13

IV.第四次直接返回元素 6。

下面重点说说 siftDownComparable 调整堆的算法。首先介绍下堆调整的思路。由于队列数组第 0 个元素为树根,因此出队时要移除它。这时数组就不再是最小的堆了,所以需要调整堆。具体是从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会找自己左右子树里面那个最小值,这是一个递归过程,直到树叶节点结束递归。如果不太明白,没关系,下面我们结合图来说明,假如当前队列内容如下:

PriorityBlockingQueue 原理探究 - 图14

其对应的二叉堆树为:

PriorityBlockingQueue 原理探究 - 图15

这时候如果调用了 poll(),那么 result=2;x=11,并且队列末尾的元素被设置为 null,然后对于剩下的元素,调整堆的步骤如下图所示:

图(1)中树根的 leftChildVal = 4;rightChildVal = 6;由于 4<6,所以 c=4。然后由于 11>4,也就是 key>c,所以使用元素 4 覆盖树根节点的值,现在堆对应的树如图(2)所示。

然后树根的左子树树根的左右孩子节点中的 leftChildVal = 8;rightChildVal = 10;由于 8<10,所以 c=8。然后由于 11>8,也就是 key>c,所以元素 8 作为树根左子树的根节点,现在树的形状如图(3)所示。这时候判断是否 k<half,结果为 false,所以退出循环。然后把 x=11 的元素设置到数组下标为 3 的地方,这时候堆树如图(4)所示,至此调整堆完毕。siftDownComparable 返回的 result=2,所以 poll 方法也返回了。

PriorityBlockingQueue 原理探究 - 图16

3.put 操作

put 操作内部调用的是 offer 操作,由于是无界队列,所以不需要阻塞。

  1. public void put(E e) {
  2. offer(e); // never need to block
  3. }

4.take 操作

take 操作的作用是获取队列内部堆树的根节点元素,如果队列为空则阻塞,如以下代码所示。

  1. public E take() throws InterruptedException {
  2. //获取锁,可被中断
  3. final ReentrantLock lock = this.lock
  4. lock.lockInterruptibly();
  5. E result
  6. try {
  7. //如果队列为空,则阻塞,把当前线程放入 notEmpty 的条件队列
  8. while (result = dequeue()) == null
  9. notEmpty.await(); //阻塞当前线程
  10. } finally {
  11. lock.unlock(); //释放锁
  12. }
  13. return result;
  14. }

在如上代码中,首先通过 lock.lockInterruptibly()获取独占锁,以这个方式获取的锁会对中断进行响应。然后调用 dequeue 方法返回堆树根节点元素,如果队列为空,则返回 false。然后当前线程调用 notEmpty.await()阻塞挂起自己,直到有线程调用了 offer()方法(在 offer 方法内添加元素成功后会调用 notEmpty.signal 方法,这会激活一个阻塞在 notEmpty 的条件队列里面的一个线程)。另外,这里使用 while 循环而不是 if 语句是为了避免虚假唤醒。

5.size 操作

计算队列元素个数。如下代码在返回 size 前加了锁,以保证在调用 size()方法时不会有其他线程进行入队和出队操作。另外,由于 size 变量没有被修饰为 volatie 的,所以这里加锁也保证了在多线程下 size 变量的内存可见性。

  1. public int size() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return size;
  6. } finally {
  7. lock.unlock();
  8. }
  9. }

案例介绍

下面我们通过一个案例来体会 PriorityBlockingQueue 的使用方法。在这个案例中,会把具有优先级的任务放入队列,然后从队列里面逐个获取优先级最高的任务来执行。

  1. public class TestPriorityBlockingQueue {
  2. static class Task implements Comparable<Task> {
  3. public int getPriority() {
  4. return priority;
  5. }
  6. public void setPriorityint priority {
  7. this.priority = priority
  8. }
  9. public String getTaskName() {
  10. return taskName
  11. }
  12. public void setTaskNameString taskName {
  13. this.taskName = taskName
  14. }
  15. private int priority = 0
  16. private String taskName
  17. @Override
  18. public int compareToTask o {
  19. if this.priority >= o.getPriority()) {
  20. return 1
  21. } else {
  22. return -1
  23. }
  24. }
  25. public void doSomeThing(){
  26. System.out.printlntaskName + 「:」 + priority);
  27. }
  28. }
  29. public static void mainString[] args {
  30. //创建任务,并添加到队列
  31. PriorityBlockingQueue<Task> priorityQueue = new
  32. PriorityBlockingQueue<Task>();
  33. Random random = new Random();
  34. forint i=0 i<10 ++i){
  35. Task task = new Task();
  36. task.setPriority(random.nextInt(10));
  37. task.setTaskName(「taskName +i);
  38. priorityQueue.offertask);
  39. }
  40. //取出任务执行
  41. while(! priorityQueue.isEmpty()){
  42. Task task = priorityQueue.poll();
  43. ifnull ! = task){
  44. task.doSomeThing();
  45. }
  46. }
  47. }
  48. }

如上代码首先创建了一个 Task 类,该类继承了 Comparable 方法并重写了 compareTo 方法,自定义了元素优先级比较规则。然后在 main 函数里面创建了一个优先级队列,并使用随机数生成器生成 10 个随机的有优先级的任务,并将它们添加到优先级队列。最后从优先级队列里面逐个获取任务并执行。运行上面代码,一个可能的输出如下所示。

  1. taskName7:0
  2. taskName6:1
  3. taskName9:1
  4. taskName1:2
  5. taskName5:3
  6. taskName0:3
  7. taskName3:4
  8. taskName8:5
  9. taskName2:7
  10. taskName4:7

从结果可知,任务执行的先后顺序和它们被放入队列的先后顺序没有关系,而是和它们的优先级有关系。

小结

PriorityBlockingQueue 队列在内部使用二叉树堆维护元素优先级,使用数组作为元素存储的数据结构,这个数组是可扩容的。当当前元素个数 >= 最大容量时会通过 CAS 算法扩容,出队时始终保证出队的元素是堆树的根节点,而不是在队列里面停留时间最长的元素。使用元素的 compareTo 方法提供默认的元素优先级比较规则,用户可以自定义优先级的比较规则(compareTo 大的放后面)

如图 7-33 所示,PriorityBlockingQueue 类似于 ArrayBlockingQueue,在内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队操作。另外,前者只使用了一个 notEmpty 条件变量而没有使用 notFull,这是因为前者是无界队列,执行 put 操作时永远不会处于 await 状态,所以也不需要被唤醒。而 take 方法是阻塞方法,并且是可被中断的。当需要存放有优先级的元素时该队列比较有用。

PriorityBlockingQueue 原理探究 - 图17