线程池的具体实现有两种,分别是ThreadPoolExecutor 和ScheduledThreadPoolExecutor 普通线程池和定时线程池,上一篇已经分析过ThreadPoolExecutor原理与使用了,本篇我们来重点分析下ScheduledThreadPoolExecutor。

ScheduledThreadPoolExecutor

微信截图_20210711162317.png
从ScheduledThreadPoolExecutor的类图可以看出,其继承了ThreadPoolExecutor,并在这基础上做了功能的拓展和封装。

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
  3. }
  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }

从上面可以看出,其构造方法和普通线程池的方法一样,只不过定时线程池运用内部封装的DelayedWorkQueue作为阻塞队列,以ScheduledFutureTask类型任务作为线程池调度的最小单位。

DelayedWorkQueue

微信截图_20210711164153.png
DelayedWorkQueue也是一种设计为定时任务的延迟队列,它的实现和DelayQueue一样,不过是将优先级队列和DelayQueue的实现过程迁移到本身方法体中,从而可以在该过程当中灵活的加入定时任务特有的方法调用
1. DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若 time相同则根据sequenceNumber排序;
2. DelayQueue也是一个无界队列

更多内容可以可以参考之前的文章《阻塞队列-DelayedQueue》

ScheduledFutureTask

微信截图_20210711172457.png
ScheduledFutureTask继承了FutureTask,可以通过调用get()方法获取任务执行的结果。并且实现了Comparable接口,通过比较时间大小实现优先级排序

SchduledFutureTask主要接收的参数

  1. private long time; //任务开始的时间
  2. private final long sequenceNumber; //任务的序号,任务添加到ScheduledThreadPoolExecutor中会分配唯一的序列号
  3. private final long period; //任务执行的时间间隔
  4. RunnableScheduledFuture<V> outerTask = this //下次执行的任务,该属性是实现周期执行的关键
  1. ScheduledFutureTask(Runnable r, V result, long ns) {
  2. super(r, result);
  3. this.time = ns;
  4. this.period = 0;
  5. this.sequenceNumber = sequencer.getAndIncrement();
  6. }
  7. -------------------
  8. ScheduledFutureTask(Runnable r, V result, long ns, long period) {
  9. super(r, result);
  10. this.time = ns;
  11. this.period = period;
  12. this.sequenceNumber = sequencer.getAndIncrement();
  13. }

SchduledFutureTask的排序实现

  1. 首先按照time排序,time小的排在前面,time大的排在后面;
  2. 如果time相同,按照sequenceNumber排序,sequenceNumber小的排在前面,sequenceNumber大的排在后面,换句话说,如果两个task的执行时间相同, 优先执行先提交的task。
    1. public int compareTo(Delayed other) {
    2. if (other == this) // compare zero if same object
    3. return 0;
    4. if (other instanceof ScheduledFutureTask) {
    5. ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
    6. long diff = time - x.time;
    7. if (diff < 0)
    8. return -1;
    9. else if (diff > 0)
    10. return 1;
    11. else if (sequenceNumber < x.sequenceNumber)
    12. return -1;
    13. else
    14. return 1;
    15. }
    16. long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    17. return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    18. }

SchduledFutureTask计算任务开始执行时间
当前时间加上延迟时间,其中如果delay小于 Long.MAX_VALUE/2,则直接返回delay,否则需要处理溢出的情况。

  1. long triggerTime(long delay) {
  2. return now() +
  3. ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
  4. }

工作原理

微信截图_20210711224309.png

任务提交

SchduledFutureTask提交任务的方式有三种

schedule():任务在指定延迟时间到达后触发,只会执行一次
scheduledAtFixedRate():固定周期执行任务(不管上一个任务有没有执行完,只要时间周期到了就将任务放到阻塞队列中)
scheduledWithFixedDelay():固定延时执行任务(与任务执行时间有关,在上一个任务执行完后延时指定时间,再开始执行下一个任务)

schedule() 普通延时任务

调用decorateTask()进行包装,该方法是留给用户去扩展的,默认是个空方法 ,返回的是参数中new出来的SchduledFutureTask对象

  1. public ScheduledFuture<?> schedule(Runnable command, long delay,
  2. TimeUnit unit) {
  3. if (command == null || unit == null)
  4. throw new NullPointerException();
  5. //decorateTask()方法默认是一个空方法,留给用户自己拓展,最后返回的是参数中new出来的RunnableScheduledFuture对象
  6. RunnableScheduledFuture<?> t = decorateTask(command,
  7. new ScheduledFutureTask<Void>(command, null,
  8. triggerTime(delay, unit)));
  9. delayedExecute(t);
  10. return t;
  11. }

接着提交任务,这里第一次任务提交不同与普通线程池(直接创建线程执行),而是把任务直接放到阻塞队列DelayedWorkQueue中

  1. private void delayedExecute(RunnableScheduledFuture<?> task) {
  2. //如果线程池已关闭,拒绝执行
  3. if (isShutdown())
  4. reject(task);
  5. else {
  6. //将任务放入到DelayedWorkQueue中
  7. super.getQueue().add(task);
  8. //再次判断线程池有没有关闭
  9. if (isShutdown() &&
  10. !canRunInCurrentRunState(task.isPeriodic()) &&
  11. remove(task))
  12. ask.cancel(false);
  13. else
  14. //判断当前线程池是否少于核心线程数,是则调用
  15. ensurePrestart();
  16. }
  17. }

