ScheduledExecutorService 继承图
ScheduledExecutorService的使用
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
// 单次执行
executor.schedule(()-> {
logger.info("delay 1000");
}, 2, TimeUnit.SECONDS);
// 循环执行
executor.scheduleAtFixedRate(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("scheduleAtFixedRate");
}, 1, 2, TimeUnit.SECONDS);
// 循环执行
executor.scheduleWithFixedDelay(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("scheduleWithFixedDelay");
}, 1, 2, TimeUnit.SECONDS);
}
scheduleAtFixedRate
与 scheduleWithFixedDelay
的区别
如上所示,假设一个task的执行周期过长(例3秒),scheduleAtFixedRate 会在(period)之后,将Task塞到等待队列中,而 scheduleWithFixedDelay 则是在执行完成之后,等待(delay)秒之后,在将Task塞到等待队列中
// scheduleAtFixedRate 的输出,从打印时间可以看出,
// 周期为3秒(即任务的执行task,因为period小于执行周期,已经塞到了队列中等待执行)
八月 31, 2020 2:20:38 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleAtFixedRate
八月 31, 2020 2:20:41 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleAtFixedRate
八月 31, 2020 2:20:44 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleAtFixedRate
八月 31, 2020 2:20:47 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleAtFixedRate
八月 31, 2020 2:20:50 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleAtFixedRate
// scheduleWithFixedDelay的输出,从打印时间可以看出
// 周期为5秒(任务的执行周期3s+delay2s),即等任务执行完成之后,经过delay之后,再执行下一个task
八月 31, 2020 2:22:05 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleWithFixedDelay
八月 31, 2020 2:22:10 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleWithFixedDelay
八月 31, 2020 2:22:15 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleWithFixedDelay
八月 31, 2020 2:22:20 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleWithFixedDelay
八月 31, 2020 2:22:25 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
信息: scheduleWithFixedDelay
ScheduledExecutorService 源码分析
先看看 ScheduledFutureTask的组成
ScheduledExecutorService内部将Runnable封装成 ScheduledFutureTask
重要属性
// 任务的序列号
private final long sequenceNumber;
// 延时的时间
private long time;
// 任务的周期
private final long period;
// 执行的任务
RunnableScheduledFuture<V> outerTask = this;
// 堆顶的下标
int heapIndex;
ScheduledFutureTask 的排序规则
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
// 先比较任务的时间
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
// 任务的时间比较完成之后,再比较任务的序列号
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
从构造函数开始
调用父类的构造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
因为 ScheduledThreadPoolExecutor
继承 ThreadPoolExecutor
即调用 ThreadPoolExecutor
的构造方法
schedule() 方法
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// 校验参数
if (command == null || unit == null)
throw new NullPointerException();
//1. 将 command(Runnable)封装成ScheduledFutureTask
//2. decorateTask() 直接返回第二个参数,没啥用
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//3. 延时执行
delayedExecute(t);
return t;
}
delayedExecute()
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 判断当前的线程池状态是否是shutdown的,如果是,采取拒绝策略,直接拒绝
if (isShutdown())
reject(task);
else {
// 否则,线程池状态良好,获取workQueue,将task加入到workQueue中
super.getQueue().add(task);
// 因为有上下文的切换,所以,这里再判断一次线程池的状态
// 如果线程池已经shutdown,或者,在当前的runState下不能执行入任务的话,就cancel取消人物
if (isShutdown() &&
// 因为是 schedule方法,period =0,所以 task.isPeriodic() = false
// 执行 isRunningOrShutdown(true),
// rs == RUNNING || (rs == SHUTDOWN && shutdownOK)
// 所以,在shutdown状态的话,schedule可以执行,此判断不成立,执行ensurePrestart
!canRunInCurrentRunState(task.isPeriodic()) &&
// 否则,其他状态下,将任务移除
remove(task))
// 并取消future
task.cancel(false);
else
ensurePrestart();
}
}
ensurePrestart()
void ensurePrestart() {
// 获取当前的workerCount
int wc = workerCountOf(ctl.get());
// 如果小于核心线程数,addWroker创建线程执行任务
if (wc < corePoolSize)
addWorker(null, true);
// 如果workerCount == 0,说明是第一次
else if (wc == 0)
addWorker(null, false);
}
添加完worker之后,会执行runWorker方法,循环从workQueue中取出task,执行这个task
scheduleAtFixedRate()
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 校验参数
if (command == null || unit == null)
throw new NullPointerException();
// 校验参数
if (period <= 0)
throw new IllegalArgumentException();
// 将runnable封装成 ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 直接返回 sft,啥都没干
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将sft的 outerTask 设置为 t(自己设置自己)
sft.outerTask = t;
// 延时执行
delayedExecute(t);
return t;
}
delayedExecute()
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 判断当前的线程池状态是否是shutdown的,如果是,采取拒绝策略,直接拒绝
if (isShutdown())
reject(task);
else {
// 否则,线程池状态良好,获取workQueue,将task加入到workQueue中
super.getQueue().add(task);
// 因为有上下文的切换,所以,这里再判断一次线程池的状态
// 如果线程池已经shutdown,或者,在当前的runState下不能执行入任务的话,就cancel取消人物
if (isShutdown() &&
// 因为是 scheduleAtFixedRate,此时的period>0,所以 task.isPeriodic() == true
// 此时执行的就是isRunningOrShutdown(false)
// rs == RUNNING || (rs == SHUTDOWN && shutdownOK) 这个判断就再也不成立了
// 即返回false,因为在shutdown状态了,已经不能再提交新的task,这是个周期任务,已经
// 无法执行了,所以在scheduleAtFixedRate中,线程池shutdown下,将会remove这个任务
// 并cancel掉
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 否则,线程池状态良好,执行 ensurePrestart,创建worker,执行task
ensurePrestart();
}
}
scheduleWithFixedDelay()
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
// 校验参数
if (command == null || unit == null)
throw new NullPointerException();
// 校验参数
if (delay <= 0)
throw new IllegalArgumentException();
// 封装 ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
// 返回sft
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 将outerTask指向自己
sft.outerTask = t;
// 延时执行
delayedExecute(t);
return t;
}
delayedExecute() 和 scheduleAtFixedRate() 中的逻辑一样,只不过,period 变成了负数
添加完Worker之后,执行Task,执行的是 ScheduledFutureTask 这个Task中的 run() 方法
ScheduledFutureTask#run
public void run() {
// 获取当前的Task是否是循环的Task
// 在schedule中为false,其他均为true
boolean periodic = isPeriodic();
// 如果是 scheduleAtFixedRate/scheduleWithFixedDelay
// 且线程池 shutdown 的情况下,这个if成立,取消这个Future
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 否则的话,就不是周期性任务,调用父类的run方法,执行任务(schedule的逻辑)
else if (!periodic)
ScheduledFutureTask.super.run();
// 否则的话,就是周期性任务,调用父类的 runAndReset方法,执行任务
// (scheduleAtFixedRate/scheduleWithFixedDelay) 的逻辑
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
cancel(false)
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
setNextRunTime()
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
reExecutePeriodic(outerTask)
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 判断当前的线程池状态是否已经跑
// 因为跑到了这个函数,必然是一个周期性的任务,不存在不是周期性的任务
// canRunInCurrentRunState(true),在线程池shutdown的时候就是false,再也不能执行
// 正常情况下(running状态),则会将task重新加入workQueue中等下下一次的执行
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}