1、ScheduledThreadPoolExecutor类图

ScheduledThreadPoolExecutor - 图1
从类图可以看出ScheduledThreadPoolExecutor类继承了ThreadPoolExector类,就是说ScheduledThreadPoolExecutor拥有execute()和submit()提交异步任务的基础功能,同时实现了一个新接口:ScheduledExecutorService,ScheduledExecutorService接口继承的是ExecutorService接口,下面我们看看ScheduledExecutorService接口里有哪些方法:
ScheduledThreadPoolExecutor - 图2
其中schedule方法是用来执行线程任务的,接收Runnable类型和Callable类型的任务;scheduleAtFixRate和scheduleWithFixedDelay方法是用来执行周期性任务和周期性延迟任务的,具体在下面会介绍。

2、ScheduledThreadPoolExecutor类详细介绍

2.1 构造函数

源码如下:

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. };
  5. public ScheduledThreadPoolExecutor(int corePoolSize,
  6. ThreadFactory threadFactory) {
  7. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  8. new DelayedWorkQueue(), threadFactory);
  9. };
  10. public ScheduledThreadPoolExecutor(int corePoolSize,
  11. RejectedExecutionHandler handler) {
  12. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  13. new DelayedWorkQueue(), handler);
  14. };
  15. public ScheduledThreadPoolExecutor(int corePoolSize,
  16. ThreadFactory threadFactory,
  17. RejectedExecutionHandler handler) {
  18. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  19. new DelayedWorkQueue(), threadFactory, handler);
  20. }

可以看出ScheduledThreadPoolExecutor类的构造函数都是用的父类的构造函数,即ThreadPoolExecutor类的构造函数,且传入的参数都很固定:corePoolSize、maxPoolSize(Integer.MAX_VALUE)、keepAliveTime(0),阻塞队列(DelayedWorkQueue),关于这个DelayedWorkQueue下面介绍。
初始化ScheduledThreadPoolExecutor类除了用其构造函数外,还可以使用Excutors类提供的工厂方法:

  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  2. return new DelegatedScheduledExecutorService
  3. (new ScheduledThreadPoolExecutor(1));
  4. }
  5. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
  6. return new DelegatedScheduledExecutorService
  7. (new ScheduledThreadPoolExecutor(1, threadFactory));
  8. }
  9. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  10. return new ScheduledThreadPoolExecutor(corePoolSize);
  11. }
  12. public static ScheduledExecutorService newScheduledThreadPool(
  13. int corePoolSize, ThreadFactory threadFactory) {
  14. return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
  15. }

提供了初始化两种类型的周期性线程池,只是初始化参数不同:

  • ScheduledThreadPoolExecutor:可以执行并行任务也就是多条线程同时执行
  • SingleThreadScheduledExecutor:可以执行单条线程

其底层都是用的ScheduledThreadPoolExecutor类的构造方法实现的,而ScheduledThreadPoolExecutor类的构造方法是用的ThreadPoolExecutor类的构造方法。

2.2 特有方法

  1. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
  2. public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
  3. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit);
  4. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

(1) ScheduledFuture<?>schedule(Runnable command,long delay, TimeUnit unit)
该方法是执行延迟任务,即达到给定的延迟时间:delay后才执行线程任务(仅执行一次),且接收的是实现Runnable接口的类的实例。
(2) ScheduledFutureschedule(Callable callable,long delay, TimeUnit unit)
该方法也是执行延迟任务(仅执行一次),与上面方法唯一不同的是接收的线程任务的类型不同,该方法接收的是实现了Callabel接口的类的实例。
(3)ScheduledFuture<?>scheduleAtFixedRate(Runnable command,long initialDelay,long period, TimeUnit unit)
该方法是在固定延迟后,周期性的执行线程池任务,入参中initialDelay是固定延迟,超过这个时间后,线程池开始执行任务;period是执行周期,准确地说是上一次开始执行任务的时刻到下一次开始执行任务的时刻中间的时间间隔,当时间间隔大于任务执行时间时,时间间隔还是以period为准,当时间间隔小于任务执行时间时,时间间隔是以任务执行时间为准。
(4)ScheduledFuture<?>scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit)
该方法是在固定延迟后,周期性延迟执行线程池任务,入参中initialDelay是固定延迟,超过这个时间后,线程池开始执行任务;delay是周期性延迟的时间,准确地说是上一次任务结束的时刻到下一次任务开始的时刻中间的时间间隔;与scheduleAtFixedRate不同的是,当delay大于或者小于任务执行时间时,延迟间隔还是按delay走,不像scheduleAtFixedRate一样周期会改变。
有关(3)和(4)的区分可以参考链接2。

