PriorityBlockingQueue
PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,数组的默认长度是11,虽然指定了数组的长度,但是可以无限的扩充,直到资源消耗尽为止,每次出队都返回优先级别最高的或者最低的元素。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。
优先级队列PriorityQueue: 队列中每个元素都有一个优先级,出队的时候,优先级最高的先出。
使用
//默认队列长度private static final int DEFAULT_INITIAL_CAPACITY = 11;//队列最大值private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;//存放的优先级队列private transient Object[] queue;//队列长度private transient int size;//优先级比较器private transient Comparator<? super E> comparator;//锁,PriorityBlockingQueue的入队和出队都用的同一把锁private final ReentrantLock lock;
优先级队列使用小顶堆或者大顶堆实现
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:大顶堆和小顶堆。
- 大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值;
- 小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。

用数组实现小顶堆
入队方法:
public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}/*** k=队列的长度,x是传来的值,array是优先级队列* 入队的时候,*/private static <T> void siftUpComparable(int k, T x, Object[] array) {Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {//表示当前队列有元素int parent = (k - 1) >>> 1;//获取父节点索引Object e = array[parent];//父节点if (key.compareTo((T) e) >= 0)//如果当前的值比父节点值要大,跳过break;//当前的x比父节点小,要把小的值进行上浮//把当前节点修改为父节点的值array[k] = e;//然后把k置为parent的索引,这里的k就是最小堆堆顶索引k = parent;}//在k=0的时候,是第一次入队,所以可以直接把array[k] = key//在k>0时,就是设置堆顶的索引array[k] = key;}
应用场景
- 电商抢购活动,会员级别高的用户优先抢购到商品s
- 银行办理业务,vip客户插队
DelayQueue
DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
数据结构
//用于保证队列操作的线程安全private final transient ReentrantLock lock = new ReentrantLock();// 优先级队列,存储元素,用于保证延迟低的优先执行private final PriorityQueue<E> q = new PriorityQueue<E>();// 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程private Thread leader = null;// 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知private final Condition available = lock.newCondition();
入队put方法
public void put(E e) {offer(e);}public boolean offer(E e) {final ReentrantLock lock = this.lock;//加锁lock.lock();try {//入队q.offer(e);if (q.peek() == e) {// 若入队的元素位于队列头部,说明当前元素延迟最小// 将 leader 置空leader = null;// available条件队列转同步队列,准备唤醒阻塞在available上的线程available.signal();}return true;} finally {lock.unlock();// 解锁,真正唤醒阻塞的线程}}
出队take方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁lock.lockInterruptibly();try {for (;;) {E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。else {long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间if (delay <= 0)return q.poll();// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素// 如果delay大于0 ,则下面要阻塞了// 将first置为空方便gcfirst = null; // don't retain ref while waitingif (leader != null)// 如果有线程争抢的Leader线程,则进行无限期等待。available.await();else {// 如果leader为null,把当前线程赋值给它Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待剩余等待时间available.awaitNanos(delay);} finally {// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素if (leader == thisThread)leader = null;}}}}} finally {// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程if (leader == null && q.peek() != null)available.signal();// available条件队列转同步队列,准备唤醒阻塞在available上的线程lock.unlock();}}
阻塞队列选择策略
- 功能
- 第 1 个需要考虑的就是功能层面,比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。
- 容量
- 需要根据任务数量来推算出合适的容量。容量固定的,如 ArrayBlockingQueue。容量固定的,如 ArrayBlockingQueue。有的里面没有任何容量,如 SynchronousQueue。对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。
- 能否扩容
- 如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。
- 内存结构
- ArrayBlockingQueue的内部结构是“数组”的形式,LinkedBlockingQueue 的内部是用链表实现。ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
- 性能
- LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好
