ScheduledExecutorService 继承图

image.png

ScheduledExecutorService的使用

  1. public static void main(String[] args) {
  2. ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
  3. // 单次执行
  4. executor.schedule(()-> {
  5. logger.info("delay 1000");
  6. }, 2, TimeUnit.SECONDS);
  7. // 循环执行
  8. executor.scheduleAtFixedRate(() -> {
  9. try {
  10. Thread.sleep(3000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. logger.info("scheduleAtFixedRate");
  15. }, 1, 2, TimeUnit.SECONDS);
  16. // 循环执行
  17. executor.scheduleWithFixedDelay(() -> {
  18. try {
  19. Thread.sleep(3000);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. logger.info("scheduleWithFixedDelay");
  24. }, 1, 2, TimeUnit.SECONDS);
  25. }

scheduleAtFixedRatescheduleWithFixedDelay 的区别
如上所示,假设一个task的执行周期过长(例3秒),scheduleAtFixedRate 会在(period)之后,将Task塞到等待队列中,而 scheduleWithFixedDelay 则是在执行完成之后,等待(delay)秒之后,在将Task塞到等待队列中

  1. // scheduleAtFixedRate 的输出,从打印时间可以看出,
  2. // 周期为3秒(即任务的执行task,因为period小于执行周期,已经塞到了队列中等待执行)
  3. 八月 31, 2020 2:20:38 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  4. 信息: scheduleAtFixedRate
  5. 八月 31, 2020 2:20:41 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  6. 信息: scheduleAtFixedRate
  7. 八月 31, 2020 2:20:44 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  8. 信息: scheduleAtFixedRate
  9. 八月 31, 2020 2:20:47 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  10. 信息: scheduleAtFixedRate
  11. 八月 31, 2020 2:20:50 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  12. 信息: scheduleAtFixedRate
  13. // scheduleWithFixedDelay的输出,从打印时间可以看出
  14. // 周期为5秒(任务的执行周期3s+delay2s),即等任务执行完成之后,经过delay之后,再执行下一个task
  15. 八月 31, 2020 2:22:05 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  16. 信息: scheduleWithFixedDelay
  17. 八月 31, 2020 2:22:10 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  18. 信息: scheduleWithFixedDelay
  19. 八月 31, 2020 2:22:15 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  20. 信息: scheduleWithFixedDelay
  21. 八月 31, 2020 2:22:20 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  22. 信息: scheduleWithFixedDelay
  23. 八月 31, 2020 2:22:25 下午 cn.spectrumrpc.juc.ScheduledExecutorTests lambda$main$0
  24. 信息: scheduleWithFixedDelay


ScheduledExecutorService 源码分析

先看看 ScheduledFutureTask的组成

ScheduledExecutorService内部将Runnable封装成 ScheduledFutureTask

重要属性

  1. // 任务的序列号
  2. private final long sequenceNumber;
  3. // 延时的时间
  4. private long time;
  5. // 任务的周期
  6. private final long period;
  7. // 执行的任务
  8. RunnableScheduledFuture<V> outerTask = this;
  9. // 堆顶的下标
  10. int heapIndex;

ScheduledFutureTask 的排序规则

  1. public int compareTo(Delayed other) {
  2. if (other == this) // compare zero if same object
  3. return 0;
  4. if (other instanceof ScheduledFutureTask) {
  5. ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
  6. long diff = time - x.time;
  7. // 先比较任务的时间
  8. if (diff < 0)
  9. return -1;
  10. else if (diff > 0)
  11. return 1;
  12. // 任务的时间比较完成之后,再比较任务的序列号
  13. else if (sequenceNumber < x.sequenceNumber)
  14. return -1;
  15. else
  16. return 1;
  17. }
  18. long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
  19. return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  20. }

从构造函数开始

调用父类的构造方法

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

因为 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor 即调用 ThreadPoolExecutor 的构造方法

schedule() 方法

  1. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  2. // 校验参数
  3. if (command == null || unit == null)
  4. throw new NullPointerException();
  5. //1. 将 command(Runnable)封装成ScheduledFutureTask
  6. //2. decorateTask() 直接返回第二个参数,没啥用
  7. RunnableScheduledFuture<?> t = decorateTask(command,
  8. new ScheduledFutureTask<Void>(command, null,
  9. triggerTime(delay, unit)));
  10. //3. 延时执行
  11. delayedExecute(t);
  12. return t;
  13. }

delayedExecute()

  1. private void delayedExecute(RunnableScheduledFuture<?> task) {
  2. // 判断当前的线程池状态是否是shutdown的,如果是,采取拒绝策略,直接拒绝
  3. if (isShutdown())
  4. reject(task);
  5. else {
  6. // 否则,线程池状态良好,获取workQueue,将task加入到workQueue中
  7. super.getQueue().add(task);
  8. // 因为有上下文的切换,所以,这里再判断一次线程池的状态
  9. // 如果线程池已经shutdown,或者,在当前的runState下不能执行入任务的话,就cancel取消人物
  10. if (isShutdown() &&
  11. // 因为是 schedule方法,period =0,所以 task.isPeriodic() = false
  12. // 执行 isRunningOrShutdown(true),
  13. // rs == RUNNING || (rs == SHUTDOWN && shutdownOK)
  14. // 所以,在shutdown状态的话,schedule可以执行,此判断不成立,执行ensurePrestart
  15. !canRunInCurrentRunState(task.isPeriodic()) &&
  16. // 否则,其他状态下,将任务移除
  17. remove(task))
  18. // 并取消future
  19. task.cancel(false);
  20. else
  21. ensurePrestart();
  22. }
  23. }

ensurePrestart()

  1. void ensurePrestart() {
  2. // 获取当前的workerCount
  3. int wc = workerCountOf(ctl.get());
  4. // 如果小于核心线程数,addWroker创建线程执行任务
  5. if (wc < corePoolSize)
  6. addWorker(null, true);
  7. // 如果workerCount == 0,说明是第一次
  8. else if (wc == 0)
  9. addWorker(null, false);
  10. }

添加完worker之后,会执行runWorker方法,循环从workQueue中取出task,执行这个task


scheduleAtFixedRate()

  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  2. long initialDelay,
  3. long period,
  4. TimeUnit unit) {
  5. // 校验参数
  6. if (command == null || unit == null)
  7. throw new NullPointerException();
  8. // 校验参数
  9. if (period <= 0)
  10. throw new IllegalArgumentException();
  11. // 将runnable封装成 ScheduledFutureTask
  12. ScheduledFutureTask<Void> sft =
  13. new ScheduledFutureTask<Void>(command,
  14. null,
  15. triggerTime(initialDelay, unit),
  16. unit.toNanos(period));
  17. // 直接返回 sft,啥都没干
  18. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  19. // 将sft的 outerTask 设置为 t(自己设置自己)
  20. sft.outerTask = t;
  21. // 延时执行
  22. delayedExecute(t);
  23. return t;
  24. }

delayedExecute()

  1. private void delayedExecute(RunnableScheduledFuture<?> task) {
  2. // 判断当前的线程池状态是否是shutdown的,如果是,采取拒绝策略,直接拒绝
  3. if (isShutdown())
  4. reject(task);
  5. else {
  6. // 否则,线程池状态良好,获取workQueue,将task加入到workQueue中
  7. super.getQueue().add(task);
  8. // 因为有上下文的切换,所以,这里再判断一次线程池的状态
  9. // 如果线程池已经shutdown,或者,在当前的runState下不能执行入任务的话,就cancel取消人物
  10. if (isShutdown() &&
  11. // 因为是 scheduleAtFixedRate,此时的period>0,所以 task.isPeriodic() == true
  12. // 此时执行的就是isRunningOrShutdown(false)
  13. // rs == RUNNING || (rs == SHUTDOWN && shutdownOK) 这个判断就再也不成立了
  14. // 即返回false,因为在shutdown状态了,已经不能再提交新的task,这是个周期任务,已经
  15. // 无法执行了,所以在scheduleAtFixedRate中,线程池shutdown下,将会remove这个任务
  16. // 并cancel掉
  17. !canRunInCurrentRunState(task.isPeriodic()) &&
  18. remove(task))
  19. task.cancel(false);
  20. else
  21. // 否则,线程池状态良好,执行 ensurePrestart,创建worker,执行task
  22. ensurePrestart();
  23. }
  24. }

scheduleWithFixedDelay()

  1. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  2. long initialDelay,
  3. long delay,
  4. TimeUnit unit) {
  5. // 校验参数
  6. if (command == null || unit == null)
  7. throw new NullPointerException();
  8. // 校验参数
  9. if (delay <= 0)
  10. throw new IllegalArgumentException();
  11. // 封装 ScheduledFutureTask
  12. ScheduledFutureTask<Void> sft =
  13. new ScheduledFutureTask<Void>(command,
  14. null,
  15. triggerTime(initialDelay, unit),
  16. unit.toNanos(-delay));
  17. // 返回sft
  18. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  19. // 将outerTask指向自己
  20. sft.outerTask = t;
  21. // 延时执行
  22. delayedExecute(t);
  23. return t;
  24. }

