1 简介

通过 ThreadPoolExecutor 可以实现各式各样的自定义线程池,而 ScheduledThreadPoolExecutor 类则在自定义线程池的基础上增加了周期性执行任务的功能。

Java 定时任务可以用Timer + TimerTask来做,或者使用ScheduledExecutorService,使用ScheduledExecutorService有两个好处:
1. 如果任务执行时间过长,TimerTask会出现延迟执行的情况。比如,第一任务在1000ms执行了4000ms,第二个任务定时在2000ms开始执行。这里由于第一个任务要执行4000,所以第二个任务实际在5000ms开始执行。这是由于Timer是单线程,且顺序执行提交的任务
2. 如果执行任务抛出异常,Timer是不会执行会后面的任务的
使用ScheduledExecutorService可以避免上面两种情况,因为ScheduledExecutorService是线程池,有多个线程执行。

1.1 ScheduledThreadPoolExecutor 示例:

  1. package com.example.springdemo.com.threadDemo.scheduled;
  2. import java.time.LocalDateTime;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.ScheduledExecutorService;
  5. import java.util.concurrent.TimeUnit;
  6. /**
  7. *
  8. * @description:
  9. * @author: zhangqing
  10. * @date: 2021-08-24
  11. */
  12. public class ScheduledThreadPoolExecutorTest {
  13. public static void main(String[] args) throws InterruptedException {
  14. ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
  15. /**命令–要执行的任务
  16. initialDelay–延迟第一次执行的时间
  17. 延迟–一次执行终止与下一次执行开始之间的延迟
  18. unit–initialDelay和delay参数的时间单位
  19. */
  20. service.scheduleWithFixedDelay(new ScheduledThread("scheduleWithFixedDelay工作"), 3, 5, TimeUnit.SECONDS);
  21. Thread.sleep(20000);
  22. service.shutdown();
  23. service = Executors.newScheduledThreadPool(3);
  24. /**
  25. * scheduleAtFixedRate 中的 delayTime/period 表示从线程池中首先开始执行的线程算起,假设period为1s,
  26. * 若线程执行了5s,那么下一个线程在第一个线程运行完后会很快被执行。
  27. */
  28. service.scheduleAtFixedRate(new ScheduledThread("scheduleAtFixedRate工作"), 3, 5, TimeUnit.SECONDS);
  29. Thread.sleep(20000);
  30. service.shutdown();
  31. }
  32. }
  33. class ScheduledThread implements Runnable {
  34. private String work;
  35. public ScheduledThread(String work) {
  36. this.work = work;
  37. }
  38. @Override
  39. public void run() {
  40. try {
  41. System.out.println(Thread.currentThread().getName() + " ;start time" + LocalDateTime.now());
  42. System.out.println(work);
  43. Thread.sleep(8000);
  44. System.out.println(Thread.currentThread().getName() + " ;end time" + LocalDateTime.now());
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }

输出结果:

  1. "C:\Program Files\Java\jdk1.8.0_291\bin\java.exe" -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:52098,suspend=y,server=n -javaagent:C:\Users\zhangqing\AppData\Local\JetBrains\IntelliJIdea2021.1\captureAgent\debugger-agent.jar -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\rt.jar;E:\demo\spring\target\classes;D:\mvnRespo\org\springframework\boot\spring-boot-starter-web\2.5.3\spring-boot-starter-web-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter\2.5.3\spring-boot-starter-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot\2.5.3\spring-boot-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot-autoconfigure\2.5.3\spring-boot-autoconfigure-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter-logging\2.5.3\spring-boot-starter-logging-2.5.3.jar;D:\mvnRespo\ch\qos\logback\logback-classic\1.2.4\logback-classic-1.2.4.jar;D:\mvnRespo\ch\qos\logback\logback-core\1.2.4\logback-core-1.2.4.jar;D:\mvnRespo\org\apache\logging\log4j\log4j-to-slf4j\2.14.1\log4j-to-slf4j-2.14.1.jar;D:\mvnRespo\org\apache\logging\log4j\log4j-api\2.14.1\log4j-api-2.14.1.jar;D:\mvnRespo\org\slf4j\jul-to-slf4j\1.7.32\jul-to-slf4j-1.7.32.jar;D:\mvnRespo\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\mvnRespo\org\yaml\snakeyaml\1.28\snakeyaml-1.28.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter-json\2.5.3\spring-boot-starter-json-2.5.3.jar;D:\mvnRespo\com\fasterxml\jackson\core\jackson-databind\2.12.4\jackson-databind-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\core\jackson-annotations\2.12.4\jackson-annotations-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\core\jackson-core\2.12.4\jackson-core-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.12.4\jackson-datatype-jdk8-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.12.4\jackson-datatype-jsr310-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\module\jackson-module-parameter-names\2.12.4\jackson-module-parameter-names-2.12.4.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter-tomcat\2.5.3\spring-boot-starter-tomcat-2.5.3.jar;D:\mvnRespo\org\apache\tomcat\embed\tomcat-embed-core\9.0.50\tomcat-embed-core-9.0.50.jar;D:\mvnRespo\org\apache\tomcat\embed\tomcat-embed-el\9.0.50\tomcat-embed-el-9.0.50.jar;D:\mvnRespo\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.50\tomcat-embed-websocket-9.0.50.jar;D:\mvnRespo\org\springframework\spring-web\5.3.9\spring-web-5.3.9.jar;D:\mvnRespo\org\springframework\spring-beans\5.3.9\spring-beans-5.3.9.jar;D:\mvnRespo\org\springframework\spring-webmvc\5.3.9\spring-webmvc-5.3.9.jar;D:\mvnRespo\org\springframework\spring-aop\5.3.9\spring-aop-5.3.9.jar;D:\mvnRespo\org\springframework\spring-context\5.3.9\spring-context-5.3.9.jar;D:\mvnRespo\org\springframework\spring-expression\5.3.9\spring-expression-5.3.9.jar;D:\mvnRespo\com\google\guava\guava\28.1-jre\guava-28.1-jre.jar;D:\mvnRespo\com\google\guava\failureaccess\1.0.1\failureaccess-1.0.1.jar;D:\mvnRespo\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;D:\mvnRespo\com\google\code\findbugs\jsr305\3.0.2\jsr305-3.0.2.jar;D:\mvnRespo\org\checkerframework\checker-qual\2.8.1\checker-qual-2.8.1.jar;D:\mvnRespo\com\google\errorprone\error_prone_annotations\2.3.2\error_prone_annotations-2.3.2.jar;D:\mvnRespo\com\google\j2objc\j2objc-annotations\1.3\j2objc-annotations-1.3.jar;D:\mvnRespo\org\codehaus\mojo\animal-sniffer-annotations\1.18\animal-sniffer-annotations-1.18.jar;D:\mvnRespo\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;D:\mvnRespo\org\springframework\spring-core\5.3.9\spring-core-5.3.9.jar;D:\mvnRespo\org\springframework\spring-jcl\5.3.9\spring-jcl-5.3.9.jar;D:\mvnRespo\org\springframework\spring-aspects\5.2.7.RELEASE\spring-aspects-5.2.7.RELEASE.jar;D:\mvnRespo\org\aspectj\aspectjtools\1.9.5\aspectjtools-1.9.5.jar;D:\mvnRespo\aopalliance\aopalliance\1.0\aopalliance-1.0.jar;D:\mvnRespo\org\aspectj\aspectjweaver\1.9.0\aspectjweaver-1.9.0.jar;D:\mvnRespo\cglib\cglib\3.3.0\cglib-3.3.0.jar;D:\mvnRespo\org\ow2\asm\asm\7.1\asm-7.1.jar;D:\mvnRespo\io\netty\netty-all\4.1.66.Final\netty-all-4.1.66.Final.jar;D:\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar" com.example.springdemo.com.threadDemo.scheduled.ScheduledThreadPoolExecutorTest
  2. Connected to the target VM, address: '127.0.0.1:52098', transport: 'socket'
  3. pool-1-thread-1 ;start time2021-08-24T16:44:12.094
  4. scheduleWithFixedDelay工作
  5. pool-1-thread-1 ;end time2021-08-24T16:44:20.108
  6. pool-1-thread-1 ;start time2021-08-24T16:44:25.122
  7. scheduleWithFixedDelay工作
  8. pool-2-thread-1 ;start time2021-08-24T16:44:32.029
  9. scheduleAtFixedRate工作
  10. pool-1-thread-1 ;end time2021-08-24T16:44:33.135
  11. pool-2-thread-1 ;end time2021-08-24T16:44:40.032
  12. pool-2-thread-1 ;start time2021-08-24T16:44:40.032
  13. scheduleAtFixedRate工作
  14. pool-2-thread-1 ;end time2021-08-24T16:44:48.046
  15. pool-2-thread-2 ;start time2021-08-24T16:44:48.047
  16. scheduleAtFixedRate工作
  17. pool-2-thread-2 ;end time2021-08-24T16:44:56.059
  18. Disconnected from the target VM, address: '127.0.0.1:52098', transport: 'socket'
  19. Process finished with exit code 0

1.2 DelayQueue 示例:

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。下面是 java 常见的阻塞队列。

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

延迟阻塞队列就是在阻塞队列的基础上提供了延迟获取任务的功能。

  1. package com.example.springdemo.com.threadDemo.scheduled;
  2. import java.time.Instant;
  3. import java.time.LocalDateTime;
  4. import java.util.concurrent.DelayQueue;
  5. import java.util.concurrent.Delayed;
  6. import java.util.concurrent.TimeUnit;
  7. /**
  8. * @description:
  9. * @author: zhangqing
  10. * @date: 2021-08-24
  11. */
  12. public class DelayQueueTest {
  13. private static DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
  14. static class DelayTask implements Delayed {
  15. // 延迟时间
  16. private final long delay;
  17. // 到期时间
  18. private final long expire;
  19. // 数据
  20. private final String msg;
  21. // 创建时间
  22. private final long now;
  23. /**
  24. * 初始化 DelayTask 对象
  25. *
  26. * @param delay 延迟时间 单位:微妙
  27. * @param msg 业务信息
  28. */
  29. DelayTask(long delay, String msg) {
  30. this.delay = delay; // 延迟时间
  31. this.msg = msg; // 业务信息
  32. this.now = Instant.now().toEpochMilli();
  33. this.expire = now + delay; // 到期时间 = 当前时间+延迟时间
  34. }
  35. /**
  36. * 获取延迟时间
  37. *
  38. * @param unit 单位对象
  39. * @return
  40. */
  41. @Override
  42. public long getDelay(TimeUnit unit) {
  43. return unit.convert(expire - Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS);
  44. }
  45. /**
  46. * 比较器
  47. * 比较规则:延迟时间越长的对象越靠后
  48. *
  49. * @param o
  50. * @return
  51. */
  52. @Override
  53. public int compareTo(Delayed o) {
  54. if (o == this) // compare zero ONLY if same object
  55. return 0;
  56. return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
  57. }
  58. @Override
  59. public String toString() {
  60. return "DelayTask{" +
  61. "delay=" + delay +
  62. ", expire=" + expire +
  63. ", msg='" + msg + '\'' +
  64. ", now=" + now +
  65. '}';
  66. }
  67. }
  68. /**
  69. * 生产者线程
  70. *
  71. * @param args
  72. */
  73. public static void main(String[] args) {
  74. initConsumer();
  75. try {
  76. // 等待消费者初始化完毕
  77. TimeUnit.SECONDS.sleep(3);
  78. } catch (InterruptedException e) {
  79. e.printStackTrace();
  80. }
  81. delayQueue.add(new DelayTask(1000, "Task1"));
  82. delayQueue.add(new DelayTask(2000, "Task2"));
  83. delayQueue.add(new DelayTask(3000, "Task3"));
  84. delayQueue.add(new DelayTask(4000, "Task4"));
  85. delayQueue.add(new DelayTask(5000, "Task5"));
  86. }
  87. /**
  88. * 初始化消费者线程
  89. */
  90. private static void initConsumer() {
  91. Runnable task = () -> {
  92. while (true) {
  93. try {
  94. System.out.println("尝试获取延迟队列中的任务。" + LocalDateTime.now());
  95. System.out.println(delayQueue.take());
  96. } catch (InterruptedException e) {
  97. e.printStackTrace();
  98. }
  99. }
  100. };
  101. Thread consumer = new Thread(task);
  102. consumer.start();
  103. }
  104. }

输出:

  1. 尝试获取延迟队列中的任务。2021-08-24T17:02:55.725
  2. DelayTask{delay=1000, expire=1629795779691, msg='Task1', now=1629795778691}
  3. 尝试获取延迟队列中的任务。2021-08-24T17:02:59.703
  4. DelayTask{delay=2000, expire=1629795780691, msg='Task2', now=1629795778691}
  5. 尝试获取延迟队列中的任务。2021-08-24T17:03:00.701
  6. DelayTask{delay=3000, expire=1629795781691, msg='Task3', now=1629795778691}
  7. 尝试获取延迟队列中的任务。2021-08-24T17:03:01.701
  8. DelayTask{delay=4000, expire=1629795782691, msg='Task4', now=1629795778691}
  9. 尝试获取延迟队列中的任务。2021-08-24T17:03:02.699
  10. DelayTask{delay=5000, expire=1629795783691, msg='Task5', now=1629795778691}
  11. 尝试获取延迟队列中的任务。2021-08-24T17:03:03.696

2 实现原理

2.1 DelayQueue 实现原理

2.1.1 take

  1. //检索并移除此队列的头,如有必要,等待此队列上具有过期延迟的元素可用。
  2. //返回:
  3. //这个队列的头
  4. public E take() throws InterruptedException {
  5. // 获取锁。每个延迟队列内聚了一个重入锁。
  6. final ReentrantLock lock = this.lock;
  7. // 获取可中断的锁。
  8. lock.lockInterruptibly();
  9. try {
  10. for (;;) {
  11. // 尝试从优先级队列中获取队列头部元素
  12. E first = q.peek();
  13. if (first == null)
  14. // 无元素,当前线程节点加入等待队列,并阻塞当前线程
  15. available.await();
  16. else {
  17. // 通过延迟任务的 getDelay 方法获取延迟时间
  18. long delay = first.getDelay(NANOSECONDS);
  19. if (delay <= 0)
  20. // 延迟时间到期,获取并删除头部元素。
  21. return q.poll();
  22. first = null; // don't retain ref while waiting
  23. if (leader != null)
  24. available.await();
  25. else {
  26. Thread thisThread = Thread.currentThread();
  27. leader = thisThread;
  28. try {
  29. // 线程节点进入等待队列 x 纳秒。
  30. available.awaitNanos(delay);
  31. } finally {
  32. if (leader == thisThread)
  33. leader = null;
  34. }
  35. }
  36. }
  37. }
  38. } finally {
  39. // 若还存在元素的话,则将等待队列头节点中的线程节点移动到同步队列中。
  40. if (leader == null && q.peek() != null)
  41. available.signal();
  42. lock.unlock();
  43. }
  44. }

2.1.2 add

  1. public boolean add(E e) {
  2. return offer(e);
  3. }
  4. /**
  5. * 将指定的元素插入此延迟队列
  6. */
  7. public boolean offer(E e) {
  8. // 获取到重入锁
  9. final ReentrantLock lock = this.lock;
  10. lock.lock();
  11. try {
  12. q.offer(e);
  13. // 添加成功元素
  14. if (q.peek() == e) {
  15. leader = null;
  16. // 将等待队列中的头节点移动到同步队列。
  17. available.signal();
  18. }
  19. return true;
  20. } finally {
  21. lock.unlock();
  22. }
  23. }

DelayQueue 将实现了 Delayed 接口的对象添加到优先级队列中,通过在依赖内聚重入锁的 Condition 上调用 await(delayTime) 方法,实现了延迟获取阻塞队列中元素的功能。

2.2 ScheduledExecutorService

实现类:ScheduledThreadPoolExecutor

2.2.1 delayedExecute

  1. private void delayedExecute(RunnableScheduledFuture<?> task) {
  2. // 判断线程池是否已经关闭。
  3. if (isShutdown())
  4. // 执行拒绝策略。
  5. reject(task);
  6. else {
  7. // 添加封装好的延迟任务到阻塞队列中。
  8. super.getQueue().add(task);
  9. if (isShutdown() &&
  10. !canRunInCurrentRunState(task.isPeriodic()) &&
  11. remove(task))
  12. task.cancel(false);
  13. else
  14. ensurePrestart();
  15. }
  16. }

只要线程池正常运行,则将 DelayedTask 添加到 workQueue 中。注意,workQueue 是定义在 ThreadPoolExecutor 当中的用来保存工作任务的阻塞队列。

2.2.2 ensurePrestart

  1. /**
  2. * Same as prestartCoreThread except arranges that at least one
  3. * thread is started even if corePoolSize is 0.
  4. */
  5. void ensurePrestart() {
  6. int wc = workerCountOf(ctl.get());
  7. // 当前工作的线程是否超过核心线程数。
  8. if (wc < corePoolSize)
  9. // 调用 addWorker 方法,创建新的线程执行任务。
  10. addWorker(null, true);
  11. else if (wc == 0)
  12. addWorker(null, false);
  13. }

addWorker即回到ThreadPoolExecutor中的新建任务队列

2.3 ScheduledFutureTask

2.3.1 run

  1. public void run() {
  2. boolean periodic = isPeriodic();
  3. if (!canRunInCurrentRunState(periodic))
  4. cancel(false);
  5. // 若无间隔时间,则直接调用 futureTask 的 run 方法。
  6. else if (!periodic)
  7. ScheduledFutureTask.super.run();
  8. // 调用 runAndReset 方法执行任务。
  9. else if (ScheduledFutureTask.super.runAndReset()) {
  10. // 设置下次执行时间
  11. setNextRunTime();
  12. // 将任务重新放回工作任务队列。
  13. reExecutePeriodic(outerTask);
  14. }
  15. }

工作任务队列中的任务对象一旦被工作线程获取成功后,就会被从队列中移出。而其他之前阻塞在队列上,此时竞争到锁的工作者线程将会尝试获取任务队列中的下一个任务。
调用成功获取到的 ScheduledFutureTask 的 run 方法,执行业务逻辑以后 将重新计算对象的 delay 时间,再通过 runAndReset 方法将重新计算的后的对象重置回工作任务阻塞队列中。由于默认实现的 compareTo 方法,
这样,就实现了线程周期性的执行任务的功能。