前言

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口。是用于执行延时任务和周期任务的线程池。
ScheduledThreadPoolExecutor 的延时执行依赖于 DelayedWorkQueue 和 ScheduledFutureTask。
ScheduledThreadPoolExecutor 源码解析 - 图1

ScheduledExecutorService

这里介绍一下 ScheduledExecutorService 接口。
ScheduledExecutorService 继承于 ExecutorService 接口,并为调度任务额外提供了两种模式。

延时执行

根据参数中的设定的延时,执行一次任务。

  1. public ScheduledFuture<?> schedule(Runnable command,
  2. long delay, TimeUnit unit);
参数 描述
command 执行任务
delay 延时多少时间(以现在开始计算)执行
unit 延迟时间时间参数
  1. public <V> ScheduledFuture<V> schedule(Callable<V> callable,
  2. long delay, TimeUnit unit);
参数 描述
callable 执行任务,明确了返回类型
delay 延时多少时间(以现在开始计算)执行
unit 延迟时间时间参数

周期执行

  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  2. long initialDelay,
  3. long period,
  4. TimeUnit unit);

假设第 n 次任务的开始时间是 t,运行时间是 p,设置的间隔周期为 T,则第 n+1 次任务的开始时间是 max(t+p, t+T)。
也就是说,如果任务执行足够快,则任务之间的间隔就是配置的周期 T,如果任务执行比较慢,耗时超过 T,则在任务结束后立即开始下一次的任务。所以不会有同时并发执行提交的周期任务的情况。
第一次执行时间会在 initialDelay 后,后面的执行时间就在任务执行后+period 。

参数 描述
command 执行任务
initialDelay 第一次任务需要延时多少时间才执行
period 执行周期时间
unit 延迟时间时间参数
  1. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  2. long initialDelay,
  3. long delay,
  4. TimeUnit unit);

第一次任务执行后,会等待 delay 时间执行下一个任务。任务始终在上一次任务执行完成后再延时 delay 时间去执行。

参数 描述
command 执行任务
initialDelay 第一次任务需要延时多少时间才执行
delay 任务之间需要间隔多久执行
unit 延迟时间时间参数

核心内部

ScheduledFutureTask

ScheduledFutureTask 用于封装定时任务和获取任务结果。

  1. ScheduledFutureTask(Runnable r, V result, long ns, long period) {
  2. super(r, result);
  3. this.time = ns;
  4. this.period = period;
  5. this.sequenceNumber = sequencer.getAndIncrement();
  6. }

基本数据
  1. /** 任务排序序列号 */
  2. private final long sequenceNumber;
  3. /** 任务可执行的时间,单位为纳秒 */
  4. private volatile long time;
  5. /**
  6. * 周期任务执行的周期时间,单位为纳秒
  7. * 正数代表 fixed-rate 模式.
  8. * 负数代表 fixed-delay 模式.
  9. * 0 代表非周期任务.
  10. */
  11. private final long period;
  12. /** ScheduledThreadPoolExecutor#decorateTask 允许我们包装一下 Executor 构造的 RunnableScheduledFuture (实现为 ScheduledFutureTask ) 并重新返回一个 RunnableScheduledFuture 给 Executor。
  13. * 所以 ScheduledFutureTask.outerTask 实际上就是 decorateTask 方法包装出来的结果。decorateTask 默认返回的就是参数中的 RunnableScheduledFuture,也就是不进行包装,这种情况下 outerTask 就是 ScheduledFutureTask 自身了。
  14. */
  15. RunnableScheduledFuture<V> outerTask = this;
  16. /**
  17. * 延迟队列的索引,用于支持快速取消
  18. */
  19. int heapIndex;

