原理剖析
本节讲解三个重要函数。
● schedule(Runnable command, long delay, TimeUnit unit)
● scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
● scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
schedule(Runnable command, long delay, TimeUnit unit)方法
该方法的作用是提交一个延迟执行的任务,任务从提交时间算起延迟单位为 unit 的 delay 时间后开始执行。提交的任务不是周期性任务,任务只会执行一次,代码如下。
public ScheduledFuture<? > schedule(Runnable command,
long delay,
TimeUnit unit) {
//(1)参数校验
if (command == null || unit == null)
throw new NullPointerException();
//(2)任务转换
RunnableScheduledFuture<? > t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//(3)添加任务到延迟队列
delayedExecute(t);
return t;
}
I.如上代码(1)进行参数校验,如果 command 或者 unit 为 null,则抛出 NPE 异常。
II.代码(2)装饰任务,把提交的 command 任务转换为 ScheduledFutureTask。ScheduledFutureTask 是具体放入延迟队列里面的东西。由于是延迟任务,所以 ScheduledFutureTask 实现了 long getDelay(TimeUnit unit)和 int compareTo(Delayed other)方法。triggerTime 方法将延迟时间转换为绝对时间,也就是把当前时间的纳秒数加上延迟的纳秒数后的 long 型值。ScheduledFutureTask 的构造函数如下。
ScheduledFutureTask(Runnable r, V result, long ns) {
//调用父类 FutureTask 的构造函数
super(r, result);
this.time = ns;
this.period = 0; //period 为 0,说明为一次性任务
this.sequenceNumber = sequencer.getAndIncrement();
}
在构造函数内部首先调用了父类 FutureTask 的构造函数,父类 FutureTask 的构造函数代码如下。
//通过适配器把 runnable 转换为 callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; //设置当前任务状态为 NEW
}
FutureTask 中的任务被转换为 Callable 类型后,被保存到了变量 this.callable 里面,并设置 FutureTask 的任务状态为 NEW。
然后在 ScheduledFutureTask 构造函数内部设置 time 为上面说的绝对时间。需要注意,这里 period 的值为 0,这说明当前任务为一次性的任务,不是定时反复执行任务。其中 long getDelay(TimeUnit unit)方法的代码如下(该方法用来计算当前任务还有多少时间就过期了)。
//元素过期算法,装饰后时间-当前时间,就是即将过期剩余时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
compareTo(Delayed other)方法的代码如下:
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<? > x = (ScheduledFutureTask<? >)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
compareTo 的作用是加入元素到延迟队列后,在内部建立或者调整堆时会使用该元素的 compareTo 方法与队列里面其他元素进行比较,让最快要过期的元素放到队首。所以无论什么时候向队列里面添加元素,队首的元素都是最快要过期的元素。
III.代码(3)将任务添加到延迟队列,delayedExecute 的代码如下。
private void delayedExecute(RunnableScheduledFuture<? > task) {
//(4)如果线程池关闭了,则执行线程池拒绝策略
if (isShutdown())
reject(task);
else {
//(5)添加任务到延迟队列
super.getQueue().add(task);
//(6)再次检查线程池状态
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//(7)确保至少一个线程在处理任务
ensurePrestart();
}
}
IV.代码(4)首先判断当前线程池是否已经关闭了,如果已经关闭则执行线程池的拒绝策略,否则执行代码(5)将任务添加到延迟队列。添加完毕后还要重新检查线程池是否被关闭了,如果已经关闭则从延迟队列里面删除刚才添加的任务,但是此时有可能线程池中的线程已经从任务队列里面移除了该任务,也就是该任务已经在执行了,所以还需要调用任务的 cancle 方法取消任务。
V.如果代码(6)判断结果为 false,则会执行代码(7)确保至少有一个线程在处理任务,即使核心线程数 corePoolSize 被设置为 0。ensurePrestart 的代码如下。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
//增加核心线程数
if (wc < corePoolSize)
addWorker(null, true);
//如果初始化 corePoolSize==0,则也添加一个线程。
else if (wc == 0)
addWorker(null, false);
}
如上代码首先获取线程池中的线程个数,如果线程个数小于核心线程数则新增一个线程,否则如果当前线程数为 0 则新增一个线程。
上面我们分析了如何向延迟队列添加任务,下面我们来看线程池里面的线程如何获取并执行任务。在前面讲解 ThreadPoolExecutor 时我们说过,具体执行任务的线程是 Worker 线程,Worker 线程调用具体任务的 run 方法来执行。由于这里的任务是 ScheduledFutureTask,所以我们下面看看 ScheduledFutureTask 的 run 方法。
public void run() {
//(8)是否只执行一次
boolean periodic = isPeriodic();
//(9)取消任务
if (! canRunInCurrentRunState(periodic))
cancel(false);
//(10)只执行一次,调用 schedule 方法时候
else if (! periodic)
ScheduledFutureTask.super.run();
//(11)定时执行
else if (ScheduledFutureTask.super.runAndReset()) {
//(11.1)设置 time=time+period
setNextRunTime();
//(11.2)重新加入该任务到 delay 队列
reExecutePeriodic(outerTask);
}
}
VI.代码(8)中的 isPeriodic 的作用是判断当前任务是一次性任务还是可重复执行的任务,isPeriodic 的代码如下。
public boolean isPeriodic() {
return period ! = 0;
}
可以看到,其内部是通过 period 的值来判断的,由于转换任务在创建 ScheduledFutureTask 时传递的 period 的值为 0,所以这里 isPeriodic 返回 false。
VII.代码(9)判断当前任务是否应该被取消,canRunInCurrentRunState 的代码如下。
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
这里传递的 periodic 的值为 false,所以 isRunningOrShutdown 的参数为 executeExisti ngDelayedTasksAfterShutdown。executeExistingDelayedTasksAfterShutdown 默认为 true,表示当其他线程调用了 shutdown 命令关闭了线程池后,当前任务还是要执行,否则如果为 false,则当前任务要被取消。
VIII.由于 periodic 的值为 false,所以执行代码(10)调用父类 FutureTask 的 run 方法具体执行任务。FutureTask 的 run 方法的代码如下。
public void run() {
//(12)
if (state ! = NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//(13)
try {
Callable<V> c = callable;
if (c ! = null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//(13.1)
setException(ex);
}
//(13.2)
if (ran)
set(result);
}
} finally {
...
}
}
代码(12)判断如果任务状态不是 NEW 则直接返回,或者如果当前任务状态为 NEW 但是使用 CAS 设置当前任务的持有者为当前线程失败则直接返回。代码(13)具体调用 callable 的 call 方法执行任务。这里在调用前又判断了任务的状态是否为 NEW,是为了避免在执行代码(12)后其他线程修改了任务的状态(比如取消了该任务)。
如果任务执行成功则执行代码(13.2)修改任务状态,set 方法的代码如下。
protected void set(V v) {
//如果当前任务的状态为 NEW,则设置为 COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
//设置当前任务的状态为 NORMAL,也就是任务正常结束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
如上代码首先使用 CAS 将当前任务的状态从 NEW 转换到 COMPLETING。这里当有多个线程调用时只有一个线程会成功。成功的线程再通过 UNSAFE.putOrderedInt 设置任务的状态为正常结束状态,这里没有使用 CAS 是因为对于同一个任务只可能有一个线程运行到这里。在这里使用 putOrderedInt 比使用 CAS 或者 putLongvolatile 效率要高,并且这里的场景不要求其他线程马上对设置的状态值可见。
请思考个问题,在什么时候多个线程会同时执行 CAS 将当前任务的状态从 NEW 转换到 COMPLETING?其实当同一个 command 被多次提交到线程池时就会存在这样的情况,因为同一个任务共享一个状态值 state。
如果任务执行失败,则执行代码(13.1)。setException 的代码如下,可见与 set 函数类似。
protected void setException(Throwable t) {
//如果当前任务的状态为 NEW,则设置为 COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
//设置当前任务的状态为 EXCEPTIONAL,也就是任务非正常结束
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
}
}
到这里代码(10)的逻辑执行完毕,一次性任务也就执行完毕了,
下面会讲到,如果任务是可重复执行的,则不会执行代码(10)而是执行代码(11)。
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)方法
该方法的作用是,当任务执行完毕后,让其延迟固定时间后再次运行(fixed-delay 任务)。其中 initialDelay 表示提交任务后延迟多少时间开始执行任务 command,delay 表示当任务执行完毕后延长多少时间后再次运行 command 任务,unit 是 initialDelay 和 delay 的时间单位。任务会一直重复运行直到任务运行中抛出了异常,被取消了,或者关闭了线程池。scheduleWithFixedDelay 的代码如下。
public ScheduledFuture<? > scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
//(14)参数校验
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//(15)任务转换,注意这里是 period=-delay<0
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//(16)添加任务到队列
delayedExecute(t);
return t;
}
代码(14)进行参数校验,校验失败则抛出异常,代码(15)将 command 任务转换为 ScheduledFutureTask。这里需要注意的是,传递给 ScheduledFutureTask 的 period 变量的值为-delay,period<0 说明该任务为可重复执行的任务。然后代码(16)添加任务到延迟队列后返回。
将任务添加到延迟队列后线程池线程会从队列里面获取任务,然后调用 ScheduledFutureTask 的 run 方法执行。由于这里 period<0,所以 isPeriodic 返回 true,所以执行代码(11)。runAndReset 的代码如下。
protected boolean runAndReset() {
//(17)
if (state ! = NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
//(18)
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c ! = null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
...
}
return ran && s == NEW; //(19)
}
该代码和 FutureTask 的 run 方法类似,只是任务正常执行完毕后不会设置任务的状态,这样做是为了让任务成为可重复执行的任务。这里多了代码(19),这段代码判断如果当前任务正常执行完毕并且任务状态为 NEW 则返回 true,否则返回 false。如果返回了 true 则执行代码(11.1)的 setNextRunTime 方法设置该任务下一次的执行时间。setNextRunTime 的代码如下。
private void setNextRunTime() {
long p = period;
if (p > 0)//fixed-rate 类型任务
time += p;
else//fixed-delay 类型任务
time = triggerTime(-p);
}
这里 p<0 说明当前任务为 fixed-delay 类型任务。然后设置 time 为当前时间加上-p 的时间,也就是延迟-p 时间后再次执行。
总结:本节介绍的 fixed-delay 类型的任务的执行原理为,当添加一个任务到延迟队列后,等待 initialDelay 时间,任务就会过期,过期的任务就会被从队列移除,并执行。执行完毕后,会重新设置任务的延迟时间,然后再把任务放入延迟队列,循环往复。需要注意的是,如果一个任务在执行中抛出了异常,那么这个任务就结束了,但是不影响其他任务的执行。
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)方法
该方法相对起始时间点以固定频率调用指定的任务(fixed-rate 任务)。当把任务提交到线程池并延迟 initialDelay 时间(时间单位为 unit)后开始执行任务 command。然后从 initialDelay+period 时间点再次执行,而后在 initialDelay + 2 * period 时间点再次执行,循环往复,直到抛出异常或者调用了任务的 cancel 方法取消了任务,或者关闭了线程池。scheduleAtFixedRate 的原理与 scheduleWithFixedDelay 类似,下面我们讲下它们之间的不同点。首先调用 scheduleAtFixedRate 的代码如下。
public ScheduledFuture<? > scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
...
//装饰任务类,注意 period=period>0,不是负的
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
...
return t;
}
在如上代码中,在将 fixed-rate 类型的任务 command 转换为 ScheduledFutureTask 时设置 period=period,不再是-period。
所以当前任务执行完毕后,调用 setNextRunTime 设置任务下次执行的时间时执行的是 time += p 而不再是 time = triggerTime(-p)。
总结:相对于 fixed-delay 任务来说,fixed-rate 方式执行规则为,时间为 initdelday +nperiod 时启动任务,但是*如果当前任务还没有执行完,下一次要执行任务的时间到了,则不会并发执行,下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行。
总结
本章讲解了 ScheduledThreadPoolExecutor 的实现原理,如图 9-2 所示,其内部使用 DelayQueue 来存放具体任务。任务分为三种,其中一次性执行任务执行完毕就结束了,fixed-delay 任务保证同一个任务在多次执行之间间隔固定时间,fixed-rate 任务保证按照固定的频率执行。任务类型使用 period 的值来区分。