DelayQueue 并发队列是一个无界阻塞延迟队列,队列中的每个元素都有个过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最快要过期的元素。

DelayQueue 类图结构

DelayQueue 类图结构如图 7-34 所示。

由该图可知,DelayQueue 内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步。另外,队列里面的元素要实现 Delayed 接口,由于每个元素都有一个过期时间,所以要实现获知当前元素还剩下多少时间就过期了的接口,由于内部使用优先级队列来实现,所以要实现元素之间相互比较的接口

  1. public interface Delayed extends Comparable<Delayed> {
  2. long getDelay(TimeUnit unit);
  3. }

DelayQueue 原理探究 - 图1

图 7-34

在如下代码中,条件变量 available 与 lock 锁是对应的,其目的是为了实现线程间同步。

  1. private final Condition available = lock.newCondition();

其中 leader 变量的使用基于 Leader-Follower 模式的变体,用于尽量减少不必要的线程等待。当一个线程调用队列的 take 方法变为 leader 线程后,它会调用条件变量 available. awaitNanos(delay)等待 delay 时间,但是其他线程(follwer 线程)则会调用 available. await()进行无限等待。leader 线程延迟时间过期后,会退出 take 方法,并通过调用 available.signal()方法唤醒一个 follwer 线程,被唤醒的 follwer 线程被选举为新的 leader 线程。

主要函数原理讲解

1.offer 操作

插入元素到队列,如果插入元素为 null 则抛出 NullPointerException 异常,否则由于是无界队列,所以一直返回 true。插入元素要实现 Delayed 接口。

  1. public boolean offer(E e) {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. q.offer(e);
  6. if (q.peek() == e) {//(2)
  7. leader = null;
  8. available.signal();
  9. }
  10. return true;
  11. } finally {
  12. lock.unlock();
  13. }
  14. }

如上代码首先获取独占锁,然后添加元素到优先级队列,由于 q 是优先级队列,所以添加元素后,调用 q.peek()方法返回的并不一定是当前添加的元素。如果代码(2)判断结果为 true,则说明当前元素 e 是最先将过期的,那么重置 leader 线程为 null,这时候激活 avaliable 变量条件队列里面的一个线程,告诉它队列里面有元素了。

2.take 操作

获取并移除队列里面延迟时间过期的元素,如果队列里面没有过期元素则等待。

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock
  3. lock.lockInterruptibly();
  4. try {
  5. for (; ; {
  6. //获取但不移除队首元素(1)
  7. E first = q.peek();
  8. if first == null
  9. available.await(); //(2)
  10. else {
  11. long delay = first.getDelay(TimeUnit.NANOSECONDS);
  12. if (delay <= 0)//(3)
  13. return q.poll();
  14. else if (leader ! = null)//(4)
  15. available.await();
  16. else {
  17. Thread thisThread = Thread.currentThread();
  18. leader = thisThread; //(5)
  19. try {
  20. available.awaitNanos(delay); //(6)
  21. } finally {
  22. if (leader == thisThread)
  23. leader = null;
  24. }
  25. }
  26. }
  27. }
  28. } finally {
  29. if (leader == null && q.peek() ! = null)//(7)
  30. available.signal();
  31. lock.unlock(); //(8)
  32. }
  33. }

如上代码首先获取独占锁 lock。假设线程 A 第一次调用队列的 take()方法时队列为空,则执行代码(1)后 first==null,所以会执行代码(2)把当前线程放入 available 的条件队列里阻塞等待。

当有另外一个线程 B 执行 offer(item)方法并且添加元素到队列时,假设此时没有其他线程执行入队操作,则线程 B 添加的元素是队首元素,那么执行 q.peek()。

e 这时候就会重置 leader 线程为 null,并且激活条件变量的条件队列里面的一个线程。此时线程 A 就会被激活。

线程 A 被激活并循环后重新获取队首元素,这时候 first 就是线程 B 新增的元素,可知这时候 first 不为 null,则调用 first.getDelay(TimeUnit.NANOSECONDS)方法查看该元素还剩余多少时间就要过期,如果 delay<=0 则说明已经过期,那么直接出队返回。否则查看 leader 是否为 null,不为 null 则说明其他线程也在执行 take,则把该线程放入条件队列。如果这时候 leader 为 null,则选取当前线程 A 为 leader 线程,然后执行代码(5)等待 delay 时间(这期间该线程会释放锁,所以其他线程可以 offer 添加元素,也可以 take 阻塞自己),剩余过期时间到后,线程 A 会重新竞争得到锁,然后重置 leader 线程为 null,重新进入循环,这时候就会发现队头的元素已经过期了,则会直接返回队头元素。

在返回前会执行 finally 块里面的代码(7),代码(7)执行结果为 true 则说明当前线程从队列移除过期元素后,又有其他线程执行了入队操作,那么这时候调用条件变量的 singal 方法,激活条件队列里面的等待线程。

3.poll 操作

