- ExecutorService接口
- ScheduledExecutorService接口
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
- Timer
Executor框架是concurrent包提供的用于执行线程任务的框架,它基于生产者-消费者模式实现,将提交任务的线程和执行任务的线程解耦。提交任务的线程视作生产者,执行任务的线程视作消费者。任务的执行策略可以通过定制不同的消费者实现,比如:任务可以同步执行,也可以异步执行;任务可以按照编排优先级,高优先级的任务可以优先执行;任务可以延迟执行或者按周期执行…这些实现对于生产者而言透明,生产者无需关注消费者的具体实现,仅需要按照业务需求提交任务即可。
Executor接口定义了concurrent包线程任务执行的入口,其扩展接口和实现类形成了一套满足线程任务执行的通用框架。
ExecutorService接口
作为Executor的扩展,分离了任务的执行(execute)和提交方法(submit/invoke*)。包含3种状态:
- 运行(running):ExecutorService创建后的状态,表示执行器处于正常状态,可以接受任务;
- 关闭(shutdown):执行shutdown/shutdownNow后的状态,此时不再接受新任务,且等待ExecutorService中的工作线程终止;
- 终止(terminated):ExecutorService中的所有工作线程终止运行,可通过isTerminated判断是否终止;通过awaitTermination同步等待终止;
ExecutorService定义了多个方法实现对线程任务状态更加精细的控制,扩展方法包括:
- void shutdown():触发Executor关闭操作,Executor在关闭前会等待已经提交的任务执行完成后才关闭。shutdown方法触发关闭操作后即刻返回,并不会等待任务执行完成才返回;
- List shutdownNow():触发Executor关闭操作,停止所有正在执行的任务,并返回所有等待执行的任务列表;
- boolean isShutdown():是否已经触发Executor的关闭操作,当执行shutdown/shutdownNow方法后,该方法返回true;
- boolean isTerminated():是否已经结束关闭操作;当关闭操作触发时,Executor并不一定立即关闭,可能需要等待已经提交任务执行完成后才关闭,因此只有当Executor真正执行完关闭操作后,该方法返回true;
- boolean awaitTermination(long timeout, TimeUnit unit):调用该方法的线程会阻塞等待,直到Executor执行完关闭操作;阻塞期间可能因超时(返回false)或者被中断(抛出InterruptedException异常)返回;
- Future submit(Callable task):提交一个线程任务,返回一个Futrue对象,以便接收和处理线程执行结果;
- Future submit(Runnable task, T result):同上,因为Runnable的run方法没有返回值,故单独用一个参数存放执行结果;
- Future<?> submit(Runnable task):同上;
- List> invokeAll(Collection<? extends Callable> tasks):批量提交任务,同步等待全部任务执行完并返回Future对象集合存放每个任务的执行结果;
- List> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit):同上,允许设置超时时间;
- T invokeAny(Collection<? extends Callable> tasks):执行所提交的任务,同步等待第一个已经执行完成的任务并返回结果,其余未完成任务将被取消执行;
- T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit):同上,允许设置超时时间;
面试问题:Runnable接口和Callable接口有何异同? 相同点:均被设计用来抽象线程任务的执行,将任务的提交和任务的执行解耦,使得任务的提交和执行可以放在不同的线程中执行; 不同点:Callable接口允许返回计算结果,也可以抛出异常,在任务执行过程中可以异步的捕获异常,也可以获取线程执行结果;而Runnable接口不允许这样做,通常异常处理需要放在接口内实现,计算结果也无法直接返回,需要借助消息队列等其它数据结构和组件实现; 注意:是用execute方法和submit方法提交任务在异常处理上存在区别: execute提交的任务在执行时如果抛出未捕获异常,则可以由java.lang.Thread.UncaughtExceptionHandler捕获处理。而submit提交的任务执行过程中如果抛出未捕获异常,将被视为任务执行结果的一部分,异常通过返回Future.get封装在ExecutionException中抛出。
ScheduledExecutorService接口
定时任务执行入口,允许延迟或者周期性调度线程任务,类似java.util.Timer的调度功能。
面试问题:java.util.Timer和ScheduledExecutorService接口的实现有何异同?
- 提交任务参数不同:Timer的任务参数必须是TimerTask类型;而ScheduledExecutorService参数相对灵活,可以是Runnable也可以是Callable类型;
- 调度线程不同:一个Timer对象包含一个TimerThread和一个TaskQueue对象,TimerThread线程用于调度所有的TimerTask任务,TaskQueue负责存放当前Timer对象等待调度的所有任务。TaskQueue是一个优先队列,基于堆实现,调度效率为log(n),任务通过下一次执行时间进行排序,以便保证最近的任务最先被调度执行。在创建Timer之后,TimerThread即已创建并且调用start方法启动,Timer调度任务是逐个调度,如果队列中的任务执行时间过长,将会导致整个任务序列执行时间的延迟。ScheduledExecutorService的实现类对执行任务的线程控制更加灵活,如ScheduledThreadPoolExecutor使用线程池来调度任务,可以同时调度多个任务,减少了因为任务排队执行而造成的延迟;当ScheduledThreadPoolExecutor指定的线程池大小为1时,效果等同于使用Timer;
- 时间单位不同:ScheduledExecutorService使用TimeUnit指定时间,更加灵活;
- 调度时间基准不同:Timer基于绝对时间调度,因而对操作系统的时钟变化敏感,一旦操作系统调整时钟,可能导致任务执行时间变化;ScheduledThreadPoolExecutor基于相对时间调度,因而对于系统时钟变化不敏感,具有更好的兼容性;
- Timer存在线程泄漏问题:Timer对于执行任务抛出的异常不作捕获处理,一旦任务抛出异常,将导致整个Timer不可用,累及其它待执行任务;而ScheduledThreadPoolExecutor可以正确处理任务抛出的异常;
ScheduledExecutorService接口方法包括:
- ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):以给定延迟调度一个一次性的任务,当任务执行完成后,ScheduledFuture<?>的get方法返回null;
- ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit):同上,在任务执行完后,返回Callable执行结果;
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):周期性的执行给定任务。任务第一次执行时延迟initialDelay时长,然后按照( initialDelay + period*( n - 1 ) ,n表示任务执行次数)固定周期执行;如果单次任务执行时间超过period,则下一次执行将会等待当次执行完成后立即执行,而不会并行执行;
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):周期性的执行给定任务。任务第一次执行时延迟initialDelay时长,然后按照( 上一次结束时间 + period )周期性执行,因而该方法执行任务的周期并不固定,会根据任务执行时间的长短变化;上一次任务结束到下一次任务执行开始,总是相隔period时长;
面试问题:ScheduledExecutorService接口的scheduleAtFixedRate与scheduleWithFixedDelay区别? 解释如上
ThreadPoolExecutor
Java源码里面都有大量的注释,认真读懂这些注释,就可以把握其七分工作机制了。关于ThreadPoolExecutor的解析,我们就从其类注释开始。
ThreadPoolExecutor.png
现将注释大致翻译如下:
ExecutorService(ThreadPoolExecutor的顶层接口)使用线程池中的线程执行每个提交的任务,通常我们使用Executors的工厂方法来创建ExecutorService。
线程池解决了两个不同的问题:
- 提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;
- 统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。
为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 但是,在常见场景中,我们预配置了几种线程池,我们敦促程序员使用更方便的Executors的工厂方法直接使用。
- Executors.newCachedThreadPool(无界线程池,自动线程回收)
- Executors.newFixedThreadPool(固定大小的线程池);
- Executors.newSingleThreadExecutor(单一后台线程);
注:这里没有提到ScheduledExecutorService ,后续解析。
在自定义线程池时,请参考以下指南:
1、Core and maximum pool sizes 核心和最大线程池数量
参数 | 翻译 |
---|---|
corePoolSize | 核心线程池数量 |
maximumPoolSize | 最大线程池数量 |
线程池执行器将会根据corePoolSize和maximumPoolSize自动地调整线程池大小。
当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。 通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,您可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构建时设置,但也可以使用setCorePoolSize
和setMaximumPoolSize
进行动态更改。
这段话详细了描述了线程池对任务的处理流程,这里用个图总结一下
线程任务处理流程.png
2、prestartCoreThread 核心线程预启动
在默认情况下,只有当新任务到达时,才开始创建和启动核心线程,但是我们可以使用 prestartCoreThread()
和 prestartAllCoreThreads()
方法动态调整。
如果使用非空队列构建池,则可能需要预先启动线程。
方法 | 作用 |
---|---|
prestartCoreThread() | 创一个空闲任务线程等待任务的到达 |
prestartAllCoreThreads() | 创建核心线程池数量的空闲任务线程等待任务的到达 |
3、ThreadFactory 线程工厂
新线程使用ThreadFactory创建。 如果未另行指定,则使用Executors.defaultThreadFactory默认工厂,使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态。
通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从newThread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。
线程应该有modifyThread权限。 如果工作线程或使用该池的其他线程不具备此权限,则服务可能会降级:配置更改可能无法及时生效,并且关闭池可能会保持可终止但尚未完成的状态。
4、Keep-alive times 线程存活时间
如果线程池当前拥有超过corePoolSize的线程,那么多余的线程在空闲时间超过keepAliveTime时会被终止 ( 请参阅getKeepAliveTime(TimeUnit) )。这提供了一种在不积极使用线程池时减少资源消耗的方法。
如果池在以后变得更加活跃,则应构建新线程。 也可以使用方法setKeepAliveTime(long,TimeUnit)
进行动态调整。
防止空闲线程在关闭之前终止,可以使用如下方法:
setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);
默认情况下,keep-alive策略仅适用于存在超过corePoolSize线程的情况。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)
也可用于将此超时策略应用于核心线程。
5、Queuing 队列
BlockingQueu用于存放提交的任务,队列的实际容量与线程池大小相关联。
- 如果当前线程池任务线程数量小于核心线程池数量,执行器总是优先创建一个任务线程,而不是从线程队列中取一个空闲线程。
- 如果当前线程池任务线程数量大于核心线程池数量,执行器总是优先从线程队列中取一个空闲线程,而不是创建一个任务线程。
- 如果当前线程池任务线程数量大于核心线程池数量,且队列中无空闲任务线程,将会创建一个任务线程,直到超出maximumPoolSize,如果超时maximumPoolSize,则任务将会被拒绝。
这个过程参考[线程任务处理流程图.png]
主要有三种队列策略:
- Direct handoffs 直接握手队列
Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。
此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致线程数量会无限增长问题。 - Unbounded queues 无界队列
当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。 - Bounded queues 有界队列
一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:
- 使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。
- 使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。
这里主要为了说明有界队列大小和maximumPoolSizes的大小控制,若何降低资源消耗的同时,提高吞吐量
6、Rejected tasks 拒绝任务
拒绝任务有两种情况:1. 线程池已经被关闭;2. 任务队列已满且maximumPoolSizes已满;
无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:
- AbortPolicy:默认测策略,抛出RejectedExecutionException运行时异常;
- CallerRunsPolicy:这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;
- DiscardPolicy:直接丢弃新提交的任务;
- DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);
我们可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景中。7、Hook methods 钩子方法
ThreadPoolExecutor为提供了每个任务执行前后提供了钩子方法,重写beforeExecute(Thread,Runnable)
和afterExecute(Runnable,Throwable)
方法来操纵执行环境; 例如,重新初始化ThreadLocals,收集统计信息或记录日志等。此外,terminated()
在Executor完全终止后需要完成后会被调用,可以重写此方法,以执行任殊处理。
注意:如果hook或回调方法抛出异常,内部的任务线程将会失败并结束。8、Queue maintenance 维护队列
getQueue()
方法可以访问任务队列,一般用于监控和调试。绝不建议将这个方法用于其他目的。当在大量的队列任务被取消时,remove()
和purge()
方法可用于回收空间。9、Finalization 关闭
如果程序中不在持有线程池的引用,并且线程池中没有线程时,线程池将会自动关闭。如果您希望确保即使用户忘记调用shutdown()
方法也可以回收未引用的线程池,使未使用线程最终死亡。那么必须通过设置适当的 keep-alive times 并设置allowCoreThreadTimeOut(boolean) 或者 使 corePoolSize下限为0 。一般情况下,线程池启动后建议手动调用shutdown()关闭。
预定义线程池
1、FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
- keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
- workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
- FixedThreadPool的任务执行是无序的;
适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。
2、CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
- keepAliveTime = 60s,线程空闲60s后自动结束。
- workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;
适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。
3、SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
咋一瞅,不就是newFixedThreadPool(1)吗?定眼一看,这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢,写个dome来解释一下:
public static void main(String[] args) {
ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
System.out.println(threadPoolExecutor.getMaximumPoolSize());
threadPoolExecutor.setCorePoolSize(8);
ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
// 运行时异常 java.lang.ClassCastException
// ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
}
对比可以看出,FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。
4、ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
对于ScheduledThreadPool关注后续篇章。
执行原理
分析ThreadPoolExecutor的执行原理,直接从execute
方法开始
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1、工作线程 < 核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2、运行态,并尝试将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} // 3、使用尝试使用最大线程运行
else if (!addWorker(command, false))
reject(command);
}
这三处if判断,还是比较泛的,整体大框框上的流程,可用下图表示。
线程任务处理流程.png
在execute方法中,用到了double-check的思想,我们看到上述代码中并没有同步控制,都是基于乐观的check,如果任务可以创建则进入addWorker(Runnable firstTask, boolean core)方法,注意上述代码中的三种传参方式:
- addWorker(command, true): 创建核心线程执行任务;
- addWorker(command, false):创建非核心线程执行任务;
- addWorker(null, false): 创建非核心线程,当前任务为空;
addWorker的返回值是boolean,不保证操作成功。下面详看addWorker方法(代码稍微有点长):
private boolean addWorker(Runnable firstTask, boolean core) {
// 第一部分:自旋、CAS、重读ctl 等结合,直到确定是否可以创建worker,
// 可以则跳出循环继续操作,否则返回false
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) // CAS增长workerCount,成功则跳出循环
break retry;
c = ctl.get(); // Re-read ctl 重新获取ctl
if (runStateOf(c) != rs) // 状态改变则继续外层循环,否则在内层循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 第二部分:创建worker,这部分使用ReentrantLock锁
boolean workerStarted = false; // 线程启动标志位
boolean workerAdded = false; // 线程是否加入workers 标志位
Worker w = null;
try {
w = new Worker(firstTask); //创建worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 获取到锁以后仍需检查ctl,可能在上一个获取到锁处理的线程可能会改变runState
// 如 ThreadFactory 创建失败 或线程池被 shut down等
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 失败操作
}
return workerStarted;
}
addWorker的工作可分为两个部分:
- 第一部分:原子操作,判断是否可以创建worker。通过自旋、CAS、ctl 等操作,判断继续创建还是返回false,自旋周期一般很短。
- 第二部分:同步创建workder,并启动线程。
第一部分思路理清楚,就可以理解了。下面详解第二部分的Worker:
Worker类图
Worker是ThreadPoolExecutor的内部类,实现了 AbstractQueuedSynchronizer 并继承了 Runnable。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 每个worker有自己的内部线程,ThreadFactory创建失败时是null */
final Thread thread;
/** 初始化任务,可能是null */
Runnable firstTask;
/** 每个worker的完成任务数 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 禁止线程在启动前被打断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 重要的执行方法 */
public void run() {
runWorker(this);
}
// state = 0 代表未锁;state = 1 代表已锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// interrupt已启动线程
void interruptIfStarted() {
Thread t;
// 初始化是 state = -1,不会被interrupt
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker 实现了简单的 非重入互斥锁,互斥容易理解,非重入是为了避免线程池的一些控制方法获得重入锁,比如setCorePoolSize操作。注意 Worker 实现锁的目的与传统锁的意义不太一样。其主要是为了控制线程是否可interrupt,以及其他的监控,如线程是否 active(正在执行任务)。
线程池里线程是否处于运行状态与普通线程不一样,普通线程可以调用 Thread.currentThread().isAlive() 方法来判断,而线程池,在run方法中可能在等待获取新任务,这期间线程线程是 alive 但是却不是 active。
runWorker代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许被 interrupt
boolean completedAbruptly = true;
try {
// loop 直至 task = null (线程池关闭、超时等)
// 注意这里的getTask()方法,我们配置的阻塞队列会在这里起作用
while (task != null || (task = getTask()) != null) {
w.lock(); // 执行任务前上锁
// 如果线程池停止,确保线程中断; 如果没有,确保线程不中断。这需要在第二种情况下进行重新获取ctl,以便在清除中断时处理shutdownNow竞争
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 扩展点
Throwable thrown = null;
try {
task.run(); // 真正执行run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 扩展点
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 线程退出工作
}
}
runWorker的主要任务就是一直loop循环,来一个任务处理一个任务,没有任务就去getTask(),getTask()可能会阻塞,代码如下:
private Runnable getTask() {
boolean timedOut = false; // 上一次 poll() 是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 是否继续处理任务 可以参见上一篇的状态控制
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 是否允许超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask()方法里面主要用我们配置的workQueue来工作,其阻塞原理与超时原理基于阻塞队列实现,这里不做详解。
ScheduledThreadPoolExecutor
简介
在探讨时 ThreadPoolExecutor 只介绍了FixedThreadPool、CachedThreadPool、SingleThreadExecutor,并没有去介绍ScheduledThreadPoolExecutor,因为 ScheduledThreadPoolExecutor 与其他线程池的概念有些区别,它是一个支持任务周期性调度的线程池。
ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor,同时通过实现 ScheduledExecutorSerivce 来扩展基础线程池的功能,使其拥有了调度能力。其整个调度的核心在于内部类 DelayedWorkQueue ,一个有序的延时队列。
ScheduledThreadPoolExecutor类图.png
ScheduledThreadPoolExecutor 的出现,很好的弥补了传统 Timer 的不足,具体对比看下表:
Timer | ScheduledThreadPoolExecutor | |
---|---|---|
线程 | 单线程 | 多线程 |
多任务 | 任务之间相互影响 | 任务之间不影响 |
调度时间 | 绝对时间 | 相对时间 |
异常 | 单任务异常, 后续任务受影响 |
无影响 |
构造方法
ScheduledThreadPoolExecutor有三个构造形式:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
关于父类的构造可参见 ThreadPoolExecutor。当然我们也可以使用工具类Executors的newScheduledThreadPool的方法,快速创建。注意这里使用的DelayedWorkQueue。
ScheduledThreadPoolExecutor没有提供带有最大线程数的构造函数的,默认是Integer.MAX_VALUE,说明其可以无限制的开启任意线程执行任务,在大量任务系统,应注意这一点,避免内存溢出。
核心方法
核心方法主要介绍ScheduledThreadPoolExecutor的调度方法,其他方法与 ThreadPoolExecutor 一致。调度方法均由 ScheduledExecutorService 接口定义:
public interface ScheduledExecutorService extends ExecutorService {
// 特定时间延时后执行一次Runnable
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
// 特定时间延时后执行一次Callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
// 固定周期执行任务(与任务执行时间无关,周期是固定的)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 固定延时执行任务(与任务执行时间有关,延时从上一次任务完成后开始)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
代码中注释了每个方法的作用,需注意固定周期与固定延时的区别。下面分别对这些方法进行测试:
public class ScheduledPoolTest {
private static final SimpleDateFormat FORMAT = new SimpleDateFormat("hh:mm:ss");
private static final Random RANDOM = new Random();
/**
* 输出:
* 11:04:32
11:04:35
*/
public static void schedule() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
printTime();
scheduledExecutorService.schedule(new Task(), 3, TimeUnit.SECONDS);
}
/**
* 输出:
* 11:05:34
11:05:36
11:05:46
11:05:56
11:06:06
11:06:16
......
*/
public static void scheduleAtFixedRate() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
printTime();
scheduledExecutorService.scheduleAtFixedRate(new Task(), 2, 10, TimeUnit.SECONDS);
}
/**
* 输出:
* 11:07:39
11:07:41
11:07:54
11:08:08
11:08:22
11:08:33
......
*/
public static void scheduleWithFixedDelay() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
printTime();
scheduledExecutorService.scheduleWithFixedDelay(new Task(), 2, 10, TimeUnit.SECONDS);
}
static class Task implements Runnable{
public void run() {
printTime();
try {
Thread.sleep(RANDOM.nextInt(5) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void printTime() {
Date date = new Date();
System.out.println(FORMAT.format(date));
}
}
为了体现scheduleAtFixedRate和scheduleWithFixedDelay的差别,在代码中我们加入了随机睡眠时间,使任务执行不确定。从注释中的输出我们可以看到scheduleAtFixedRate的任务运行周期不受任务执行时间的影响,而scheduleWithFixedDelay的任务运行周期受任务执行时间影响较大。
但需注意,如果任务的执行时间超过任务调度周期,比如任务执行需要10s,而给定执行时间间隔是5s的话,任务的调度是在任务10s执行完之后立即重新执行,而不是5s的周期。
总结
ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基础上扩展了 线程周期调度功能,使用时应注意控制其调度的时间点。
调度原理
ScheduledThreadPoolExecutor 的调度原理主要基于两个内部类,ScheduledFutureTask 和 DelayedWorkQueue:
- ScheduledFutureTask 是对任务的一层封装,将我们提交的 Runnable 或 Callable 封装成具有时间周期的任务;
- DelayedWorkQueue 实现了对 ScheduledFutureTask 的延迟出队管理;
ScheduledFutureTask
ScheduledFutureTask类图
ScheduledFutureTask有以下几种构造方法:
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
super 中调用 FutureTask 的构造方法,可以参考 FutureTask实现原理。ScheduledFutureTask 主要配置参数如下:
名称 | 含义 |
---|---|
time | 任务能够执行的时间点(单位:nanoTime ) |
period | 正值表示固定时间周期执行。 负值表示固定延迟周期执行。 0表示非重复任务。 |
sequenceNumber | FIFO调度序列值(用 AtomicLong 实现) |
注意:period 大于 0 或 小于 0 时,都是周期性执行的,只是执行时间规律不一样。
ScheduledFutureTask 的主要调度辅助方法如下:
// 任务的延迟执行时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
//实现任务的排序,执行时间越小越靠前,相同则按照队列FIFO顺序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber) // 时间一样时,按照FIFO的顺序
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
// 是否是周期性任务
public boolean isPeriodic() {
return period != 0;
}
// 设置下一次运行时间
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p; // 按固定时间周期,下次执行时间为上次执行时间 + 周期时间
else
time = triggerTime(-p); // 按固定延时周期,下次执行时间为当前时间 + 延时时间
}
核心 run 方法
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic)) // 判断是否可以运行任务
cancel(false); // 取消任务,移除队列
else if (!periodic) // 非周期性任务 直接调用父类 FutureTask 的 run 方法
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) { // 周期性任务,调用父类 runAndReset 方法,返回是否执行成功
// 执行成功后继续设置下一次运行时间
setNextRunTime();
// 重新执行周期性任务(可能因为线程池运行状态的改变而被拒绝)
reExecutePeriodic(outerTask);
}
}
对于周期性任务,在 run 方法中执行成功后会继续设置下一次执行时间,并把任务加入延时队列。但需注意,如果任务执行失败,将不会再被周期性调用。所以在可能执行失败的周期性任务中,必须做好异常处理。
DelayedWorkQueue
DelayedWorkQueue 是一个延时有序队列,内部采用 数组 维护队列元素,采用 堆排序 的思想维护队列顺序,并在队列元素(ScheduledFutureTask)建立索引,支持快速删除。
注意:DelayedWorkQueue 的整个队列不是完全有序的,只保证元素有序出队。
DelayedWorkQueue类图
下面详细讲解 DelayedWorkQueue 的实现:
核心入队方法:
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow(); // 队列扩容 类似 ArrayList 扩容
size = i + 1;
if (i == 0) { // 队列为空,直接加入
queue[0] = e;
setIndex(e, 0); // 设置元素在队列的索引,即告诉元素自己在队列的第几位
} else {
siftUp(i, e); // 放入适当的位置
}
if (queue[0] == e) {
leader = null; // 等待队列头的线程
available.signal(); // 通知
}
} finally {
lock.unlock();
}
return true;
}
入队方法中最重要的是 siftUp 方法, sift 在英文单词中是 筛
的意思,这里可将 siftUp 理解为向前筛,找到合适的 堆排序点 加进去。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1; // (k-1)/2
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
siftUp 主要思想是将新增的任务与前 (k-1)/2 的位置比较,如果任务执行时间较近者替换位置 (k-1)/2。依次往前比较,直到无替换发生。每次新增元素调用 siftUp 仅能保证第一个元素是最小的。整个队列不一定有序:
例将:5 10 9 3 依次入队,队列变化如下
[5]
[5,10]
[5,9,10]
[3,5,10,9]
如果对上述的入队方式不了解,可用下面的排序代码进行断点调试:
// DelayedWorkQueue 的入队、出队排序模拟
public class SortArray {
Integer[] queue = new Integer[16];
int size = 0;
public static void main(String[] args) {
SortArray array = new SortArray();
array.add(5);
array.add(9);
array.add(10);
array.add(3);
System.out.println(array.take());
System.out.println(array.take());
System.out.println(array.take());
System.out.println(array.take());
}
boolean add(Integer e) {
if (e == null)
throw new NullPointerException();
int i = size;
size = i + 1;
if (i == 0) {
queue[0] = e;
} else {
siftUp(i, e);
}
return true;
}
Integer take() {
Integer i = queue[0];
int s = --size;
Integer k = queue[s];
if (size != 0)
siftDown(0, k);
return i;
}
private void siftUp(int k, Integer key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Integer e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
private void siftDown(int k, Integer key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Integer c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}
}
核心出队方法:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 直接获取队首任务
RunnableScheduledFuture<?> first = queue[0];
if (first == null) // 空 则等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS); // 看任务是否可以执行
if (delay <= 0)
return finishPoll(first); // 可执行,则进行出队操作
// 可不执行,还需等待,则往下走
first = null;
// 看是否有正在等待的leader线程
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 延时等待
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
代码中的 available 是一个信号量,会在队列的头部有新任务变为可用或者新线程可能需要成为领导者时,发出信号。
private final Condition available = lock.newCondition();
take() 方法中重要的方法是 finishPoll(first)
,主要进行出队时维护队列顺序:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
siftDown 跟前面的 siftUp 很像,它也只能保证出队后下一个仍为最近的任务。并不会移动和清理整个队列。
还是用上面列出的 SortArray 这个类为例:
public static void main(String[] args) {
SortArray array = new SortArray();
array.add(5);
array.add(9);
array.add(10);
array.add(3);
System.out.println(Arrays.toString(array.queue));
System.out.println(array.take());
System.out.println(array.take());
System.out.println(array.take());
System.out.println(array.take());
System.out.println(Arrays.toString(array.queue));
array.add(20);
array.add(4);
System.out.println(Arrays.toString(array.queue));
}
我们先将5,9,10,3 依次入队,然后全部出队,再入队 20,4,我们看下最后的队列里面的数据是什么样子:
[3, 5, 10, 9, null, null, null, null, null, null, null, null, null, null, null, null]
3
5
9
10
[10, 10, 10, 9, null, null, null, null, null, null, null, null, null, null, null, null]
[4, 20, 10, 9, null, null, null, null, null, null, null, null, null, null, null, null]
看了这个结果你可能有点奇怪,已经出队了的元素居然还在队列里面。这是一种 lazy 策略,DelayedWorkQueue 并不会真正直接清理掉队列里出队的元素,用 size 来控制队列的逻辑大小,并发物理实际大小,后来的元素会根据size来覆盖原有的元素。
关于 DelayedWorkQueue 的出队和入队还有疑问的,可以自己调试 SortArray 的代码,看看不同的情况的不同处理结果。DelayedWorkQueue 的 siftUp 、siftDown 这种排序策略非常高效,并非维护整个队列实时有序,只保证第一个出队元素的正确性。
元素删除
上文有提到 ScheduledFutureTask 的索引,DelayedWorkQueue 运用索引可以快速定位删除元素:
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement); // 顺序调整
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
// 使用索引获取下标
private int indexOf(Object x) {
if (x != null) {
if (x instanceof ScheduledFutureTask) {
int i = ((ScheduledFutureTask) x).heapIndex; // 索引
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
remove方法里面首先利用 indexOf
调用索引获取下标,然后使用 siftDown
,siftUp
来调整队列顺序。这里索引的使用能够极大提高元素定位的效率,尤其是在队列比较长的时候。
最后思考一个问题:为什么 DelayedWorkQueue 使用数组而不是链表结构?
个人认为,因为使用数据结构,利用下标快速访问,可以发挥基于 siftDown
,siftUp
的高效排序算法,而链表的下标访问效率低,因此选择使用数组。
Timer
Timer在JDK里面,是很早的一个API了。具有延时的,并具有周期性的任务,在newScheduledThreadPool出来之前我们一般会用Timer和TimerTask来做,但是Timer存在一些缺陷,为什么这么说呢?
Timer的缺陷
Timer只创建唯一的线程来执行所有Timer任务。如果一个timer任务的执行很耗时,会导致其他TimerTask的时效准确性出问题。例如一个TimerTask每10秒执行一次,而另外一个TimerTask每40ms执行一次,重复出现的任务会在后来的任务完成后快速连续的被调用4次,要么完全“丢失”4次调用。Timer的另外一个问题在于,如果TimerTask抛出未检查的异常会终止timer线程。这种情况下,Timer也不会重新回复线程的执行了;它错误的认为整个Timer都被取消了。此时已经被安排但尚未执行的TimerTask永远不会再执行了,新的任务也不能被调度了。
这里做了一个小的 demo 来复现问题,代码如下:
package com.hjc;
import java.util.Timer;
import java.util.TimerTask;
/**
* Created by cong on 2018/7/12.
*/
public class TimerTest {
//创建定时器对象
static Timer timer = new Timer();
public static void main(String[] args) {
//添加任务1,延迟500ms执行
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("---one Task---");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("error ");
}
}, 500);
//添加任务2,延迟1000ms执行
timer.schedule(new TimerTask() {
@Override
public void run() {
for (;;) {
System.out.println("---two Task---");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}, 1000);
}
}
如上代码先添加了一个任务在 500ms 后执行,然后添加了第二个任务在 1s 后执行,我们期望的是当第一个任务输出 —-one Task—- 后等待 1s 后第二个任务会输出 —-two Task—-,
但是执行完毕代码后输出结果如下所示:
例子2,
public class Shedule {
private static long start;
public static void main(String[] args) {
TimerTask task = new TimerTask() {
public void run() {
System.out.println(System.currentTimeMillis()-start);
try{
Thread.sleep(3000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
};
TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println(System.currentTimeMillis()-start);
}
};
Timer timer = new Timer();
start = System.currentTimeMillis();
//启动一个调度任务,1S钟后执行
timer.schedule(task,1000);
//启动一个调度任务,3S钟后执行
timer.schedule(task1,3000);
}
}
上面程序我们预想是第一个任务执行后,第二个任务3S后执行的,即输出一个1000,一个3000.
实际运行结果如下:
实际运行结果并不如我们所愿。世界结果,是过了4S后才输出第二个任务,即4001约等于4秒。那部分时间时间到哪里去了呢?那个时间是被我们第一个任务的sleep所占用了。
现在我们在第一个任务中去掉Thread.sleep();这一行代码,运行是否正确了呢?运行结果如下:
可以看到确实是第一个任务过了1S后执行,第二个任务在第一个任务执行完后过3S执行了。
这就说明了Timer只创建唯一的线程来执行所有Timer任务。如果一个timer任务的执行很耗时,会导致其他TimerTask的时效准确性出问题。
Timer 实现原理分析
下面简单介绍下 Timer 的原理,如下图是 Timer 的原理模型介绍:
1.其中 TaskQueue 是一个平衡二叉树堆实现的优先级队列,每个 Timer 对象内部有唯一一个 TaskQueue 队列。用户线程调用 timer 的 schedule 方法就是把 TimerTask 任务添加到 TaskQueue 队列,在调用 schedule 的方法时候 long delay 参数用来说明该任务延迟多少时间执行。
2.TimerThread 是具体执行任务的线程,它从 TaskQueue 队列里面获取优先级最小的任务进行执行,需要注意的是只有执行完了当前的任务才会从队列里面获取下一个任务而不管队列里面是否有已经到了设置的 delay 时间,一个 Timer 只有一个 TimerThread 线程,所以可知 Timer 的内部实现是一个多生产者单消费者模型。
从实现模型可以知道要探究上面的问题只需看 TimerThread 的实现就可以了,TimerThread 的 run 方法主要逻辑源码如下:
public void run() {
try {
mainLoop();
} finally {
// 有人杀死了这个线程,表现得好像Timer已取消
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // 消除过时的引用
}
}
}
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
//从队列里面获取任务时候要加锁
synchronized(queue) {
......
}
if (taskFired)
task.run();//执行任务
} catch(InterruptedException e) {
}
}
}
可知当任务执行过程中抛出了除 InterruptedException 之外的异常后,唯一的消费线程就会因为抛出异常而终止,那么队列里面的其他待执行的任务就会被清除。所以 TimerTask 的 run 方法内最好使用 try-catch 结构 catch 主可能的异常,不要把异常抛出到 run 方法外。
其实要实现类似 Timer 的功能使用 ScheduledThreadPoolExecutor 的 schedule 是比较好的选择。ScheduledThreadPoolExecutor 中的一个任务抛出了异常,其他任务不受影响的。
ScheduledThreadPoolExecutor 例子如下:
/**
* Created by cong on 2018/7/12.
*/
public class ScheduledThreadPoolExecutorTest {
static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
public static void main(String[] args) {
scheduledThreadPoolExecutor.schedule(new Runnable() {
public void run() {
System.out.println("---one Task---");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("error ");
}
}, 500, TimeUnit.MICROSECONDS);
scheduledThreadPoolExecutor.schedule(new Runnable() {
public void run() {
for (int i =0;i<5;++i) {
System.out.println("---two Task---");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, 1000, TimeUnit.MICROSECONDS);
scheduledThreadPoolExecutor.shutdown();
}
}
运行结果如下:
之所以 ScheduledThreadPoolExecutor 的其他任务不受抛出异常的任务的影响是因为 ScheduledThreadPoolExecutor 中的 ScheduledFutureTask 任务中 catch 掉了异常,但是在线程池任务的 run 方法内使用 catch 捕获异常并打印日志是最佳实践。