前言

PriorityBlockingQueue 是 Java 里优先级队列的实现,其设计思想与 PriorityQueue 区别不大。
PriorityBlockingQueue 底层的数据结构是数组,同时它也是一个小顶堆(升序排列),通过一棵完全二叉树来表示。
PriorityBlockingQueue 源码分析 - 图1

源码分析

成员变量

  1. /**
  2. * 默认数组大小
  3. */
  4. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  5. /**
  6. * 最大数组容量
  7. */
  8. private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  9. /**
  10. *
  11. * Priority queue 由一棵完全二叉树表示。如果父节点的索引是 n,那么左子节点的索引是 2*n+1,
  12. * 右子节点的索引是 2*(n+1)。priority queue 要么是由传入的 comparator 排序,
  13. * 要么是由元素重写的 comparate 方法排序。
  14. */
  15. private transient Object[] queue;
  16. /**
  17. * 数组元素大小
  18. */
  19. private transient int size;
  20. /**
  21. * 排序器
  22. */
  23. private transient Comparator<? super E> comparator;
  24. /**
  25. * 锁住一些被 public 修饰的方法
  26. */
  27. private final ReentrantLock lock;
  28. /**
  29. * 当数组为空时阻塞
  30. */
  31. private final Condition notEmpty;
  32. /**
  33. * 扩容时的自旋锁标识,扩容时修改为 1,扩容完成后再改回 0
  34. */
  35. private transient volatile int allocationSpinLock;
  36. /**
  37. * 序列化使用
  38. */
  39. private PriorityQueue<E> q;

构造函数

  1. public PriorityBlockingQueue() {
  2. this(DEFAULT_INITIAL_CAPACITY, null);
  3. }
  4. // 传入初数组容量
  5. public PriorityBlockingQueue(int initialCapacity) {
  6. this(initialCapacity, null);
  7. }
  8. // 传入 Comparator
  9. public PriorityBlockingQueue(int initialCapacity,
  10. Comparator<? super E> comparator) {
  11. if (initialCapacity < 1)
  12. throw new IllegalArgumentException();
  13. this.lock = new ReentrantLock();
  14. this.notEmpty = lock.newCondition();
  15. this.comparator = comparator;
  16. this.queue = new Object[initialCapacity];
  17. }

offer 方法

PriorityBlockingQueue 实现了 Queue 接口,不允许放入 null 元素。

  1. public boolean offer(E e) {
  2. // 队列所有的元素不允许为 null
  3. if (e == null)
  4. throw new NullPointerException();
  5. final ReentrantLock lock = this.lock;
  6. // 加锁
  7. lock.lock();
  8. int n, cap;
  9. Object[] array;
  10. // 判断是否需要扩容
  11. while ((n = size) >= (cap = (array = queue).length))
  12. // 扩容操作
  13. tryGrow(array, cap);
  14. try {
  15. Comparator<? super E> cmp = comparator;
  16. // 如果不自定义比较器,则默认为一个小顶堆,从下往上判断进行调整
  17. if (cmp == null)
  18. siftUpComparable(n, e, array);
  19. else
  20. siftUpUsingComparator(n, e, array, cmp);
  21. size = n + 1;
  22. // 唤醒非空条件对象
  23. notEmpty.signal();
  24. } finally {
  25. // 释放锁
  26. lock.unlock();
  27. }
  28. return true;
  29. }

扩容操作

这个方法比较特殊的地方在于先释放了锁,然后通过 CAS 操作判断是否需要初始化新数组,尝试 CAS 失败的线程,会做出一个让步,放弃 CPU 时间片,然后与其他线程一同竞争。这个过程我们可以思考以下几个问题:

  • 为什么不直接加锁而是通过 CAS 加判断操作完成扩容步骤;
  • 为什么尝试 CAS 失败的线程需要让步;
  • 在多线程情况下可能会有多个线程初始化新数组,那如何保证操作一致性
    1. /**
    2. * Q:扩容操作为什么要允许多个线程进来呢?
    3. * A:如果整个扩容过程还加锁的话,其他线程是不能修改队列的,
    4. * 只能等待扩容完后才能继续执行,并发效率比较低
    5. */
    6. private void tryGrow(Object[] array, int oldCap) {
    7. // 释放锁
    8. lock.unlock(); // must release and then re-acquire main lock
    9. Object[] newArray = null;
    10. /**
    11. * compareAndSwapInt:
    12. *
    13. * this:当前对象的引用
    14. * allocationSpinLockOffset:allocationSpinLock 在内存中的偏移量
    15. * 0:allocationSpinLock 的预期值
    16. * 1:更新值
    17. */
    18. if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
    19. try {
    20. // 当容量小于 64 时容量为原来的两倍 + 2,如果大于等于 64 时扩容为原来的 1.5 倍
    21. // 与 PriorityQueue 一致
    22. int newCap = oldCap + ((oldCap < 64) ?
    23. (oldCap + 2) : // grow faster if small
    24. (oldCap >> 1));
    25. if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
    26. int minCap = oldCap + 1;
    27. if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
    28. throw new OutOfMemoryError();
    29. newCap = MAX_ARRAY_SIZE;
    30. }
    31. // 初始化新的数组
    32. if (newCap > oldCap && queue == array)
    33. newArray = new Object[newCap];
    34. } finally {
    35. // allocationSpinLock 置 0
    36. // 因此后面的线程获取锁也可能会尝试 CAS 成功,然后初始化新数组
    37. allocationSpinLock = 0;
    38. }
    39. }
    40. /**
    41. * 如果当前线程尝试 CAS 失败,则尝试让步
    42. * Q:这里为什么要让步?
    43. * A:因为自己不是成功初始化新数组的线程,就算获取到了线程也不能正确扩容,
    44. * 因此让步尽量让成功扩容的线程获取锁
    45. */
    46. if (newArray == null) // back off if another thread is allocating
    47. Thread.yield();
    48. /**
    49. * Q:在加锁之前,可能由多个数组尝试 CAS 成功,且成功的初始化了新的数组,
    50. * 那么是不是后面的新数组会覆盖前面的数组呢?
    51. * A:当然答案肯定是不会的,那么是如何保证正确性的呢?关键在于 queue == array 判断,
    52. * 因此只有第一个判断成功的线程能正确扩容,其他非第一个线程再进行判断的时候会返回 false,
    53. * 自然不会进行数组元素拷贝
    54. */
    55. lock.lock();
    56. if (newArray != null && queue == array) {
    57. // 重置队列内部数组
    58. queue = newArray;
    59. // 元素拷贝,同 PriorityQueue
    60. System.arraycopy(array, 0, newArray, 0, oldCap);
    61. }
    62. }

    调整堆结构