ScheduledThreadPoolExecutor 源码解析 - 图2
ScheduledFutureTask 是 ScheduleThreadPoolExecutor 对内和对外的桥梁。对内,它的形态是 Runnable 来执行任务,对外,它的形态是 Future。
它覆盖了 FutureTask 的 run 方法来实现对延时执行、周期执行的支持。
对于 ScheduledFutureTask#run 方法来说,它并不需要关心 run 的时候是否到了执行时间,因为这个职责会由 ScheduleThreadPoolExecutor 中的工作队列来完成,保证在任务可执行的时候被 Worker 线程从队列中获取除。
从继承关系中可以看到,其父接口有 Delayed 接口。 Delayed 接口继承了 Comparable 接口。这两个接口十分重要。 DelayedWorkQueue 根据 compareTo 方法给队列元素排序,执行时间早的元素放在队列的前面。

conpareTo 方法

  1. // 升序排列
  2. public int compareTo(Delayed other) {
  3. if (other == this) // 同一个对象,返回 0
  4. return 0;
  5. if (other instanceof ScheduledFutureTask) {
  6. ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
  7. long diff = time - x.time;
  8. if (diff < 0)
  9. return -1;
  10. else if (diff > 0)
  11. return 1;
  12. else if (sequenceNumber < x.sequenceNumber)
  13. return -1;
  14. else
  15. return 1;
  16. }
  17. // 如果 other 不是 ScheduledFutureTask 类型
  18. long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
  19. return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  20. }

getDelay 方法

  1. public long getDelay(TimeUnit unit) {
  2. return unit.convert(time - System.nanoTime(), NANOSECONDS);
  3. }

DelayedWorkQueue

用于存储延时任务的无界队列。内部用数组实现,初始容量是 16,容量不足时会扩容 50%。
这里用它来表示一个二叉堆,实现一个优先队列,并通过 Leader/Follower 模式避免线程不必要的等待。
DelayedWorkQueue 是 ScheduledThreadPoolExecutor 使用的工作队列,从队列中取出时,任务已经到了可以被执行的时间了。
它内部维护了一个最小堆,根据任务的执行开始时间来维护任务顺序。对于 ScheduledFutureTask 类型的元素,它额外维护了元素在队列中堆数组的索引,用于实现快速取消。
DelayedWorkQueue 用独占锁来实现管程,保证数据的线程安全性。

二叉堆(英语:binary heap)是一种特殊的堆,二叉堆是完全二叉树或者是近似完全二叉树。二叉堆满足堆特性:父节点的键值总是保持固定的序关系于任何一个子节点的键值,且每个节点的左子树和右子树都是一个二叉堆。 当父节点的键值总是大于或等于任何一个子节点的键值时为“最大堆”。当父节点的键值总是小于或等于任何一个子节点的键值时为“最小堆”。

入队(offer)方法

  1. public boolean offer(Runnable x) {
  2. if (x == null)
  3. throw new NullPointerException();
  4. RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
  5. final ReentrantLock lock = this.lock;
  6. lock.lock();
  7. try {
  8. int i = size;
  9. if (i >= queue.length)
  10. // 容量扩增50%。
  11. grow();
  12. size = i + 1;
  13. // 第一个元素,其实这里也可以统一进行sift-up操作,没必要特判。
  14. if (i == 0) {
  15. queue[0] = e;
  16. setIndex(e, 0);
  17. } else {
  18. // 插入堆尾。
  19. siftUp(i, e);
  20. }
  21. // 如果新加入的元素成为了堆顶,则原先的leader就无效了。
  22. if (queue[0] == e) {
  23. leader = null;
  24. // 由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务。
  25. available.signal();
  26. }
  27. } finally {
  28. lock.unlock();
  29. }
  30. return true;
  31. }

出队