delayedExecute() 和 scheduleAtFixedRate() 中的逻辑一样,只不过,period 变成了负数


添加完Worker之后,执行Task,执行的是 ScheduledFutureTask 这个Task中的 run() 方法

ScheduledFutureTask#run

  1. public void run() {
  2. // 获取当前的Task是否是循环的Task
  3. // 在schedule中为false,其他均为true
  4. boolean periodic = isPeriodic();
  5. // 如果是 scheduleAtFixedRate/scheduleWithFixedDelay
  6. // 且线程池 shutdown 的情况下,这个if成立,取消这个Future
  7. if (!canRunInCurrentRunState(periodic))
  8. cancel(false);
  9. // 否则的话,就不是周期性任务,调用父类的run方法,执行任务(schedule的逻辑)
  10. else if (!periodic)
  11. ScheduledFutureTask.super.run();
  12. // 否则的话,就是周期性任务,调用父类的 runAndReset方法,执行任务
  13. // (scheduleAtFixedRate/scheduleWithFixedDelay) 的逻辑
  14. else if (ScheduledFutureTask.super.runAndReset()) {
  15. setNextRunTime();
  16. reExecutePeriodic(outerTask);
  17. }
  18. }

cancel(false)

  1. public boolean cancel(boolean mayInterruptIfRunning) {
  2. boolean cancelled = super.cancel(mayInterruptIfRunning);
  3. if (cancelled && removeOnCancel && heapIndex >= 0)
  4. remove(this);
  5. return cancelled;
  6. }

setNextRunTime()

  1. private void setNextRunTime() {
  2. long p = period;
  3. if (p > 0)
  4. time += p;
  5. else
  6. time = triggerTime(-p);
  7. }

reExecutePeriodic(outerTask)

  1. void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  2. // 判断当前的线程池状态是否已经跑
  3. // 因为跑到了这个函数,必然是一个周期性的任务,不存在不是周期性的任务
  4. // canRunInCurrentRunState(true),在线程池shutdown的时候就是false,再也不能执行
  5. // 正常情况下(running状态),则会将task重新加入workQueue中等下下一次的执行
  6. if (canRunInCurrentRunState(true)) {
  7. super.getQueue().add(task);
  8. if (!canRunInCurrentRunState(true) && remove(task))
  9. task.cancel(false);
  10. else
  11. ensurePrestart();
  12. }
  13. }