判断是否需要创建线程,接着调用熟悉的addWorker()方法,创建线程并开始执行任务(这里并没有传任务进去,线程直接从阻塞队列里面拿)

  1. void ensurePrestart() {
  2. int wc = workerCountOf(ctl.get());
  3. //如果线程数小于corePoolSize
  4. if (wc < corePoolSize)
  5. addWorker(null, true);
  6. //线程数为0
  7. else if (wc == 0)
  8. addWorker(null, false);
  9. }

scheduleAtFixedRate() 周期任务

大体逻辑与上面一样,把SchduledFutureTask的outerTask属性的赋值为新创建的任务,后续实现周期性执行,并且为period属性赋值为参数传入值

  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  2. long initialDelay, //初始延迟时间
  3. long period, //间隔时间
  4. TimeUnit unit) { //时间单位
  5. if (command == null || unit == null)
  6. throw new NullPointerException();
  7. if (period <= 0)
  8. throw new IllegalArgumentException();
  9. ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,
  10. null,
  11. triggerTime(initialDelay, unit),
  12. unit.toNanos(period));
  13. //交给用户自己拓展,返回上面创建的ScheduledFutureTask对象
  14. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  15. //将outerTask属性赋值给上面创建的任务,实现周期性执行
  16. sft.outerTask = t;
  17. delayedExecute(t);
  18. return t;
  19. }

scheduleWithFixedDelay() 周期延时任务

逻辑上与周期任务一样,不过把period属性设置为参数传入值的相反数(即为一个负数)

  1. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  2. long initialDelay,//初始延时时间
  3. long delay, //周期延时时间
  4. TimeUnit unit) {
  5. if (command == null || unit == null)
  6. throw new NullPointerException();
  7. if (delay <= 0)
  8. throw new IllegalArgumentException();
  9. ScheduledFutureTask<Void> sft =
  10. new ScheduledFutureTask<Void>(command,
  11. null,
  12. triggerTime(initialDelay, unit),
  13. unit.toNanos(-delay)); //把period值设为传入的delay参数的相反数
  14. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  15. sft.outerTask = t;
  16. delayedExecute(t);
  17. return t;
  18. }

任务执行

当线程创建完成后,就会从阻塞队列里获取任务,调用ScheduledFutureTask的run()开始执行任务。

run()

如果是普通延时任务,直接执行然后退出,否则需要将outerTask属性的任务重新放入到阻塞队列,等待下一次周期执行

  1. public void run() {
  2. //判断period是否不为0
  3. boolean periodic = isPeriodic();
  4. if (!canRunInCurrentRunState(periodic))
  5. cancel(false);
  6. //如果period = 0,则是普通延时任务,直接执行
  7. else if (!periodic)
  8. ScheduledFutureTask.super.run();
  9. //如果period != 0,则在任务执行完之后重新把outerTask属性中的任务重新放到阻塞队列中
  10. else if (ScheduledFutureTask.super.runAndReset()) {
  11. setNextRunTime();
  12. reExecutePeriodic(outerTask);
  13. }
  14. }
  15. }

setNextRunTime()

设置下次执行时间

  1. period > 0,说明是周期任务,下次任务执行时间为 time += period
  2. period < 0,说明是周期延时任务,下次任务执行时间为 time = now() + period(即 当上一个任务执行完之后等待period时间后再执行)
    1. private void setNextRunTime() {
    2. long p = period;
    3. if (p > 0)
    4. time += p;
    5. else
    6. time = triggerTime(-p);
    7. }

reExecutePeriodic()

重新将任务加入到阻塞队列

  1. void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  2. if (canRunInCurrentRunState(true)) {
  3. //重新将任务加入到阻塞队列
  4. super.getQueue().add(task);
  5. if (!canRunInCurrentRunState(true) && remove(task))
  6. task.cancel(false);
  7. else
  8. ensurePrestart();
  9. }
  10. }

Timer与ScheduedThreadPoolExecutor

除了定时线程池能够执行定时任务外,JDK中util包下的Timer类也有类似功能,但阿里规范中并不推荐使用该类定制定时任务
微信截图_20210711225721.png
执行下面代码,当抛出一个异常会直接报错
微信截图_20210711230759.png

原因是Timer是单线程模式,在创建Timer的时候自动创建了一条线程,当执行任务出现异常没有进行异常处理,该线程直接退出

  1. private final TimerThread thread = new TimerThread(queue);

线程池中,在执行 processWorkerExit() 对线程进行销毁中,会对任务执行中是否出现异常进行判断处理,如果出现了异常会重新调用 addWorker() 方法,重新往线程池中添加新的线程
微信截图_20210711232235.png
上面代码中,虽然没有报错,但线程却停止执行了,虽然线程池可以对线程数进行控制,但是直接抛出异常,导致任务丢失了,所以在任务中一定要对异常进行try/catch处理,否则会造成任务丢失