DelayedWorkQueue是ScheduledThreadPoolExecutor自己内部实现的延迟队列, DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以 DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面(维护了一个小顶堆

有关堆树的具体性质以及堆的插入、删除、堆化操作可以阅读之前的文章《堆树》

ScheduledThreadPoolExecutor中为什么要使用DelayedWorkQueue?

  1. 定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。
  2. DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中 执行时间最靠前的,由于它是基于堆树结构的队列,根据堆树的性质:父节点小于(大于)其子节点,并且堆树结构在执行插入和删除操作时的最坏时间复杂度是 O(logN),堆树可以达到上述目的。

DelayedWorkQueue的属性

微信截图_20210712223129.png
INITIAL_CAPACITY:初始化数组长度
queue:数组实现的队列,用来存放线程池提交过来的任务
size:数组元素个数
leader:执行队列头任务的leader线程
available:当较新的任务在队列的头部可用时或者新线程可能需要成为leader,则通过该 条件发出信号

  1. // 初始化数组长度
  2. private static final int INITIAL_CAPACITY = 16;
  3. // 使用数组来储存线程池提交的任务
  4. private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
  5. // 使用lock来保证多线程并发安全问题
  6. private final ReentrantLock lock = new ReentrantLock();
  7. // 队列中储存元素的大小
  8. private int size = 0;
  9. //特指队列头任务所在leader线程
  10. private Thread leader = null;
  11. //当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
  12. private final Condition available = lock.newCondition();

注意这里的leader,对于多线程的网络模型来说:所有线程会有三种身份中的一种:leaderfollower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待 成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事 件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己 就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下 次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。

入队方法

DelayedWorkQueue 提供了 put/add/offer 三个插入元素方法。与普通阻塞队列相比,这三个添加方法都是调用offer方法,那是因为它没有队列已满的条件,也就是说可以不断地向DelayedWorkQueue中添加元素,当元素个数超过数组长度时,数组会进行扩容。

  1. public void put(Runnable e) {
  2. offer(e);
  3. }
  4. public boolean add(Runnable e) {
  5. return offer(e);
  6. }

offer()入队

我们来看一看offer()方法的具体实现

  1. 如果当前队列已满(size >= queue.length),调用grow()进行扩容
  2. 队列未满,size+1
  3. 判断添加的元素是否是第一个,是则不需要堆化
  4. 添加的元素不是第一个,则需要堆化siftUp
  5. 如果堆顶元素刚好是此时被添加的元素,则唤醒调用take()方法中被阻塞的线程消费
    1. public boolean offer(Runnable x) {
    2. if (x == null)
    3. throw new NullPointerException();
    4. RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    5. final ReentrantLock lock = this.lock;
    6. lock.lock();
    7. try {
    8. int i = size;
    9. //队列已满,扩容50%
    10. if (i >= queue.length)
    11. grow();
    12. size = i + 1;
    13. //若队列为空,直接将任务放到队头
    14. if (i == 0) {
    15. queue[0] = e;
    16. setIndex(e, 0);
    17. //队列不为空,任务放入队列后需要进行堆化,重新调整为小顶堆
    18. } else {
    19. siftUp(i, e);
    20. }
    21. //堆顶任务刚好为添加的任务,则唤醒线程take
    22. if (queue[0] == e) {
    23. leader = null;
    24. available.signal();
    25. }
    26. } finally {
    27. lock.unlock();
    28. }
    29. return true;
    30. }
    微信截图_20210713091615.png

grow()数组扩容

可以看出,扩容50%,扩容后的长度为原来的1.5倍

  1. private void grow() {
  2. int oldCapacity = queue.length;
  3. int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
  4. if (newCapacity < 0) // overflow
  5. newCapacity = Integer.MAX_VALUE;
  6. queue = Arrays.copyOf(queue, newCapacity);
  7. }

siftup()堆化

新添加的元素先会加到当前数组的尾部,然后一步步和上面的父亲节点比较,若小于父亲节点则和父亲节点互换位置,循环比较直至大于父亲节点才结束循环。

  1. 获取数组最后一个节点的父节点下标
  2. 比较新插入节点的任务开始时间与其父节点的任务开始时间,若大于其父节点任务开始时间,则符合小顶堆的要求,直接退出
  3. 若不满足,则子节点与父节点互换位置
  4. 循环执行上面操作,直到该堆树满足小顶堆
    1. private void siftUp(int k, RunnableScheduledFuture<?> key) {
    2. while (k > 0) {
    3. //获取数组最后一个节点的父节点下标
    4. int parent = (k - 1) >>> 1;
    5. RunnableScheduledFuture<?> e = queue[parent];
    6. //比较新插入节点的任务开始时间与其父节点的任务开始时间,若大于其父节点任务开始时间,则符合小顶堆的要求,直接退出
    7. if (key.compareTo(e) >= 0)
    8. break;
    9. //否则父节点下移
    10. queue[k] = e;
    11. setIndex(e, k);
    12. k = parent;
    13. }
    14. //找到当前插入的位置
    15. queue[k] = key;
    16. setIndex(key, k);
    17. }
    根据key节点与它的父节点任务开始时间判断,如果key节点的执行时间小于父节点,则将两个节点交换,使执行时间靠前的节点排列在队列的前面。

出队方法

DelayedWorkQueue 提供了以下几个出队方法

  • take(),等待获取队列头元素
  • poll() ,立即获取队列头元素
  • poll(long timeout, TimeUnit unit) ,超时等待获取队列头元素

take()

Worker工作线程启动后就会循环消费阻塞队列中的任务,因为定时线程池的keepAliveTime=0,所以消费任务其只调用了DelayedWorkQueue的take()方法:

  1. 获取堆顶元素,判断堆顶元素是否是空,空的则阻塞线程 available.await(),放入条件等待队列中。
  2. 堆顶元素不为空,则获取任务开始执行时间delay,delay <= 0说明到了执行时间(执行时间小于当前时间),finishPoll()堆顶元素出队重新堆化
  3. delay > 0还没到执行时间(执行时间大于当前时间),判断leader线程是否为空,不为空则说明有其他线程成为leader,等待执行下次堆顶中到来的新任务,当前线程将阻塞等待,直到leader执行完任务唤醒。
  4. leader线程为空,当前take线程设置为leader,并阻塞等待delay时长。
  5. 当前leader线程等待delay时长自动唤醒,放弃leader身份(leader = null),重新进行for循环再一次判断delay,重复上述操作
  6. 跳出循环后判断leader为空并且堆顶元素不为空,则唤醒其他take线程,最后是否锁。

    1. public RunnableScheduledFuture<?> take() throws InterruptedException {
    2. final ReentrantLock lock = this.lock;
    3. lock.lockInterruptibly();
    4. try {
    5. for (;;) {
    6. //获取队头任务
    7. RunnableScheduledFuture<?> first = queue[0];
    8. //如果任务为空,则放入条件等待队列中
    9. if (first == null)
    10. available.await();
    11. else {
    12. //获取任务的执行时间
    13. long delay = first.getDelay(NANOSECONDS);
    14. //如果delay<=0,说明任务开始执行时间小于当前时间,返回堆顶任务,重新堆化
    15. if (delay <= 0)
    16. return finishPoll(first);
    17. first = null;
    18. //leader线程不为空,则说明有其他线程已经成为leader,等待下次执行任务,当前线程阻塞
    19. if (leader != null)
    20. available.await();
    21. //leader线程为空,将当前线程设置为leader线程
    22. else {
    23. Thread thisThread = Thread.currentThread();
    24. leader = thisThread;
    25. try {
    26. //阻塞等待delay时长
    27. available.awaitNanos(delay);
    28. //线程唤醒后让出leader身份,可以去获取任务
    29. } finally {
    30. if (leader == thisThread)
    31. leader = null;
    32. }
    33. }
    34. }
    35. }
    36. } finally {
    37. if (leader == null && queue[0] != null)
    38. //唤醒被阻塞的线程
    39. available.signal();
    40. lock.unlock();
    41. }
    42. }

    微信截图_20210713105139.png

    线程具体如何进入条件等待队列进行阻塞和被唤醒,可以看我之前的文章《阻塞队列-ArrayBlockingQueue》

leader线程的作用
当一个线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他线程被阻塞,需要等leader线程执行完任务才唤醒其他被阻塞的线程。

举例来说,如果没有leader,那么在执行take()时,都要执行available.awaitNanos(delay),假设当前线程执行了该段代码,这时还没有signal,第二个线程也执行了该段代码,则第二个线程也要被阻塞。多个这时执行该段代码是没有作用的,因为只能有一个线程会从take中返回queue[0](因为有lock),其他线程这时再返回for循环执行时取的queue[0],已经不是之前的queue[0]了,然后又要继续阻塞。

finishPoll()

堆顶元素出堆,具体方法是将数组的第一个元素和最后一个元素互换位置,然后向下进行堆化

  1. private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
  2. int s = --size;
  3. RunnableScheduledFuture<?> x = queue[s];
  4. queue[s] = null;
  5. if (s != 0)
  6. siftDown(0, x);
  7. setIndex(f, -1);
  8. return f;
  9. }