take 方法

  1. public RunnableScheduledFuture<?> take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. /*
  6. * 循环读取当前堆中最小也就执行开始时间最近的任务。
  7. * 如果当前队列为空无任务,则在available条件上等待。
  8. * 否则如果最近任务的delay<=0则返回这个任务以执行,否则的话根据是否可以作为leader分类:
  9. * 如果可以作为leader,则根据delay进行有时限等待。
  10. * 否则无限等待直至被leader唤醒。
  11. */
  12. for (;;) {
  13. RunnableScheduledFuture<?> first = queue[0];
  14. // 如果当前队列无元素,则在available条件上无限等待直至有任务通过offer入队并唤醒。
  15. if (first == null)
  16. available.await();
  17. else {
  18. long delay = first.getDelay(NANOSECONDS);
  19. // 如果delay小于0说明任务该立刻执行了。
  20. if (delay <= 0)
  21. // 从堆中移除元素并返回结果。
  22. return finishPoll(first);
  23. /*
  24. * 在接下来等待的过程中,first应该清为null。
  25. * 因为下一轮重新拿到的最近需要执行的任务很可能已经不是这里的first了。
  26. * 所以对于接下来的逻辑来说first已经没有任何用处了,不该持有引用。
  27. */
  28. first = null;
  29. // 如果目前有leader的话,当前线程作为follower在available条件上无限等待直至唤醒。
  30. if (leader != null)
  31. available.await();
  32. else {
  33. Thread thisThread = Thread.currentThread();
  34. leader = thisThread;
  35. try {
  36. available.awaitNanos(delay);
  37. } finally {
  38. /*
  39. * 如果从available条件中被唤醒当前线程仍然是leader,则清空leader。
  40. *
  41. * 分析一下这里不等的情况:
  42. * 1. 原先 thisThread == leader, 然后堆顶更新了,leader 为 null。
  43. * 2. 堆顶更新,offer方法释放锁后,有其它线程通过 take/poll 拿到锁,读到 leader == null(offer 方法触发此行为),然后在此将自身更新为 leader。
  44. *
  45. * 对于这两种情况统一的处理逻辑就是只要 leader 为 thisThread,则将 leader 为 null 用以接下来判断是否需要唤醒后继线程。
  46. */
  47. if (leader == thisThread)
  48. leader = null;
  49. }
  50. }
  51. }
  52. }
  53. } finally {
  54. /*
  55. * 如果当前堆中无元素(根据堆顶判断)则直接释放锁。
  56. *
  57. *
  58. * 否则如果leader有值,说明当前线程一定不是leader,当前线程不用去唤醒后续等待线程。
  59. * 否则由当前线程来唤醒后继等待线程。不过这并不代表当前线程原来是leader。
  60. */
  61. if (leader == null && queue[0] != null)
  62. available.signal();
  63. lock.unlock();
  64. }
  65. }

执行方法

ScheduledThreadPoolExecutor 任务提交的入口方法主要是 execute, schedule, scheduleAtFixedRate 以及 scheduleWithFixedDelay 这几类。

execute,schedule 方法

  1. /**
  2. * 覆盖了父类execute的实现,以零延时任务的形式实现。
  3. */
  4. public void execute(Runnable command) {
  5. schedule(command, 0, NANOSECONDS);
  6. }
  7. public ScheduledFuture<?> schedule(Runnable command,
  8. long delay,
  9. TimeUnit unit) {
  10. if (command == null || unit == null)
  11. throw new NullPointerException();
  12. // 包装ScheduledFutureTask。
  13. RunnableScheduledFuture<?> t = decorateTask(command,
  14. new ScheduledFutureTask<Void>(command, null,
  15. triggerTime(delay, unit)));
  16. delayedExecute(t);
  17. return t;
  18. }
  19. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  20. long initialDelay,
  21. long period,
  22. TimeUnit unit) {
  23. if (command == null || unit == null)
  24. throw new NullPointerException();
  25. if (period <= 0)
  26. throw new IllegalArgumentException();
  27. // fixed-rate模式period为正数。
  28. ScheduledFutureTask<Void> sft =
  29. new ScheduledFutureTask<Void>(command,
  30. null,
  31. triggerTime(initialDelay, unit),
  32. unit.toNanos(period));
  33. // 包装ScheduledFutureTask,默认返回本身。
  34. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  35. // 将构造出的ScheduledFutureTask的outerTask设置为经过包装的结果。
  36. sft.outerTask = t;
  37. delayedExecute(t);
  38. return t;
  39. }
  40. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  41. long initialDelay,
  42. long delay,
  43. TimeUnit unit) {
  44. if (command == null || unit == null)
  45. throw new NullPointerException();
  46. if (delay <= 0)
  47. throw new IllegalArgumentException();
  48. // fixed-delay模式delay为正数。
  49. ScheduledFutureTask<Void> sft =
  50. new ScheduledFutureTask<Void>(command,
  51. null,
  52. triggerTime(initialDelay, unit),
  53. unit.toNanos(-delay));
  54. // 包装ScheduledFutureTask,默认返回本身。
  55. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  56. // 将构造出的ScheduledFutureTask的outerTask设置为经过包装的结果。
  57. sft.outerTask = t;
  58. delayedExecute(t);
  59. return t;
  60. }

