延迟队列,入队不阻塞。出队,队列空时,会阻塞。对 PriorityQueue 的封装

一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素(实现底层排序),同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力

  1. //泛型规定必须实现Delayed接口
  2. public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
  3. implements BlockingQueue<E> {
  4. }
  5. public interface Delayed extends Comparable<Delayed> {
  6. //getDelay 方法返回。任务执行前,还需等待的时间,
  7. //如果返回 0 或者负数则代表任务已过期。
  8. //元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。
  9. long getDelay(TimeUnit unit);
  10. }

如何实现定时出队。是否是park指定时间。如果当前元素不是最近需要出队的元素,则当前不设置过期时间,直接阻塞。因为最近出队的元素,出队后,会进行唤醒操作。最近出队的是leader线程,其他的为flower线程。flower等待leader唤醒

二、原理分析

基本属性

  1. private final transient ReentrantLock lock = new ReentrantLock();
  2. //保证线程安全
  3. private final transient ReentrantLock lock = new ReentrantLock();
  4. //对元素进行利用二叉堆排序
  5. private final PriorityQueue<E> q = new PriorityQueue<E>();
  6. /**
  7. * 出队操作中,标记是否有线程排队。指向第一个从队列中获取元素,被阻塞的线程
  8. */
  9. private Thread leader = null;
  10. /**
  11. * 队列条件,队列为空时阻塞。当有新元素入队,则唤醒.
  12. */
  13. private final Condition available = lock.newCondition();
  14. /**
  15. * 初始化.
  16. */
  17. public DelayQueue() {}
  18. /**
  19. * 初始化
  20. */
  21. public DelayQueue(Collection<? extends E> c) {
  22. this.addAll(c);
  23. }

put操作

入队不阻塞,

  1. public void put(E e) {
  2. offer(e);
  3. }
  4. public boolean offer(E e) {
  5. final ReentrantLock lock = this.lock;
  6. lock.lock();
  7. try {
  8. //调用PriorityQueue队列方法,进行入队
  9. q.offer(e);
  10. //判断当前元素是否位于队列头节点
  11. if (q.peek() == e) {
  12. //为头节点,则进行唤醒操作(条件队列转移到等待队列中)
  13. leader = null;
  14. available.signal();
  15. }
  16. return true;
  17. } finally {
  18. //唤醒等待队列
  19. lock.unlock();
  20. }
  21. }

take操作

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. //可中断 锁
  4. lock.lockInterruptibly();
  5. try {
  6. for (;;) {
  7. //获取队列第一个元素。
  8. E first = q.peek();
  9. if (first == null)
  10. //如果为空,则阻塞消费者进程
  11. available.await();
  12. else {//队列不为null场景
  13. //返回:任务执行前,还需等待的时间
  14. long delay = first.getDelay(NANOSECONDS);
  15. if (delay <= 0)//达到出队条件
  16. return q.poll();
  17. first = null; // don't retain ref while waiting
  18. //其他消费线程,进行阻塞
  19. if (leader != null)
  20. available.await();
  21. else {
  22. //等待消费队列第一个元素的线程,设置为leader线程
  23. Thread thisThread = Thread.currentThread();
  24. leader = thisThread;
  25. try {
  26. //阻塞 指定时间(delay),自动唤醒
  27. //1.唤醒后,将leader设置为null,并开始尝试获取队列第一个元素
  28. available.awaitNanos(delay);
  29. } finally {
  30. if (leader == thisThread)
  31. leader = null;
  32. }
  33. }
  34. }
  35. }
  36. } finally {
  37. //如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
  38. if (leader == null && q.peek() != null)
  39. //条件队列转同步队列,准备唤醒阻塞在available上的线程
  40. available.signal();
  41. //唤醒的等待队列元素
  42. lock.unlock();
  43. }
  44. }