获取并移除队头过期元素,如果没有过期元素则返回 null。

  1. public E poll() {
  2. final ReentrantLock lock = this.lock
  3. lock.lock();
  4. try {
  5. E first = q.peek();
  6. //如果队列为空,或者不为空但是队头元素没有过期则返回 null
  7. if first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0
  8. return null
  9. else
  10. return q.poll();
  11. } finally {
  12. lock.unlock();
  13. }
  14. }

这段代码比较简单,首先获取独占锁,然后获取队头元素,如果队头元素为 null 或者还没过期则返回 null,否则返回队头元素。

4.size 操作

计算队列元素个数,包含过期的和没有过期的。

  1. public int size() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. return q.size();
  6. } finally {
  7. lock.unlock();
  8. }
  9. }

这段代码比较简单,首先获取独占锁,然后调用优先级队列的 size 方法。

案例介绍

下面我们通过一个简单的案例来加深对 DelayQueue 的理解,代码如下。

  1. public class TestDelay {
  2. static class DelayedEle implements Delayed {
  3. private final long delayTime // 延迟时间
  4. private final long expire // 到期时间
  5. private String taskName // 任务名称
  6. public DelayedElelong delay, String taskName {
  7. delayTime = delay
  8. this.taskName = taskName
  9. expire = System.currentTimeMillis() + delay
  10. }
  11. /**
  12. 剩余时间 = 到期时间-当前时间
  13. */
  14. @Override
  15. public long getDelayTimeUnit unit {
  16. return unit.convertthis.expire - System.currentTimeMillis(), TimeUnit.
  17. MILLISECONDS);
  18. }
  19. /**
  20. 优先级队列里面的优先级规则
  21. */
  22. @Override
  23. public int compareToDelayed o {
  24. return int this.getDelay(TimeUnit.MILLISECONDS) -
  25. o.getDelay(TimeUnit.MILLISECONDS));
  26. }
  27. @Override
  28. public String toString() {
  29. final StringBuilder sb = new StringBuilder(「DelayedEle{」);
  30. sb.append(「delay=」).appenddelayTime);
  31. sb.append(「, expire=」).appendexpire);
  32. sb.append(「, taskName=『」).appendtaskName).append(』\『』);
  33. sb.append(『}』);
  34. return sb.toString();
  35. }
  36. }
  37. public static void mainString[] args {
  38. //(1)创建 delay 队列
  39. DelayQueue<DelayedEle> delayQueue = new DelayQueue<DelayedEle>();
  40. //(2)创建延迟任务
  41. Random random = new Random();
  42. for int i = 0 i < 10 ++i {
  43. DelayedEle element = new DelayedEle(random.nextInt(500), task:」 + i);
  44. delayQueue.offerelement);
  45. }
  46. //(3)依次取出任务并打印
  47. DelayedEle ele = null
  48. try {
  49. //(3.1)循环,如果想避免虚假唤醒,则不能把全部元素都打印出来
  50. for(; ){
  51. //(3.2)获取过期任务并打印
  52. while ((ele = delayQueue.take()) = null) {
  53. System.out.println(ele.toString());
  54. }
  55. }
  56. } catch InterruptedException e {
  57. e.printStackTrace();
  58. }
  59. }
  60. }

如上代码首先创建延迟任务 DelayedEle 类,其中 delayTime 表示当前任务需要延迟多少 ms 时间过期,expire 则是当前时间的 ms 值加上 delayTime 的值。另外,实现了 Delayed 接口,实现了 long getDelay(TimeUnit unit)方法用来获取当前元素还剩下多少时间过期,实现了 int compareTo(Delayed o)方法用来决定优先级队列元素的比较规则。

在 main 函数内首先创建了一个延迟队列,然后使用随机数生成器生成了 10 个延迟任务,最后通过循环依次获取延迟任务,并打印。运行上面代码,一个可能的输出如下所示。

  1. DelayedEle{delay=73, expire=1523428917194, taskName='task:4'}
  2. DelayedEle{delay=97, expire=1523428917218, taskName='task:5'}
  3. DelayedEle{delay=150, expire=1523428917272, taskName='task:9'}
  4. DelayedEle{delay=205, expire=1523428917326, taskName='task:3'}
  5. DelayedEle{delay=236, expire=1523428917354, taskName='task:1'}
  6. DelayedEle{delay=324, expire=1523428917446, taskName='task:7'}
  7. DelayedEle{delay=340, expire=1523428917461, taskName='task:2'}
  8. DelayedEle{delay=392, expire=1523428917510, taskName='task:0'}
  9. DelayedEle{delay=403, expire=1523428917525, taskName='task:8'}
  10. DelayedEle{delay=416, expire=1523428917538, taskName='task:6'}

可见,出队的顺序和 delay 时间有关,而与创建任务的顺序无关。

小结

本节讲解了 DelayQueue 队列(见图 7-34),其内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步。另外队列里面的元素要实现 Delayed 接口,其中一个是获取当前元素到过期时间剩余时间的接口,在出队时判断元素是否过期了,一个是元素之间比较的接口,因为这是一个有优先级的队列。

DelayQueue 原理探究 - 图2