从根节点开始向下堆化

  1. private void siftDown(int k, RunnableScheduledFuture<?> key) {
  2. int half = size >>> 1;
  3. while (k < half) {
  4. //获取孩子节点
  5. int child = (k << 1) + 1;
  6. RunnableScheduledFuture<?> c = queue[child];
  7. int right = child + 1;
  8. //选出最小的孩子节点
  9. if (right < size && c.compareTo(queue[right]) > 0)
  10. c = queue[child = right];
  11. //若父节点小于孩子节点,堆化完成,跳出循环
  12. if (key.compareTo(c) <= 0)
  13. break;
  14. //父节点向下调整
  15. queue[k] = c;
  16. setIndex(c, k);
  17. //设置新的parent,开始下一轮堆化
  18. k = child;
  19. }
  20. //找到堆顶元素插入的位置
  21. queue[k] = key;
  22. setIndex(key, k);
  23. }

例如数组 {3, 5, 6, 7, 10, 12, 8},要删除根节点3,首先与尾节点交换位置,删除尾节点
微信截图_20210713110344.png
依次向下调整直到符合堆树条件
微信截图_20210713110423.png

poll()

立即获取队列头元素,当队列头任务是null,或者任务延时时间没有到,表示这个任务还不能返回,因此直接返回null。否则调用finishPoll方法,返回堆顶元素并重新堆化。

  1. public RunnableScheduledFuture<?> poll() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. RunnableScheduledFuture<?> first = queue[0];
  6. if (first == null || first.getDelay(NANOSECONDS) > 0)
  7. return null;
  8. else
  9. return finishPoll(first);
  10. } finally {
  11. lock.unlock();
  12. }
  13. }

