延迟队列,入队不阻塞。出队,队列空时,会阻塞。对 PriorityQueue 的封装
一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素(实现底层排序),同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力
//泛型规定必须实现Delayed接口public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {}public interface Delayed extends Comparable<Delayed> {//getDelay 方法返回。任务执行前,还需等待的时间,//如果返回 0 或者负数则代表任务已过期。//元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。long getDelay(TimeUnit unit);}
如何实现定时出队。是否是park指定时间。如果当前元素不是最近需要出队的元素,则当前不设置过期时间,直接阻塞。因为最近出队的元素,出队后,会进行唤醒操作。最近出队的是leader线程,其他的为flower线程。flower等待leader唤醒
二、原理分析
基本属性
private final transient ReentrantLock lock = new ReentrantLock();//保证线程安全private final transient ReentrantLock lock = new ReentrantLock();//对元素进行利用二叉堆排序private final PriorityQueue<E> q = new PriorityQueue<E>();/*** 出队操作中,标记是否有线程排队。指向第一个从队列中获取元素,被阻塞的线程*/private Thread leader = null;/*** 队列条件,队列为空时阻塞。当有新元素入队,则唤醒.*/private final Condition available = lock.newCondition();/*** 初始化.*/public DelayQueue() {}/*** 初始化*/public DelayQueue(Collection<? extends E> c) {this.addAll(c);}
put操作
入队不阻塞,
public void put(E e) {offer(e);}public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {//调用PriorityQueue队列方法,进行入队q.offer(e);//判断当前元素是否位于队列头节点if (q.peek() == e) {//为头节点,则进行唤醒操作(条件队列转移到等待队列中)leader = null;available.signal();}return true;} finally {//唤醒等待队列lock.unlock();}}
take操作
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//可中断 锁lock.lockInterruptibly();try {for (;;) {//获取队列第一个元素。E first = q.peek();if (first == null)//如果为空,则阻塞消费者进程available.await();else {//队列不为null场景//返回:任务执行前,还需等待的时间long delay = first.getDelay(NANOSECONDS);if (delay <= 0)//达到出队条件return q.poll();first = null; // don't retain ref while waiting//其他消费线程,进行阻塞if (leader != null)available.await();else {//等待消费队列第一个元素的线程,设置为leader线程Thread thisThread = Thread.currentThread();leader = thisThread;try {//阻塞 指定时间(delay),自动唤醒//1.唤醒后,将leader设置为null,并开始尝试获取队列第一个元素available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {//如果leader为空且堆顶还有元素,就唤醒下一个等待的线程if (leader == null && q.peek() != null)//条件队列转同步队列,准备唤醒阻塞在available上的线程available.signal();//唤醒的等待队列元素lock.unlock();}}