2.3 两个内部类:DelayedWorkQueue和ScheduledFutureTask

1)DelayedWorkQueue
DelayedWorkQueue是ScheduledThreadPoolExecutor线程池使用的阻塞队列,DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,具体我没仔细看…
(2)ScheduledFutureTask
ScheduledThreadPoolExecutor类的schedule方法的源码如下:

  1. public ScheduledFuture<?> schedule(Runnable command,
  2. long delay,
  3. TimeUnit unit) {
  4. if (command == null || unit == null)
  5. throw new NullPointerException();
  6. RunnableScheduledFuture<?> t = decorateTask(command,
  7. new ScheduledFutureTask<Void>(command, null,
  8. triggerTime(delay, unit)));
  9. delayedExecute(t);
  10. return t;
  11. }

在schedule方法里,会通过decorateTask方法将传入的Runnable类型的任务转化成ScheduledFutureTask任务,ScheduledFutureTask继承了FutureTask并实现了RunnableScheduledFuture接口,为了保证ScheduledThreadPoolExecutor能够延时执行任务以及能够周期性执行任务,ScheduledFutureTask重写了run方法,源码如下:

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        //如果不是周期性执行任务,则直接调用run方法
        ScheduledFutureTask.super.run();
        //如果是周期性执行任务的话,需要重设下一次执行任务的时间
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }

这块我没细看。

3、使用举例

package ThreadExecutorPool;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
/**
 * @ClassName ScheduledThreadPoolExecutorDemo
 * @Description TODO
 * @Auther Jerry
 * @Date 2020/3/24 - 23:49
 * @Version 1.0
 */
public class ScheduledThreadPoolExecutorDemo {
    public static void main(String[] args) {
        // 工厂方法初始化周期性线程池
        ScheduledExecutorService threadPool1 = Executors.newScheduledThreadPool(5);
        // 初始化延迟任务
        DelayTask delayTask = new DelayTask();
        SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss z");
        Date date = new Date(System.currentTimeMillis());
        System.out.println(Thread.currentThread().getName() + "当前时刻:" + formatter.format(date));
        ScheduledFuture <Integer> result = threadPool1.schedule(delayTask, 5, TimeUnit.SECONDS);
        try {
            System.out.println(Thread.currentThread().getName() + " 执行结果:" + result.get());
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        catch (ExecutionException e)
        {
            e.printStackTrace();
        }
        // 工厂方法初始化周期性线程池
        ScheduledExecutorService threadPool2 = Executors.newSingleThreadScheduledExecutor();
        // 初始话周期性任务
        PeriodTask periodTask = new PeriodTask();
        threadPool2.scheduleAtFixedRate(periodTask, 0, 2, TimeUnit.SECONDS);
    }
    static class DelayTask implements Callable<Integer> {
        int sum = 0;
        @Override
        public Integer call() throws Exception {
            SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss z");
            Date date = new Date(System.currentTimeMillis());
            System.out.println(Thread.currentThread().getName() + "延迟后开始执行任务的时刻:" + formatter.format(date));
            for (int i = 0; i < 100; ++i) {
                sum += i;
            }
            return sum;
        }
    }
    static class PeriodTask implements Runnable{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "我要离职");
        }
    }
}

执行结果如下:

main当前时刻:2020-03-27 at 00:58:41 CST
pool-1-thread-1延迟后开始执行任务的时刻:2020-03-27 at 00:58:46 CST
main 执行结果:4950
pool-2-thread-1我要离职
pool-2-thread-1我要离职
pool-2-thread-1我要离职
pool-2-thread-1我要离职
pool-2-thread-1我要离职
pool-2-thread-1我要离职
...

参考

线程池之ScheduledThreadPoolExecutor
ScheduledExecutorService的scheduleAtFixedRate和scheduleWithFixedDelay方法的区别