delayExecute 方法

  1. // ScheduledThreadPoolExecutor
  2. private void delayedExecute(RunnableScheduledFuture<?> task) {
  3. // 非RUNNING态,根据饱和策略处理任务。
  4. if (isShutdown())
  5. reject(task);
  6. else {
  7. // 往work queue中插入任务。
  8. super.getQueue().add(task);
  9. /*
  10. * 检查任务是否可以被执行。
  11. * 如果任务不应该被执行,并且从队列中成功移除的话(说明没被worker拿取执行),则调用cancel取消任务。
  12. */
  13. if (isShutdown() &&
  14. !canRunInCurrentRunState(task.isPeriodic()) &&
  15. remove(task))
  16. // 参数中false表示不试图中断执行任务的线程。
  17. task.cancel(false);
  18. else
  19. ensurePrestart();
  20. }
  21. }
  22. /**
  23. * 这是父类 ThreadPoolExecutor 的方法用于确保有worker线程来执行任务。
  24. */
  25. void ensurePrestart() {
  26. int wc = workerCountOf(ctl.get());
  27. // worker数目小于corePoolSize,则添加一个worker。
  28. if (wc < corePoolSize)
  29. addWorker(null, true);
  30. // wc==orePoolSize==0的情况也添加一个worker。
  31. else if (wc == 0)
  32. addWorker(null, false);
  33. }

ScheduledThreadPoolExecutor 源码解析 - 图3

run 方法

  1. //ScheduledThreadPoolExecutor#ScheduledFutureTask#run
  2. public void run() {
  3. // 是否周期性,就是判断period是否为0。
  4. boolean periodic = isPeriodic();
  5. // 检查任务是否可以被执行。
  6. if (!canRunInCurrentRunState(periodic))
  7. cancel(false);
  8. // 如果非周期性任务直接调用run运行即可。
  9. else if (!periodic)
  10. ScheduledFutureTask.super.run();
  11. // 如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。
  12. else if (ScheduledFutureTask.super.runAndReset()) {
  13. setNextRunTime();
  14. // 需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。
  15. reExecutePeriodic(outerTask);
  16. }
  17. }

ScheduledThreadPoolExecutor 源码解析 - 图4

canRunInCurrentRunState 方法

JDK8 和 JDK11 的源码略有不同,这是 JDK8 的源码。

  1. // ScheduledThreadPoolExecutor
  2. boolean canRunInCurrentRunState(boolean periodic) {
  3. /*
  4. * isRunningOrShutdown 的参数为布尔值,true 则表示 shutdown 状态也返回 true ,否则只有 running 状态返回 ture。
  5. * 如果为周期性任务则根据 continueExistingPeriodicTasksAfterShutdown 来判断是否 shutdown了仍然可以执行。
  6. * 否则根据 executeExistingDelayedTasksAfterShutdown 来判断是否 shutdown 了仍然可以执行。
  7. */
  8. return isRunningOrShutdown(periodic ?
  9. continueExistingPeriodicTasksAfterShutdown :
  10. executeExistingDelayedTasksAfterShutdown);
  11. }

runAndReset 方法

