线程池的具体实现有两种,分别是ThreadPoolExecutor 和ScheduledThreadPoolExecutor 普通线程池和定时线程池,上一篇已经分析过ThreadPoolExecutor原理与使用了,本篇我们来重点分析下ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor

从ScheduledThreadPoolExecutor的类图可以看出,其继承了ThreadPoolExecutor,并在这基础上做了功能的拓展和封装。
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}
从上面可以看出,其构造方法和普通线程池的方法一样,只不过定时线程池运用内部封装的DelayedWorkQueue作为阻塞队列,以ScheduledFutureTask类型任务作为线程池调度的最小单位。
DelayedWorkQueue

DelayedWorkQueue也是一种设计为定时任务的延迟队列,它的实现和DelayQueue一样,不过是将优先级队列和DelayQueue的实现过程迁移到本身方法体中,从而可以在该过程当中灵活的加入定时任务特有的方法调用
1. DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若 time相同则根据sequenceNumber排序;
2. DelayQueue也是一个无界队列
更多内容可以可以参考之前的文章《阻塞队列-DelayedQueue》
ScheduledFutureTask

ScheduledFutureTask继承了FutureTask,可以通过调用get()方法获取任务执行的结果。并且实现了Comparable接口,通过比较时间大小实现优先级排序
SchduledFutureTask主要接收的参数
private long time; //任务开始的时间private final long sequenceNumber; //任务的序号,任务添加到ScheduledThreadPoolExecutor中会分配唯一的序列号private final long period; //任务执行的时间间隔RunnableScheduledFuture<V> outerTask = this //下次执行的任务,该属性是实现周期执行的关键
ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}-------------------ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}
SchduledFutureTask的排序实现
- 首先按照time排序,time小的排在前面,time大的排在后面;
- 如果time相同,按照sequenceNumber排序,sequenceNumber小的排在前面,sequenceNumber大的排在后面,换句话说,如果两个task的执行时间相同, 优先执行先提交的task。
public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}
SchduledFutureTask计算任务开始执行时间
当前时间加上延迟时间,其中如果delay小于 Long.MAX_VALUE/2,则直接返回delay,否则需要处理溢出的情况。
long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}
工作原理
任务提交
SchduledFutureTask提交任务的方式有三种
schedule():任务在指定延迟时间到达后触发,只会执行一次
scheduledAtFixedRate():固定周期执行任务(不管上一个任务有没有执行完,只要时间周期到了就将任务放到阻塞队列中)
scheduledWithFixedDelay():固定延时执行任务(与任务执行时间有关,在上一个任务执行完后延时指定时间,再开始执行下一个任务)
schedule() 普通延时任务
调用decorateTask()进行包装,该方法是留给用户去扩展的,默认是个空方法 ,返回的是参数中new出来的SchduledFutureTask对象
public ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();//decorateTask()方法默认是一个空方法,留给用户自己拓展,最后返回的是参数中new出来的RunnableScheduledFuture对象RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));delayedExecute(t);return t;}
接着提交任务,这里第一次任务提交不同与普通线程池(直接创建线程执行),而是把任务直接放到阻塞队列DelayedWorkQueue中
private void delayedExecute(RunnableScheduledFuture<?> task) {//如果线程池已关闭,拒绝执行if (isShutdown())reject(task);else {//将任务放入到DelayedWorkQueue中super.getQueue().add(task);//再次判断线程池有没有关闭if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))ask.cancel(false);else//判断当前线程池是否少于核心线程数,是则调用ensurePrestart();}}
判断是否需要创建线程,接着调用熟悉的addWorker()方法,创建线程并开始执行任务(这里并没有传任务进去,线程直接从阻塞队列里面拿)
void ensurePrestart() {int wc = workerCountOf(ctl.get());//如果线程数小于corePoolSizeif (wc < corePoolSize)addWorker(null, true);//线程数为0else if (wc == 0)addWorker(null, false);}
scheduleAtFixedRate() 周期任务
大体逻辑与上面一样,把SchduledFutureTask的outerTask属性的赋值为新创建的任务,后续实现周期性执行,并且为period属性赋值为参数传入值
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, //初始延迟时间long period, //间隔时间TimeUnit unit) { //时间单位if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));//交给用户自己拓展,返回上面创建的ScheduledFutureTask对象RunnableScheduledFuture<Void> t = decorateTask(command, sft);//将outerTask属性赋值给上面创建的任务,实现周期性执行sft.outerTask = t;delayedExecute(t);return t;}
scheduleWithFixedDelay() 周期延时任务
逻辑上与周期任务一样,不过把period属性设置为参数传入值的相反数(即为一个负数)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,//初始延时时间long delay, //周期延时时间TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay)); //把period值设为传入的delay参数的相反数RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}
任务执行
当线程创建完成后,就会从阻塞队列里获取任务,调用ScheduledFutureTask的run()开始执行任务。
run()
如果是普通延时任务,直接执行然后退出,否则需要将outerTask属性的任务重新放入到阻塞队列,等待下一次周期执行
public void run() {//判断period是否不为0boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);//如果period = 0,则是普通延时任务,直接执行else if (!periodic)ScheduledFutureTask.super.run();//如果period != 0,则在任务执行完之后重新把outerTask属性中的任务重新放到阻塞队列中else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}}
setNextRunTime()
设置下次执行时间
- period > 0,说明是周期任务,下次任务执行时间为 time += period
- period < 0,说明是周期延时任务,下次任务执行时间为 time = now() + period(即 当上一个任务执行完之后等待period时间后再执行)
private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}
reExecutePeriodic()
重新将任务加入到阻塞队列
void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {//重新将任务加入到阻塞队列super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);elseensurePrestart();}}
Timer与ScheduedThreadPoolExecutor
除了定时线程池能够执行定时任务外,JDK中util包下的Timer类也有类似功能,但阿里规范中并不推荐使用该类定制定时任务
执行下面代码,当抛出一个异常会直接报错
原因是Timer是单线程模式,在创建Timer的时候自动创建了一条线程,当执行任务出现异常没有进行异常处理,该线程直接退出
private final TimerThread thread = new TimerThread(queue);
线程池中,在执行 processWorkerExit() 对线程进行销毁中,会对任务执行中是否出现异常进行判断处理,如果出现了异常会重新调用 addWorker() 方法,重新往线程池中添加新的线程
上面代码中,虽然没有报错,但线程却停止执行了,虽然线程池可以对线程数进行控制,但是直接抛出异常,导致任务丢失了,所以在任务中一定要对异常进行try/catch处理,否则会造成任务丢失。
