一、应用

基本介绍

入队不阻塞,因为是无界。出队会阻塞,当队列为空时,阻塞
一个基于数组无界的优先级阻塞队列,数组的默认长度是11,但是可以自动无限的扩充,直到资源消耗尽为止,每次出队都返回优先级别最高的元素。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。
需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。优先级队列PriorityQueue:队列中每个元素都有一个优先级,出队的时候,优先级最高的先出。
底层基于二叉堆排序

二叉堆扩展

完全二叉树
除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序
二叉堆
完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,
二叉堆的两个类型:
大顶堆(最大堆)
父结点的键值总是大于或等于任何一个子节点的键值;
小顶堆(最小堆)
父结点的键值总是小于或等于任何一个子节点的键值。
仅保证根元素最大(或最小),不保证其他节点有序

二、原理学习

设计思想

优先级队列,底层使用数数组存储二叉堆。在元素入队时,将最大或者最小元素置于根节点。其他元素没有顺序
入队,即插入二叉堆元素,元素插到完全二叉树,最后一个位置。进行上浮操作
出队,最上层节点出队。首先最底层节点和最上面根节点交换,然后根节点下沉操作
构建二叉树参考:https://blog.csdn.net/xiaojiajia/article/details/82755722
数组坐标和二叉堆节点关系
假设元素在数组中的坐标为i;则
其父节点坐标:(i-1)>>>1 等价于 (i-1)/2;
左子节点坐标:(i<<1)+1 等价于2*i+1
右子节点坐标:(i<<1)+2 等价于2*i+2
PriorityBolckingQueue - 图1

put操作

  1. //入队不会阻塞,加锁保证线程安全。当空间不足时,自动 进行扩容。
  2. public void put(E e) {
  3. offer(e); // never need to block
  4. }
  5. public boolean offer(E e) {
  6. if (e == null)
  7. throw new NullPointerException();
  8. final ReentrantLock lock = this.lock;
  9. lock.lock();
  10. int n, cap;
  11. Object[] array;
  12. while ((n = size) >= (cap = (array = queue).length))
  13. tryGrow(array, cap);
  14. try {
  15. Comparator<? super E> cmp = comparator;
  16. if (cmp == null)
  17. //构建默认二叉堆
  18. siftUpComparable(n, e, array);
  19. else
  20. siftUpUsingComparator(n, e, array, cmp);
  21. size = n + 1;
  22. //新元素入队,则唤醒因队列为空,导致的阻塞的线程
  23. notEmpty.signal();
  24. } finally {
  25. lock.unlock();
  26. }
  27. return true;
  28. }

siftUpComparable(入队上浮)

  1. //使用数组存储,二叉堆(数组坐标和二叉堆节点关系,见上面)
  2. /**
  3. * k 当前数据元素个数(默认入队元素在数组中的位置)。x 当前入队元素 array 底层数据
  4. * 1.查找新入元素的父节点。
  5. * 如果新节点大于父节点,则不进行上浮动作。
  6. * 如果新节点小于父节点,则与父节点进行交换【即:上浮操作】。
  7. * 2.重复上面动作,直到,新节点达到根节点(k=0)或者新节点不用上浮
  8. **/
  9. private static <T> void siftUpComparable(int k, T x, Object[] array) {
  10. Comparable<? super T> key = (Comparable<? super T>) x;
  11. //查找当前元素父节点,并与之比较,如果小于父节点,则节点上浮
  12. while (k > 0) {
  13. int parent = (k - 1) >>> 1;
  14. Object e = array[parent];
  15. if (key.compareTo((T) e) >= 0)
  16. break;
  17. array[k] = e;
  18. k = parent;
  19. }
  20. //上浮完成
  21. array[k] = key;
  22. }

tryGrow扩容逻辑

  1. //扩容逻辑
  2. private void tryGrow(Object[] array, int oldCap) {
  3. lock.unlock(); // must release and then re-acquire main lock
  4. Object[] newArray = null;
  5. if (allocationSpinLock == 0 &&
  6. UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
  7. 0, 1)) {
  8. try {
  9. //当前容量小于64,则扩容后:2*oldCap+2
  10. //当前容量大于64,则扩容后:oldCap+(oldCap/2)
  11. int newCap = oldCap + ((oldCap < 64) ?
  12. (oldCap + 2) : // grow faster if small
  13. (oldCap >> 1));
  14. if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
  15. int minCap = oldCap + 1;
  16. if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
  17. throw new OutOfMemoryError();
  18. newCap = MAX_ARRAY_SIZE;
  19. }
  20. if (newCap > oldCap && queue == array)
  21. newArray = new Object[newCap];
  22. } finally {
  23. allocationSpinLock = 0;
  24. }
  25. }
  26. if (newArray == null) // back off if another thread is allocating
  27. Thread.yield();
  28. lock.lock();
  29. if (newArray != null && queue == array) {
  30. queue = newArray;
  31. System.arraycopy(array, 0, newArray, 0, oldCap);
  32. }
  33. }

take方法

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. E result;
  5. try {
  6. //如果队列为空,则进行阻塞
  7. while ( (result = dequeue()) == null)//await外部使用while,防止线程唤醒后,条件不在满足
  8. notEmpty.await();
  9. } finally {
  10. lock.unlock();
  11. }
  12. return result;
  13. }

dequeue方法

  1. private E dequeue() {
  2. int n = size - 1;
  3. if (n < 0)
  4. return null;
  5. else {
  6. Object[] array = queue;
  7. //入队时,已经保证数组0坐标位置元素,(默认)最小或者最大
  8. E result = (E) array[0];
  9. E x = (E) array[n];
  10. array[n] = null;
  11. Comparator<? super E> cmp = comparator;
  12. if (cmp == null)
  13. siftDownComparable(0, x, array, n);
  14. else
  15. siftDownUsingComparator(0, x, array, n, cmp);
  16. size = n;
  17. return result;
  18. }
  19. }

siftDownComparable(出队下沉)

在位置k处插入项x,通过反复将x降级到树下,直到它小于或等于其子项或是叶,从而保持堆不变。

  1. //k=0:表示二叉堆根节点。
  2. //将最后一个叶子节点,放置根节点,进行不断下沉,【重新整理二叉堆】
  3. private static <T> void siftDownComparable(int k, T x, Object[] array,
  4. int n) {
  5. if (n > 0) {
  6. Comparable<? super T> key = (Comparable<? super T>)x;
  7. int half = n >>> 1; // loop while a non-leaf
  8. while (k < half) {
  9. //计算子节点
  10. int child = (k << 1) + 1; // assume left child is least
  11. Object c = array[child];
  12. int right = child + 1;
  13. //比较左右子节点,选择最小
  14. if (right < n &&
  15. ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
  16. c = array[child = right];
  17. //与选择的元素比较,判读是否需要下沉
  18. if (key.compareTo((T) c) <= 0)
  19. break;
  20. array[k] = c;
  21. k = child;
  22. }
  23. //下沉完成
  24. array[k] = key;
  25. }
  26. }