FutureTask#runAndReset 方法保证了在任务正常执行完成后返回 true,此时 FutureTask 的状态 state 保持为 NEW,由于没有调用 set 方法,也就是没有调用 finishCompletion 方法,它内部持有的 Callable 任务引用不会置为 null,等待获取结果的线程集合也不会解除阻塞。
这种设计方案专门针对可以周期性重复执行的任务。异常执行情况和取消的情况导致的最终结果和 run 方法是一致的。

  1. // FutureTask
  2. // 执行任务并且重置状态
  3. // 由于没有执行set()方法设置执行结果,这个方法除了执行过程中抛出异常或者主动取消会到导致state由NEW更变为其他值,正常执行完毕一个任务之后,state是保持为NEW不变
  4. protected boolean runAndReset() {
  5. // 如果状态不为NEW(0)或者CAS(null,当前线程实例)更新runner-真正的执行Callable对象的线程实例失败,那么直接返回false,不执行任务
  6. if (state != NEW ||
  7. !RUNNER.compareAndSet(this, null, Thread.currentThread()))
  8. return false;
  9. boolean ran = false;
  10. int s = state;
  11. try {
  12. Callable<V> c = callable;
  13. if (c != null && s == NEW) {
  14. try {
  15. // 这里会忽略执行结果,只记录是否正常执行
  16. c.call();
  17. ran = true;
  18. } catch (Throwable ex) {
  19. // 记录执行异常结果
  20. setException(ex);
  21. }
  22. }
  23. } finally {
  24. runner = null;
  25. s = state;
  26. if (s >= INTERRUPTING)
  27. handlePossibleCancellationInterrupt(s);
  28. }
  29. // 正常情况下的执行完毕,ran会更新为true,state此时也保持为NEW,这个时候方法返回true
  30. return ran && s == NEW;
  31. }

setNextRunTime 方法

setNextRunTime 方法是为了确认下一次任务执行的时间。

  1. private void setNextRunTime() {
  2. long p = period;
  3. /*
  4. * fixed-rate模式,时间设置为上一次时间+p。
  5. * 提一句,这里的时间其实只是可以被执行的最小时间,不代表到点就要执行。
  6. * 如果这次任务还没执行完是肯定不会执行下一次的。
  7. */
  8. if (p > 0)
  9. time += p;
  10. /**
  11. * fixed-delay模式,计算下一次任务可以被执行的时间。
  12. * 简单来说差不多就是当前时间+delay值。因为代码走到这里任务就已经结束了,now()可以认为就是任务结束时间。
  13. */
  14. else
  15. time = triggerTime(-p);
  16. }
  17. long triggerTime(long delay) {
  18. /*
  19. * 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。
  20. *
  21. * 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。
  22. */
  23. return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
  24. }
  25. /**
  26. * 主要就是有这么一种情况:
  27. * 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。
  28. * 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。
  29. *
  30. * 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。
  31. *
  32. * 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。
  33. * 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。
  34. * 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。
  35. */
  36. private long overflowFree(long delay) {
  37. Delayed head = (Delayed) super.getQueue().peek();
  38. if (head != null) {
  39. long headDelay = head.getDelay(NANOSECONDS);
  40. if (headDelay < 0 && (delay - headDelay < 0))
  41. delay = Long.MAX_VALUE + headDelay;
  42. }
  43. return delay;
  44. }

reExecutePeriodic 方法

  1. // ScheduledThreadPoolExecutor
  2. void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  3. if (canRunInCurrentRunState(true)) {
  4. // 塞到工作队列中。
  5. super.getQueue().add(task);
  6. // 再次检查是否可以执行,如果不能执行且任务还在队列中未被取走则取消任务。
  7. if (!canRunInCurrentRunState(true) && remove(task))
  8. task.cancel(false);
  9. else
  10. ensurePrestart();
  11. }
  12. }

cancel 方法

  1. public boolean cancel(boolean mayInterruptIfRunning) {
  2. // 先调用父类FutureTask#cancel来取消任务。
  3. boolean cancelled = super.cancel(mayInterruptIfRunning);
  4. /*
  5. * removeOnCancel开关用于控制任务取消后是否应该从队列中移除。
  6. *
  7. * 如果已经成功取消,并且removeOnCancel开关打开,并且 heapIndex >= 0 (说明仍然在队列中), 则从队列中删除该任务。
  8. */
  9. if (cancelled && removeOnCancel && heapIndex >= 0)
  10. remove(this);
  11. return cancelled;
  12. }

