前言
ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口。是用于执行延时任务和周期任务的线程池。
ScheduledThreadPoolExecutor 的延时执行依赖于 DelayedWorkQueue 和 ScheduledFutureTask。
ScheduledExecutorService
这里介绍一下 ScheduledExecutorService 接口。
ScheduledExecutorService 继承于 ExecutorService 接口,并为调度任务额外提供了两种模式。
延时执行
根据参数中的设定的延时,执行一次任务。
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
参数 | 描述 |
---|---|
command | 执行任务 |
delay | 延时多少时间(以现在开始计算)执行 |
unit | 延迟时间时间参数 |
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
参数 | 描述 |
---|---|
callable | 执行任务,明确了返回类型 |
delay | 延时多少时间(以现在开始计算)执行 |
unit | 延迟时间时间参数 |
周期执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
假设第 n 次任务的开始时间是 t,运行时间是 p,设置的间隔周期为 T,则第 n+1 次任务的开始时间是 max(t+p, t+T)。
也就是说,如果任务执行足够快,则任务之间的间隔就是配置的周期 T,如果任务执行比较慢,耗时超过 T,则在任务结束后立即开始下一次的任务。所以不会有同时并发执行提交的周期任务的情况。
第一次执行时间会在 initialDelay 后,后面的执行时间就在任务执行后+period 。
参数 | 描述 |
---|---|
command | 执行任务 |
initialDelay | 第一次任务需要延时多少时间才执行 |
period | 执行周期时间 |
unit | 延迟时间时间参数 |
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
第一次任务执行后,会等待 delay 时间执行下一个任务。任务始终在上一次任务执行完成后再延时 delay 时间去执行。
参数 | 描述 |
---|---|
command | 执行任务 |
initialDelay | 第一次任务需要延时多少时间才执行 |
delay | 任务之间需要间隔多久执行 |
unit | 延迟时间时间参数 |
核心内部
ScheduledFutureTask
ScheduledFutureTask 用于封装定时任务和获取任务结果。
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
基本数据
/** 任务排序序列号 */
private final long sequenceNumber;
/** 任务可执行的时间,单位为纳秒 */
private volatile long time;
/**
* 周期任务执行的周期时间,单位为纳秒
* 正数代表 fixed-rate 模式.
* 负数代表 fixed-delay 模式.
* 0 代表非周期任务.
*/
private final long period;
/** ScheduledThreadPoolExecutor#decorateTask 允许我们包装一下 Executor 构造的 RunnableScheduledFuture (实现为 ScheduledFutureTask ) 并重新返回一个 RunnableScheduledFuture 给 Executor。
* 所以 ScheduledFutureTask.outerTask 实际上就是 decorateTask 方法包装出来的结果。decorateTask 默认返回的就是参数中的 RunnableScheduledFuture,也就是不进行包装,这种情况下 outerTask 就是 ScheduledFutureTask 自身了。
*/
RunnableScheduledFuture<V> outerTask = this;
/**
* 延迟队列的索引,用于支持快速取消
*/
int heapIndex;
ScheduledFutureTask 是 ScheduleThreadPoolExecutor 对内和对外的桥梁。对内,它的形态是 Runnable 来执行任务,对外,它的形态是 Future。
它覆盖了 FutureTask 的 run 方法来实现对延时执行、周期执行的支持。
对于 ScheduledFutureTask#run
方法来说,它并不需要关心 run 的时候是否到了执行时间,因为这个职责会由 ScheduleThreadPoolExecutor 中的工作队列来完成,保证在任务可执行的时候被 Worker 线程从队列中获取除。
从继承关系中可以看到,其父接口有 Delayed 接口。 Delayed 接口继承了 Comparable 接口。这两个接口十分重要。 DelayedWorkQueue 根据 compareTo
方法给队列元素排序,执行时间早的元素放在队列的前面。
conpareTo 方法
// 升序排列
public int compareTo(Delayed other) {
if (other == this) // 同一个对象,返回 0
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// 如果 other 不是 ScheduledFutureTask 类型
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
getDelay 方法
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
DelayedWorkQueue
用于存储延时任务的无界队列。内部用数组实现,初始容量是 16,容量不足时会扩容 50%。
这里用它来表示一个二叉堆,实现一个优先队列,并通过 Leader/Follower
模式避免线程不必要的等待。
DelayedWorkQueue 是 ScheduledThreadPoolExecutor 使用的工作队列,从队列中取出时,任务已经到了可以被执行的时间了。
它内部维护了一个最小堆,根据任务的执行开始时间来维护任务顺序。对于 ScheduledFutureTask 类型的元素,它额外维护了元素在队列中堆数组的索引,用于实现快速取消。
DelayedWorkQueue 用独占锁来实现管程,保证数据的线程安全性。
二叉堆(英语:binary heap)是一种特殊的堆,二叉堆是完全二叉树或者是近似完全二叉树。二叉堆满足堆特性:父节点的键值总是保持固定的序关系于任何一个子节点的键值,且每个节点的左子树和右子树都是一个二叉堆。 当父节点的键值总是大于或等于任何一个子节点的键值时为“最大堆”。当父节点的键值总是小于或等于任何一个子节点的键值时为“最小堆”。
入队(offer)方法
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
// 容量扩增50%。
grow();
size = i + 1;
// 第一个元素,其实这里也可以统一进行sift-up操作,没必要特判。
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 插入堆尾。
siftUp(i, e);
}
// 如果新加入的元素成为了堆顶,则原先的leader就无效了。
if (queue[0] == e) {
leader = null;
// 由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务。
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
出队
take 方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
/*
* 循环读取当前堆中最小也就执行开始时间最近的任务。
* 如果当前队列为空无任务,则在available条件上等待。
* 否则如果最近任务的delay<=0则返回这个任务以执行,否则的话根据是否可以作为leader分类:
* 如果可以作为leader,则根据delay进行有时限等待。
* 否则无限等待直至被leader唤醒。
*/
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果当前队列无元素,则在available条件上无限等待直至有任务通过offer入队并唤醒。
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 如果delay小于0说明任务该立刻执行了。
if (delay <= 0)
// 从堆中移除元素并返回结果。
return finishPoll(first);
/*
* 在接下来等待的过程中,first应该清为null。
* 因为下一轮重新拿到的最近需要执行的任务很可能已经不是这里的first了。
* 所以对于接下来的逻辑来说first已经没有任何用处了,不该持有引用。
*/
first = null;
// 如果目前有leader的话,当前线程作为follower在available条件上无限等待直至唤醒。
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
/*
* 如果从available条件中被唤醒当前线程仍然是leader,则清空leader。
*
* 分析一下这里不等的情况:
* 1. 原先 thisThread == leader, 然后堆顶更新了,leader 为 null。
* 2. 堆顶更新,offer方法释放锁后,有其它线程通过 take/poll 拿到锁,读到 leader == null(offer 方法触发此行为),然后在此将自身更新为 leader。
*
* 对于这两种情况统一的处理逻辑就是只要 leader 为 thisThread,则将 leader 为 null 用以接下来判断是否需要唤醒后继线程。
*/
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
/*
* 如果当前堆中无元素(根据堆顶判断)则直接释放锁。
*
*
* 否则如果leader有值,说明当前线程一定不是leader,当前线程不用去唤醒后续等待线程。
* 否则由当前线程来唤醒后继等待线程。不过这并不代表当前线程原来是leader。
*/
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
执行方法
ScheduledThreadPoolExecutor 任务提交的入口方法主要是 execute, schedule, scheduleAtFixedRate 以及 scheduleWithFixedDelay 这几类。
execute,schedule 方法
/**
* 覆盖了父类execute的实现,以零延时任务的形式实现。
*/
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 包装ScheduledFutureTask。
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// fixed-rate模式period为正数。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 包装ScheduledFutureTask,默认返回本身。
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将构造出的ScheduledFutureTask的outerTask设置为经过包装的结果。
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// fixed-delay模式delay为正数。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
// 包装ScheduledFutureTask,默认返回本身。
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将构造出的ScheduledFutureTask的outerTask设置为经过包装的结果。
sft.outerTask = t;
delayedExecute(t);
return t;
}
delayExecute 方法
// ScheduledThreadPoolExecutor
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 非RUNNING态,根据饱和策略处理任务。
if (isShutdown())
reject(task);
else {
// 往work queue中插入任务。
super.getQueue().add(task);
/*
* 检查任务是否可以被执行。
* 如果任务不应该被执行,并且从队列中成功移除的话(说明没被worker拿取执行),则调用cancel取消任务。
*/
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
// 参数中false表示不试图中断执行任务的线程。
task.cancel(false);
else
ensurePrestart();
}
}
/**
* 这是父类 ThreadPoolExecutor 的方法用于确保有worker线程来执行任务。
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// worker数目小于corePoolSize,则添加一个worker。
if (wc < corePoolSize)
addWorker(null, true);
// wc==orePoolSize==0的情况也添加一个worker。
else if (wc == 0)
addWorker(null, false);
}
run 方法
//ScheduledThreadPoolExecutor#ScheduledFutureTask#run
public void run() {
// 是否周期性,就是判断period是否为0。
boolean periodic = isPeriodic();
// 检查任务是否可以被执行。
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果非周期性任务直接调用run运行即可。
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
// 需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。
reExecutePeriodic(outerTask);
}
}
canRunInCurrentRunState 方法
JDK8 和 JDK11 的源码略有不同,这是 JDK8 的源码。
// ScheduledThreadPoolExecutor
boolean canRunInCurrentRunState(boolean periodic) {
/*
* isRunningOrShutdown 的参数为布尔值,true 则表示 shutdown 状态也返回 true ,否则只有 running 状态返回 ture。
* 如果为周期性任务则根据 continueExistingPeriodicTasksAfterShutdown 来判断是否 shutdown了仍然可以执行。
* 否则根据 executeExistingDelayedTasksAfterShutdown 来判断是否 shutdown 了仍然可以执行。
*/
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
runAndReset 方法
FutureTask#runAndReset
方法保证了在任务正常执行完成后返回 true,此时 FutureTask 的状态 state 保持为 NEW,由于没有调用 set 方法,也就是没有调用 finishCompletion 方法,它内部持有的 Callable 任务引用不会置为 null,等待获取结果的线程集合也不会解除阻塞。
这种设计方案专门针对可以周期性重复执行的任务。异常执行情况和取消的情况导致的最终结果和 run 方法是一致的。
// FutureTask
// 执行任务并且重置状态
// 由于没有执行set()方法设置执行结果,这个方法除了执行过程中抛出异常或者主动取消会到导致state由NEW更变为其他值,正常执行完毕一个任务之后,state是保持为NEW不变
protected boolean runAndReset() {
// 如果状态不为NEW(0)或者CAS(null,当前线程实例)更新runner-真正的执行Callable对象的线程实例失败,那么直接返回false,不执行任务
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 这里会忽略执行结果,只记录是否正常执行
c.call();
ran = true;
} catch (Throwable ex) {
// 记录执行异常结果
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 正常情况下的执行完毕,ran会更新为true,state此时也保持为NEW,这个时候方法返回true
return ran && s == NEW;
}
setNextRunTime 方法
setNextRunTime 方法是为了确认下一次任务执行的时间。
private void setNextRunTime() {
long p = period;
/*
* fixed-rate模式,时间设置为上一次时间+p。
* 提一句,这里的时间其实只是可以被执行的最小时间,不代表到点就要执行。
* 如果这次任务还没执行完是肯定不会执行下一次的。
*/
if (p > 0)
time += p;
/**
* fixed-delay模式,计算下一次任务可以被执行的时间。
* 简单来说差不多就是当前时间+delay值。因为代码走到这里任务就已经结束了,now()可以认为就是任务结束时间。
*/
else
time = triggerTime(-p);
}
long triggerTime(long delay) {
/*
* 如果delay < Long.Max_VALUE/2,则下次执行时间为当前时间+delay。
*
* 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay(如果有必要的话)。
*/
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
/**
* 主要就是有这么一种情况:
* 某个任务的delay为负数,说明当前可以执行(其实早该执行了)。
* 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高。
*
* 那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果。
*
* 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。
* 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。
* 不然就把当前delay值给调整为Long.MAX_VALUE + 队首delay。
*/
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
reExecutePeriodic 方法
// ScheduledThreadPoolExecutor
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 塞到工作队列中。
super.getQueue().add(task);
// 再次检查是否可以执行,如果不能执行且任务还在队列中未被取走则取消任务。
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
cancel 方法
public boolean cancel(boolean mayInterruptIfRunning) {
// 先调用父类FutureTask#cancel来取消任务。
boolean cancelled = super.cancel(mayInterruptIfRunning);
/*
* removeOnCancel开关用于控制任务取消后是否应该从队列中移除。
*
* 如果已经成功取消,并且removeOnCancel开关打开,并且 heapIndex >= 0 (说明仍然在队列中), 则从队列中删除该任务。
*/
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
remove 方法
ScheduledThreadPoolExecutor 支持任务取消的时候快速从队列中移除,因为大部分情况下队列中的元素是 ScheduledFutureTask 类型,内部维护了 heapIndex,即堆数组的索引。
如果维护了 heapIndex,查找元素的时间复杂度为 O(1)。
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
/*
* 堆的删除某个元素操作就是将最后一个元素移到那个元素。
* 这时候有可能需要向上调整堆,也可能需要向下维护。
*
* 对于小根堆而言,如果移过去后比父元素小,则需要向上维护堆结构,
* 否则将左右两个子节点中较小值与当前元素比较,如果当前元素较大,则需要向下维护堆结构。
*/
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// 如果参数x就是堆数组中最后一个元素则删除操作已经完毕了。
if (s != i) {
// 尝试向下维护堆。
siftDown(i, replacement);
// 相等说明replacement比子节点都要小,尝试向上维护堆。
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
private int indexOf(Object x) {
if (x != null) {
if (x instanceof ScheduledFutureTask) {
int i = ((ScheduledFutureTask) x).heapIndex;
// 再次判断i确实是本线程池的,因为remove方法的参数x完全可以是个其它池子里拿到的ScheduledFutureTask。
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
onShutdown 方法
onShutdown 方法是 ThreadPoolExecutor 的一个钩子方法,在 shutdown 方法中调用,默认实现为空。
ScheduledThreadPoolExecutor 覆盖了此方法,用于删除并取消工作队列中的不需要执行的任务。
// ScheduledThreadPoolExecutor
@Override
void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
// shutdown是否仍然执行延时任务。
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
// shutdown是否仍然执行周期任务。
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
// 如果两者皆不可则对队列中所有RunnableScheduledFuture调用cancel取消并清空队列。
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
/*
* 不需要执行的任务删除并取消。
* 已经取消的任务也需要从队列中删除。
* 所以这里就判断下是否需要执行或者任务是否已经取消。
*/
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) {
if (q.remove(t))
t.cancel(false);
}
}
}
}
// 因为任务被从队列中清理掉,所以这里需要调用tryTerminate尝试跃迁executor的状态。
tryTerminate();
}