1 简介
通过 ThreadPoolExecutor 可以实现各式各样的自定义线程池,而 ScheduledThreadPoolExecutor 类则在自定义线程池的基础上增加了周期性执行任务的功能。
Java 定时任务可以用Timer + TimerTask来做,或者使用ScheduledExecutorService,使用ScheduledExecutorService有两个好处:
1. 如果任务执行时间过长,TimerTask会出现延迟执行的情况。比如,第一任务在1000ms执行了4000ms,第二个任务定时在2000ms开始执行。这里由于第一个任务要执行4000,所以第二个任务实际在5000ms开始执行。这是由于Timer是单线程,且顺序执行提交的任务
2. 如果执行任务抛出异常,Timer是不会执行会后面的任务的
使用ScheduledExecutorService可以避免上面两种情况,因为ScheduledExecutorService是线程池,有多个线程执行。
1.1 ScheduledThreadPoolExecutor 示例:
package com.example.springdemo.com.threadDemo.scheduled;import java.time.LocalDateTime;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;/**** @description:* @author: zhangqing* @date: 2021-08-24*/public class ScheduledThreadPoolExecutorTest {public static void main(String[] args) throws InterruptedException {ScheduledExecutorService service = Executors.newScheduledThreadPool(3);/**命令–要执行的任务initialDelay–延迟第一次执行的时间延迟–一次执行终止与下一次执行开始之间的延迟unit–initialDelay和delay参数的时间单位*/service.scheduleWithFixedDelay(new ScheduledThread("scheduleWithFixedDelay工作"), 3, 5, TimeUnit.SECONDS);Thread.sleep(20000);service.shutdown();service = Executors.newScheduledThreadPool(3);/*** scheduleAtFixedRate 中的 delayTime/period 表示从线程池中首先开始执行的线程算起,假设period为1s,* 若线程执行了5s,那么下一个线程在第一个线程运行完后会很快被执行。*/service.scheduleAtFixedRate(new ScheduledThread("scheduleAtFixedRate工作"), 3, 5, TimeUnit.SECONDS);Thread.sleep(20000);service.shutdown();}}class ScheduledThread implements Runnable {private String work;public ScheduledThread(String work) {this.work = work;}@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName() + " ;start time" + LocalDateTime.now());System.out.println(work);Thread.sleep(8000);System.out.println(Thread.currentThread().getName() + " ;end time" + LocalDateTime.now());} catch (InterruptedException e) {e.printStackTrace();}}}
输出结果:
"C:\Program Files\Java\jdk1.8.0_291\bin\java.exe" -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:52098,suspend=y,server=n -javaagent:C:\Users\zhangqing\AppData\Local\JetBrains\IntelliJIdea2021.1\captureAgent\debugger-agent.jar -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\rt.jar;E:\demo\spring\target\classes;D:\mvnRespo\org\springframework\boot\spring-boot-starter-web\2.5.3\spring-boot-starter-web-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter\2.5.3\spring-boot-starter-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot\2.5.3\spring-boot-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot-autoconfigure\2.5.3\spring-boot-autoconfigure-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter-logging\2.5.3\spring-boot-starter-logging-2.5.3.jar;D:\mvnRespo\ch\qos\logback\logback-classic\1.2.4\logback-classic-1.2.4.jar;D:\mvnRespo\ch\qos\logback\logback-core\1.2.4\logback-core-1.2.4.jar;D:\mvnRespo\org\apache\logging\log4j\log4j-to-slf4j\2.14.1\log4j-to-slf4j-2.14.1.jar;D:\mvnRespo\org\apache\logging\log4j\log4j-api\2.14.1\log4j-api-2.14.1.jar;D:\mvnRespo\org\slf4j\jul-to-slf4j\1.7.32\jul-to-slf4j-1.7.32.jar;D:\mvnRespo\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\mvnRespo\org\yaml\snakeyaml\1.28\snakeyaml-1.28.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter-json\2.5.3\spring-boot-starter-json-2.5.3.jar;D:\mvnRespo\com\fasterxml\jackson\core\jackson-databind\2.12.4\jackson-databind-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\core\jackson-annotations\2.12.4\jackson-annotations-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\core\jackson-core\2.12.4\jackson-core-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.12.4\jackson-datatype-jdk8-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.12.4\jackson-datatype-jsr310-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\module\jackson-module-parameter-names\2.12.4\jackson-module-parameter-names-2.12.4.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter-tomcat\2.5.3\spring-boot-starter-tomcat-2.5.3.jar;D:\mvnRespo\org\apache\tomcat\embed\tomcat-embed-core\9.0.50\tomcat-embed-core-9.0.50.jar;D:\mvnRespo\org\apache\tomcat\embed\tomcat-embed-el\9.0.50\tomcat-embed-el-9.0.50.jar;D:\mvnRespo\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.50\tomcat-embed-websocket-9.0.50.jar;D:\mvnRespo\org\springframework\spring-web\5.3.9\spring-web-5.3.9.jar;D:\mvnRespo\org\springframework\spring-beans\5.3.9\spring-beans-5.3.9.jar;D:\mvnRespo\org\springframework\spring-webmvc\5.3.9\spring-webmvc-5.3.9.jar;D:\mvnRespo\org\springframework\spring-aop\5.3.9\spring-aop-5.3.9.jar;D:\mvnRespo\org\springframework\spring-context\5.3.9\spring-context-5.3.9.jar;D:\mvnRespo\org\springframework\spring-expression\5.3.9\spring-expression-5.3.9.jar;D:\mvnRespo\com\google\guava\guava\28.1-jre\guava-28.1-jre.jar;D:\mvnRespo\com\google\guava\failureaccess\1.0.1\failureaccess-1.0.1.jar;D:\mvnRespo\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;D:\mvnRespo\com\google\code\findbugs\jsr305\3.0.2\jsr305-3.0.2.jar;D:\mvnRespo\org\checkerframework\checker-qual\2.8.1\checker-qual-2.8.1.jar;D:\mvnRespo\com\google\errorprone\error_prone_annotations\2.3.2\error_prone_annotations-2.3.2.jar;D:\mvnRespo\com\google\j2objc\j2objc-annotations\1.3\j2objc-annotations-1.3.jar;D:\mvnRespo\org\codehaus\mojo\animal-sniffer-annotations\1.18\animal-sniffer-annotations-1.18.jar;D:\mvnRespo\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;D:\mvnRespo\org\springframework\spring-core\5.3.9\spring-core-5.3.9.jar;D:\mvnRespo\org\springframework\spring-jcl\5.3.9\spring-jcl-5.3.9.jar;D:\mvnRespo\org\springframework\spring-aspects\5.2.7.RELEASE\spring-aspects-5.2.7.RELEASE.jar;D:\mvnRespo\org\aspectj\aspectjtools\1.9.5\aspectjtools-1.9.5.jar;D:\mvnRespo\aopalliance\aopalliance\1.0\aopalliance-1.0.jar;D:\mvnRespo\org\aspectj\aspectjweaver\1.9.0\aspectjweaver-1.9.0.jar;D:\mvnRespo\cglib\cglib\3.3.0\cglib-3.3.0.jar;D:\mvnRespo\org\ow2\asm\asm\7.1\asm-7.1.jar;D:\mvnRespo\io\netty\netty-all\4.1.66.Final\netty-all-4.1.66.Final.jar;D:\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar" com.example.springdemo.com.threadDemo.scheduled.ScheduledThreadPoolExecutorTestConnected to the target VM, address: '127.0.0.1:52098', transport: 'socket'pool-1-thread-1 ;start time2021-08-24T16:44:12.094scheduleWithFixedDelay工作pool-1-thread-1 ;end time2021-08-24T16:44:20.108pool-1-thread-1 ;start time2021-08-24T16:44:25.122scheduleWithFixedDelay工作pool-2-thread-1 ;start time2021-08-24T16:44:32.029scheduleAtFixedRate工作pool-1-thread-1 ;end time2021-08-24T16:44:33.135pool-2-thread-1 ;end time2021-08-24T16:44:40.032pool-2-thread-1 ;start time2021-08-24T16:44:40.032scheduleAtFixedRate工作pool-2-thread-1 ;end time2021-08-24T16:44:48.046pool-2-thread-2 ;start time2021-08-24T16:44:48.047scheduleAtFixedRate工作pool-2-thread-2 ;end time2021-08-24T16:44:56.059Disconnected from the target VM, address: '127.0.0.1:52098', transport: 'socket'Process finished with exit code 0
1.2 DelayQueue 示例:
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。下面是 java 常见的阻塞队列。
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
延迟阻塞队列就是在阻塞队列的基础上提供了延迟获取任务的功能。
package com.example.springdemo.com.threadDemo.scheduled;import java.time.Instant;import java.time.LocalDateTime;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/*** @description:* @author: zhangqing* @date: 2021-08-24*/public class DelayQueueTest {private static DelayQueue<DelayTask> delayQueue = new DelayQueue<>();static class DelayTask implements Delayed {// 延迟时间private final long delay;// 到期时间private final long expire;// 数据private final String msg;// 创建时间private final long now;/*** 初始化 DelayTask 对象** @param delay 延迟时间 单位:微妙* @param msg 业务信息*/DelayTask(long delay, String msg) {this.delay = delay; // 延迟时间this.msg = msg; // 业务信息this.now = Instant.now().toEpochMilli();this.expire = now + delay; // 到期时间 = 当前时间+延迟时间}/*** 获取延迟时间** @param unit 单位对象* @return*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(expire - Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS);}/*** 比较器* 比较规则:延迟时间越长的对象越靠后** @param o* @return*/@Overridepublic int compareTo(Delayed o) {if (o == this) // compare zero ONLY if same objectreturn 0;return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString() {return "DelayTask{" +"delay=" + delay +", expire=" + expire +", msg='" + msg + '\'' +", now=" + now +'}';}}/*** 生产者线程** @param args*/public static void main(String[] args) {initConsumer();try {// 等待消费者初始化完毕TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}delayQueue.add(new DelayTask(1000, "Task1"));delayQueue.add(new DelayTask(2000, "Task2"));delayQueue.add(new DelayTask(3000, "Task3"));delayQueue.add(new DelayTask(4000, "Task4"));delayQueue.add(new DelayTask(5000, "Task5"));}/*** 初始化消费者线程*/private static void initConsumer() {Runnable task = () -> {while (true) {try {System.out.println("尝试获取延迟队列中的任务。" + LocalDateTime.now());System.out.println(delayQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}};Thread consumer = new Thread(task);consumer.start();}}
输出:
尝试获取延迟队列中的任务。2021-08-24T17:02:55.725DelayTask{delay=1000, expire=1629795779691, msg='Task1', now=1629795778691}尝试获取延迟队列中的任务。2021-08-24T17:02:59.703DelayTask{delay=2000, expire=1629795780691, msg='Task2', now=1629795778691}尝试获取延迟队列中的任务。2021-08-24T17:03:00.701DelayTask{delay=3000, expire=1629795781691, msg='Task3', now=1629795778691}尝试获取延迟队列中的任务。2021-08-24T17:03:01.701DelayTask{delay=4000, expire=1629795782691, msg='Task4', now=1629795778691}尝试获取延迟队列中的任务。2021-08-24T17:03:02.699DelayTask{delay=5000, expire=1629795783691, msg='Task5', now=1629795778691}尝试获取延迟队列中的任务。2021-08-24T17:03:03.696
2 实现原理
2.1 DelayQueue 实现原理
2.1.1 take
//检索并移除此队列的头,如有必要,等待此队列上具有过期延迟的元素可用。//返回://这个队列的头public E take() throws InterruptedException {// 获取锁。每个延迟队列内聚了一个重入锁。final ReentrantLock lock = this.lock;// 获取可中断的锁。lock.lockInterruptibly();try {for (;;) {// 尝试从优先级队列中获取队列头部元素E first = q.peek();if (first == null)// 无元素,当前线程节点加入等待队列,并阻塞当前线程available.await();else {// 通过延迟任务的 getDelay 方法获取延迟时间long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// 延迟时间到期,获取并删除头部元素。return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {// 线程节点进入等待队列 x 纳秒。available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {// 若还存在元素的话,则将等待队列头节点中的线程节点移动到同步队列中。if (leader == null && q.peek() != null)available.signal();lock.unlock();}}
2.1.2 add
public boolean add(E e) {return offer(e);}/*** 将指定的元素插入此延迟队列*/public boolean offer(E e) {// 获取到重入锁final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);// 添加成功元素if (q.peek() == e) {leader = null;// 将等待队列中的头节点移动到同步队列。available.signal();}return true;} finally {lock.unlock();}}
DelayQueue 将实现了 Delayed 接口的对象添加到优先级队列中,通过在依赖内聚重入锁的 Condition 上调用 await(delayTime) 方法,实现了延迟获取阻塞队列中元素的功能。
2.2 ScheduledExecutorService
实现类:ScheduledThreadPoolExecutor
2.2.1 delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {// 判断线程池是否已经关闭。if (isShutdown())// 执行拒绝策略。reject(task);else {// 添加封装好的延迟任务到阻塞队列中。super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart();}}
只要线程池正常运行,则将 DelayedTask 添加到 workQueue 中。注意,workQueue 是定义在 ThreadPoolExecutor 当中的用来保存工作任务的阻塞队列。
2.2.2 ensurePrestart
/*** Same as prestartCoreThread except arranges that at least one* thread is started even if corePoolSize is 0.*/void ensurePrestart() {int wc = workerCountOf(ctl.get());// 当前工作的线程是否超过核心线程数。if (wc < corePoolSize)// 调用 addWorker 方法,创建新的线程执行任务。addWorker(null, true);else if (wc == 0)addWorker(null, false);}
addWorker即回到ThreadPoolExecutor中的新建任务队列
2.3 ScheduledFutureTask
2.3.1 run
public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);// 若无间隔时间,则直接调用 futureTask 的 run 方法。else if (!periodic)ScheduledFutureTask.super.run();// 调用 runAndReset 方法执行任务。else if (ScheduledFutureTask.super.runAndReset()) {// 设置下次执行时间setNextRunTime();// 将任务重新放回工作任务队列。reExecutePeriodic(outerTask);}}
工作任务队列中的任务对象一旦被工作线程获取成功后,就会被从队列中移出。而其他之前阻塞在队列上,此时竞争到锁的工作者线程将会尝试获取任务队列中的下一个任务。
调用成功获取到的 ScheduledFutureTask 的 run 方法,执行业务逻辑以后 将重新计算对象的 delay 时间,再通过 runAndReset 方法将重新计算的后的对象重置回工作任务阻塞队列中。由于默认实现的 compareTo 方法,
这样,就实现了线程周期性的执行任务的功能。