PriorityBlockingQueue 调整堆结构的方式与 PriorityQueue 一致,先看看 PriorityQueue 的实现。
PriorityBlockingQueue 源码分析 - 图2

上图中我们给每个元素按照层序遍历的方式进行了编号,如果你足够细心,会发现父节点和子节点的编号是有联系的,更确切的说父子节点的编号之间有如下关系:

  1. leftNo = parentNo*2+1
  2. rightNo = parentNo*2+2
  3. parentNo = (nodeNo-1)/2

通过上述三个公式,可以轻易计算出某个节点的父节点以及子节点的下标。这也就是为什么可以直接用数组来存储堆的原因。
PriorityQueue 的 peek 和 element 操作是常数时间,add, offer, 无参数的 remove 以及 poll 方法的时间复杂度都是 log(N)。
向上调整示意图:

PriorityBlockingQueue 源码分析 - 图3

  1. // k 表示插入索引,x 表示插入元素
  2. private static <T> void siftUpComparable(int k, T x, Object[] array) {
  3. Comparable<? super T> key = (Comparable<? super T>) x;
  4. while (k > 0) {
  5. // 计算父节点索引
  6. int parent = (k - 1) >>> 1;
  7. // 父节点元素
  8. Object e = array[parent];
  9. // 只要元素大于父节点元素就退出
  10. if (key.compareTo((T) e) >= 0)
  11. break;
  12. // 父节点元素下沉
  13. array[k] = e;
  14. // 更新父节点索引
  15. k = parent;
  16. }
  17. // 调整后的最后位置
  18. array[k] = key;
  19. }

take 方法

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. // 加锁(可响应中断)
  4. lock.lockInterruptibly();
  5. E result;
  6. try {
  7. // 如果队列为空,take 方法会阻塞出队线程
  8. while ( (result = dequeue()) == null)
  9. /**
  10. * 如果队列中没有元素,会阻塞后续调用 take 方法出队的线程
  11. * 直到队列添加了元素后唤醒 notEmpty,才可以继续执行
  12. */
  13. notEmpty.await();
  14. } finally {
  15. // 释放锁
  16. lock.unlock();
  17. }
  18. return result;
  19. }

deque 方法

  1. private E dequeue() {
  2. int n = size - 1;
  3. if (n < 0)
  4. return null;
  5. else {
  6. Object[] array = queue;
  7. // 堆顶的元素
  8. E result = (E) array[0];
  9. // 堆最底层的元素(最后一个)
  10. E x = (E) array[n];
  11. // 把最后一个元素置 null,因为要把它放到堆顶,向下逐步调整堆结构,与 PriorityQueue 一致
  12. array[n] = null;
  13. Comparator<? super E> cmp = comparator;
  14. if (cmp == null)
  15. siftDownComparable(0, x, array, n);
  16. else
  17. siftDownUsingComparator(0, x, array, n, cmp);
  18. size = n;
  19. return result;
  20. }
  21. }

向下调整

PriorityBlockingQueue 的 take 方法会把堆顶元素取出,并把堆中最后一个元素放在堆顶,然后逐步将这个元素向下调整。
更新后的堆顶元素向下调整,始终与较小的子节点做比较。如果发现子节点比父节点大,说明调整结束。

  1. private static <T> void siftDownComparable(int k, T x, Object[] array,
  2. int n) {
  3. if (n > 0) {
  4. Comparable<? super T> key = (Comparable<? super T>)x;
  5. // 叶子节点在最后一层
  6. int half = n >>> 1; // loop while a non-leaf
  7. while (k < half) {
  8. // 假设左子节点是最小节点
  9. int child = (k << 1) + 1;
  10. Object c = array[child];
  11. int right = child + 1;
  12. // 左子节点和右子节点比较。如果左子节点大于右子节点,说明元素该与右子节点比较
  13. if (right < n &&
  14. ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
  15. c = array[child = right];
  16. //如果队尾元素比根元素孩子都要小,则不需"下移",结束
  17. if (key.compareTo((T) c) <= 0)
  18. break;
  19. array[k] = c;
  20. k = child;
  21. }
  22. array[k] = key;
  23. }
  24. }