poll(long timeout, TimeUnit unit)

超时等待获取队列头元素,与take方法相比较,就要考虑设置的超时时间,如果超时时间到了,还没有获取到有用任务,那么就返回null。其他的与take方法中逻辑一样。

  1. public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. long nanos = unit.toNanos(timeout);
  4. final ReentrantLock lock = this.lock;
  5. lock.lockInterruptibly();
  6. try {
  7. for (;;) {
  8. RunnableScheduledFuture<?> first = queue[0];
  9. // 如果没有任务。
  10. if (first == null) {
  11. // 超时时间已到,那么就直接返回null
  12. if (nanos <= 0)
  13. return null;
  14. else
  15. // 否则就让线程在available条件下等待nanos时间
  16. nanos = available.awaitNanos(nanos);
  17. } else {
  18. // 获取任务的剩余延时时间
  19. long delay = first.getDelay(NANOSECONDS);
  20. // 如果延时时间到了,就返回这个任务,用来执行。
  21. if (delay <= 0)
  22. return finishPoll(first);
  23. // 如果超时时间已到,那么就直接返回null
  24. if (nanos <= 0)
  25. return null;
  26. // 将first设置为null,当线程等待时,不持有first的引用
  27. first = null;
  28. // 如果超时时间小于任务的剩余延时时间,那么就有可能获取不到任务。
  29. // 在这里让线程等待超时时间nanos
  30. if (nanos < delay || leader != null)
  31. nanos = available.awaitNanos(nanos);
  32. else {
  33. Thread thisThread = Thread.currentThread();
  34. leader = thisThread;
  35. try {
  36. // 当任务的延时时间到了时,能够自动超时唤醒。
  37. long timeLeft = available.awaitNanos(delay);
  38. // 计算剩余的超时时间
  39. nanos -= delay - timeLeft;
  40. } finally {
  41. if (leader == thisThread)
  42. leader = null;
  43. }
  44. }
  45. }
  46. }
  47. } finally {
  48. if (leader == null && queue[0] != null)
  49. // 唤醒等待任务的线程
  50. available.signal();
  51. lock.unlock();
  52. }
  53. }

总结

使用优先级队列DelayedWorkQueue,保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。

  1. DelayedWorkQueue的数据结构是基于堆树实现的,内部维护了一个小顶堆
  2. DelayedWorkQueue采用数组实现堆,根节点出队,用最后叶子节点替换,然后下推至满足堆成立条件;最后叶子节点入队,然后向上推至满足堆成立条件;
  3. DelayedWorkQueue添加元素满了之后会自动扩容原来容量的1/2,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE,所以线程池中至多有corePoolSize个工作线程正在运行;
  4. DelayedWorkQueue 消费元素take,在堆顶元素为空和delay >0 时,阻塞等待;
  5. DelayedWorkQueue 是一个生产永远不会阻塞,消费可以阻塞的生产者消费者模式;
  6. DelayedWorkQueue 有一个leader线程的变量,是Leader-Follower模式的变种。当一个take线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他take线程则需要等leader线程出队列了才唤醒其他take线程。