remove 方法

ScheduledThreadPoolExecutor 支持任务取消的时候快速从队列中移除,因为大部分情况下队列中的元素是 ScheduledFutureTask 类型,内部维护了 heapIndex,即堆数组的索引。
如果维护了 heapIndex,查找元素的时间复杂度为 O(1)。

  1. public boolean remove(Object x) {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. int i = indexOf(x);
  6. if (i < 0)
  7. return false;
  8. setIndex(queue[i], -1);
  9. /*
  10. * 堆的删除某个元素操作就是将最后一个元素移到那个元素。
  11. * 这时候有可能需要向上调整堆,也可能需要向下维护。
  12. *
  13. * 对于小根堆而言,如果移过去后比父元素小,则需要向上维护堆结构,
  14. * 否则将左右两个子节点中较小值与当前元素比较,如果当前元素较大,则需要向下维护堆结构。
  15. */
  16. int s = --size;
  17. RunnableScheduledFuture<?> replacement = queue[s];
  18. queue[s] = null;
  19. // 如果参数x就是堆数组中最后一个元素则删除操作已经完毕了。
  20. if (s != i) {
  21. // 尝试向下维护堆。
  22. siftDown(i, replacement);
  23. // 相等说明replacement比子节点都要小,尝试向上维护堆。
  24. if (queue[i] == replacement)
  25. siftUp(i, replacement);
  26. }
  27. return true;
  28. } finally {
  29. lock.unlock();
  30. }
  31. }
  32. private int indexOf(Object x) {
  33. if (x != null) {
  34. if (x instanceof ScheduledFutureTask) {
  35. int i = ((ScheduledFutureTask) x).heapIndex;
  36. // 再次判断i确实是本线程池的,因为remove方法的参数x完全可以是个其它池子里拿到的ScheduledFutureTask。
  37. if (i >= 0 && i < size && queue[i] == x)
  38. return i;
  39. } else {
  40. for (int i = 0; i < size; i++)
  41. if (x.equals(queue[i]))
  42. return i;
  43. }
  44. }
  45. return -1;
  46. }

onShutdown 方法

onShutdown 方法是 ThreadPoolExecutor 的一个钩子方法,在 shutdown 方法中调用,默认实现为空。
ScheduledThreadPoolExecutor 覆盖了此方法,用于删除并取消工作队列中的不需要执行的任务。

  1. // ScheduledThreadPoolExecutor
  2. @Override
  3. void onShutdown() {
  4. BlockingQueue<Runnable> q = super.getQueue();
  5. // shutdown是否仍然执行延时任务。
  6. boolean keepDelayed =
  7. getExecuteExistingDelayedTasksAfterShutdownPolicy();
  8. // shutdown是否仍然执行周期任务。
  9. boolean keepPeriodic =
  10. getContinueExistingPeriodicTasksAfterShutdownPolicy();
  11. // 如果两者皆不可则对队列中所有RunnableScheduledFuture调用cancel取消并清空队列。
  12. if (!keepDelayed && !keepPeriodic) {
  13. for (Object e : q.toArray())
  14. if (e instanceof RunnableScheduledFuture<?>)
  15. ((RunnableScheduledFuture<?>) e).cancel(false);
  16. q.clear();
  17. }
  18. else {
  19. for (Object e : q.toArray()) {
  20. if (e instanceof RunnableScheduledFuture) {
  21. RunnableScheduledFuture<?> t =
  22. (RunnableScheduledFuture<?>)e;
  23. /*
  24. * 不需要执行的任务删除并取消。
  25. * 已经取消的任务也需要从队列中删除。
  26. * 所以这里就判断下是否需要执行或者任务是否已经取消。
  27. */
  28. if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
  29. t.isCancelled()) {
  30. if (q.remove(t))
  31. t.cancel(false);
  32. }
  33. }
  34. }
  35. }
  36. // 因为任务被从队列中清理掉,所以这里需要调用tryTerminate尝试跃迁executor的状态。
  37. tryTerminate();
  38. }