PriorityBlockingQueue

PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,数组的默认长度是11,虽然指定了数组的长度,但是可以无限的扩充,直到资源消耗尽为止,每次出队都返回优先级别最高的或者最低的元素。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。
优先级队列PriorityQueue: 队列中每个元素都有一个优先级,出队的时候,优先级最高的先出。

使用

  1. //默认队列长度
  2. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  3. //队列最大值
  4. private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  5. //存放的优先级队列
  6. private transient Object[] queue;
  7. //队列长度
  8. private transient int size;
  9. //优先级比较器
  10. private transient Comparator<? super E> comparator;
  11. //锁,PriorityBlockingQueue的入队和出队都用的同一把锁
  12. private final ReentrantLock lock;

优先级队列使用小顶堆或者大顶堆实现

完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:大顶堆和小顶堆。

  • 大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值;
  • 小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。

截屏2022-04-01 22.56.44.png

用数组实现小顶堆

入队方法:

  1. public boolean offer(E e) {
  2. if (e == null)
  3. throw new NullPointerException();
  4. final ReentrantLock lock = this.lock;
  5. lock.lock();
  6. int n, cap;
  7. Object[] array;
  8. while ((n = size) >= (cap = (array = queue).length))
  9. tryGrow(array, cap);
  10. try {
  11. Comparator<? super E> cmp = comparator;
  12. if (cmp == null)
  13. siftUpComparable(n, e, array);
  14. else
  15. siftUpUsingComparator(n, e, array, cmp);
  16. size = n + 1;
  17. notEmpty.signal();
  18. } finally {
  19. lock.unlock();
  20. }
  21. return true;
  22. }
  23. /**
  24. * k=队列的长度,x是传来的值,array是优先级队列
  25. * 入队的时候,
  26. */
  27. private static <T> void siftUpComparable(int k, T x, Object[] array) {
  28. Comparable<? super T> key = (Comparable<? super T>) x;
  29. while (k > 0) {//表示当前队列有元素
  30. int parent = (k - 1) >>> 1;//获取父节点索引
  31. Object e = array[parent];//父节点
  32. if (key.compareTo((T) e) >= 0)//如果当前的值比父节点值要大,跳过
  33. break;
  34. //当前的x比父节点小,要把小的值进行上浮
  35. //把当前节点修改为父节点的值
  36. array[k] = e;
  37. //然后把k置为parent的索引,这里的k就是最小堆堆顶索引
  38. k = parent;
  39. }
  40. //在k=0的时候,是第一次入队,所以可以直接把array[k] = key
  41. //在k>0时,就是设置堆顶的索引
  42. array[k] = key;
  43. }

应用场景

  • 电商抢购活动,会员级别高的用户优先抢购到商品s
  • 银行办理业务,vip客户插队

DelayQueue

DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。

数据结构

  1. //用于保证队列操作的线程安全
  2. private final transient ReentrantLock lock = new ReentrantLock();
  3. // 优先级队列,存储元素,用于保证延迟低的优先执行
  4. private final PriorityQueue<E> q = new PriorityQueue<E>();
  5. // 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
  6. private Thread leader = null;
  7. // 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知
  8. private final Condition available = lock.newCondition();

入队put方法

  1. public void put(E e) {
  2. offer(e);
  3. }
  4. public boolean offer(E e) {
  5. final ReentrantLock lock = this.lock;
  6. //加锁
  7. lock.lock();
  8. try {
  9. //入队
  10. q.offer(e);
  11. if (q.peek() == e) {
  12. // 若入队的元素位于队列头部,说明当前元素延迟最小
  13. // 将 leader 置空
  14. leader = null;
  15. // available条件队列转同步队列,准备唤醒阻塞在available上的线程
  16. available.signal();
  17. }
  18. return true;
  19. } finally {
  20. lock.unlock();// 解锁,真正唤醒阻塞的线程
  21. }
  22. }

出队take方法

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. //加锁
  4. lock.lockInterruptibly();
  5. try {
  6. for (;;) {
  7. E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)
  8. if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
  9. available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。
  10. else {
  11. long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间
  12. if (delay <= 0)
  13. return q.poll();// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
  14. // 如果delay大于0 ,则下面要阻塞了
  15. // 将first置为空方便gc
  16. first = null; // don't retain ref while waiting
  17. if (leader != null)// 如果有线程争抢的Leader线程,则进行无限期等待。
  18. available.await();
  19. else {
  20. // 如果leader为null,把当前线程赋值给它
  21. Thread thisThread = Thread.currentThread();
  22. leader = thisThread;
  23. try {
  24. // 等待剩余等待时间
  25. available.awaitNanos(delay);
  26. } finally {
  27. // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
  28. if (leader == thisThread)
  29. leader = null;
  30. }
  31. }
  32. }
  33. }
  34. } finally {
  35. // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
  36. if (leader == null && q.peek() != null)
  37. available.signal();// available条件队列转同步队列,准备唤醒阻塞在available上的线程
  38. lock.unlock();
  39. }
  40. }

阻塞队列选择策略

  • 功能
    • 第 1 个需要考虑的就是功能层面,比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。
  • 容量
    • 需要根据任务数量来推算出合适的容量。容量固定的,如 ArrayBlockingQueue。容量固定的,如 ArrayBlockingQueue。有的里面没有任何容量,如 SynchronousQueue。对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。
  • 能否扩容
    • 如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。
  • 内存结构
    • ArrayBlockingQueue的内部结构是“数组”的形式,LinkedBlockingQueue 的内部是用链表实现。ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
  • 性能
    • LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好