延迟队列,入队不阻塞。出队,队列空时,会阻塞。对 PriorityQueue 的封装
一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素(实现底层排序),同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力
//泛型规定必须实现Delayed接口
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
}
public interface Delayed extends Comparable<Delayed> {
//getDelay 方法返回。任务执行前,还需等待的时间,
//如果返回 0 或者负数则代表任务已过期。
//元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。
long getDelay(TimeUnit unit);
}
如何实现定时出队。是否是park指定时间。如果当前元素不是最近需要出队的元素,则当前不设置过期时间,直接阻塞。因为最近出队的元素,出队后,会进行唤醒操作。最近出队的是leader线程,其他的为flower线程。flower等待leader唤醒
二、原理分析
基本属性
private final transient ReentrantLock lock = new ReentrantLock();
//保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
//对元素进行利用二叉堆排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* 出队操作中,标记是否有线程排队。指向第一个从队列中获取元素,被阻塞的线程
*/
private Thread leader = null;
/**
* 队列条件,队列为空时阻塞。当有新元素入队,则唤醒.
*/
private final Condition available = lock.newCondition();
/**
* 初始化.
*/
public DelayQueue() {}
/**
* 初始化
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
put操作
入队不阻塞,
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//调用PriorityQueue队列方法,进行入队
q.offer(e);
//判断当前元素是否位于队列头节点
if (q.peek() == e) {
//为头节点,则进行唤醒操作(条件队列转移到等待队列中)
leader = null;
available.signal();
}
return true;
} finally {
//唤醒等待队列
lock.unlock();
}
}
take操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//可中断 锁
lock.lockInterruptibly();
try {
for (;;) {
//获取队列第一个元素。
E first = q.peek();
if (first == null)
//如果为空,则阻塞消费者进程
available.await();
else {//队列不为null场景
//返回:任务执行前,还需等待的时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)//达到出队条件
return q.poll();
first = null; // don't retain ref while waiting
//其他消费线程,进行阻塞
if (leader != null)
available.await();
else {
//等待消费队列第一个元素的线程,设置为leader线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//阻塞 指定时间(delay),自动唤醒
//1.唤醒后,将leader设置为null,并开始尝试获取队列第一个元素
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader == null && q.peek() != null)
//条件队列转同步队列,准备唤醒阻塞在available上的线程
available.signal();
//唤醒的等待队列元素
lock.unlock();
}
}