1 简介
通过 ThreadPoolExecutor 可以实现各式各样的自定义线程池,而 ScheduledThreadPoolExecutor 类则在自定义线程池的基础上增加了周期性执行任务的功能。
Java 定时任务可以用Timer + TimerTask来做,或者使用ScheduledExecutorService,使用ScheduledExecutorService有两个好处:
1. 如果任务执行时间过长,TimerTask会出现延迟执行的情况。比如,第一任务在1000ms执行了4000ms,第二个任务定时在2000ms开始执行。这里由于第一个任务要执行4000,所以第二个任务实际在5000ms开始执行。这是由于Timer是单线程,且顺序执行提交的任务
2. 如果执行任务抛出异常,Timer是不会执行会后面的任务的
使用ScheduledExecutorService可以避免上面两种情况,因为ScheduledExecutorService是线程池,有多个线程执行。
1.1 ScheduledThreadPoolExecutor 示例:
package com.example.springdemo.com.threadDemo.scheduled;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
*
* @description:
* @author: zhangqing
* @date: 2021-08-24
*/
public class ScheduledThreadPoolExecutorTest {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
/**命令–要执行的任务
initialDelay–延迟第一次执行的时间
延迟–一次执行终止与下一次执行开始之间的延迟
unit–initialDelay和delay参数的时间单位
*/
service.scheduleWithFixedDelay(new ScheduledThread("scheduleWithFixedDelay工作"), 3, 5, TimeUnit.SECONDS);
Thread.sleep(20000);
service.shutdown();
service = Executors.newScheduledThreadPool(3);
/**
* scheduleAtFixedRate 中的 delayTime/period 表示从线程池中首先开始执行的线程算起,假设period为1s,
* 若线程执行了5s,那么下一个线程在第一个线程运行完后会很快被执行。
*/
service.scheduleAtFixedRate(new ScheduledThread("scheduleAtFixedRate工作"), 3, 5, TimeUnit.SECONDS);
Thread.sleep(20000);
service.shutdown();
}
}
class ScheduledThread implements Runnable {
private String work;
public ScheduledThread(String work) {
this.work = work;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " ;start time" + LocalDateTime.now());
System.out.println(work);
Thread.sleep(8000);
System.out.println(Thread.currentThread().getName() + " ;end time" + LocalDateTime.now());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出结果:
"C:\Program Files\Java\jdk1.8.0_291\bin\java.exe" -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:52098,suspend=y,server=n -javaagent:C:\Users\zhangqing\AppData\Local\JetBrains\IntelliJIdea2021.1\captureAgent\debugger-agent.jar -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_291\jre\lib\rt.jar;E:\demo\spring\target\classes;D:\mvnRespo\org\springframework\boot\spring-boot-starter-web\2.5.3\spring-boot-starter-web-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter\2.5.3\spring-boot-starter-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot\2.5.3\spring-boot-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot-autoconfigure\2.5.3\spring-boot-autoconfigure-2.5.3.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter-logging\2.5.3\spring-boot-starter-logging-2.5.3.jar;D:\mvnRespo\ch\qos\logback\logback-classic\1.2.4\logback-classic-1.2.4.jar;D:\mvnRespo\ch\qos\logback\logback-core\1.2.4\logback-core-1.2.4.jar;D:\mvnRespo\org\apache\logging\log4j\log4j-to-slf4j\2.14.1\log4j-to-slf4j-2.14.1.jar;D:\mvnRespo\org\apache\logging\log4j\log4j-api\2.14.1\log4j-api-2.14.1.jar;D:\mvnRespo\org\slf4j\jul-to-slf4j\1.7.32\jul-to-slf4j-1.7.32.jar;D:\mvnRespo\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\mvnRespo\org\yaml\snakeyaml\1.28\snakeyaml-1.28.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter-json\2.5.3\spring-boot-starter-json-2.5.3.jar;D:\mvnRespo\com\fasterxml\jackson\core\jackson-databind\2.12.4\jackson-databind-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\core\jackson-annotations\2.12.4\jackson-annotations-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\core\jackson-core\2.12.4\jackson-core-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.12.4\jackson-datatype-jdk8-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.12.4\jackson-datatype-jsr310-2.12.4.jar;D:\mvnRespo\com\fasterxml\jackson\module\jackson-module-parameter-names\2.12.4\jackson-module-parameter-names-2.12.4.jar;D:\mvnRespo\org\springframework\boot\spring-boot-starter-tomcat\2.5.3\spring-boot-starter-tomcat-2.5.3.jar;D:\mvnRespo\org\apache\tomcat\embed\tomcat-embed-core\9.0.50\tomcat-embed-core-9.0.50.jar;D:\mvnRespo\org\apache\tomcat\embed\tomcat-embed-el\9.0.50\tomcat-embed-el-9.0.50.jar;D:\mvnRespo\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.50\tomcat-embed-websocket-9.0.50.jar;D:\mvnRespo\org\springframework\spring-web\5.3.9\spring-web-5.3.9.jar;D:\mvnRespo\org\springframework\spring-beans\5.3.9\spring-beans-5.3.9.jar;D:\mvnRespo\org\springframework\spring-webmvc\5.3.9\spring-webmvc-5.3.9.jar;D:\mvnRespo\org\springframework\spring-aop\5.3.9\spring-aop-5.3.9.jar;D:\mvnRespo\org\springframework\spring-context\5.3.9\spring-context-5.3.9.jar;D:\mvnRespo\org\springframework\spring-expression\5.3.9\spring-expression-5.3.9.jar;D:\mvnRespo\com\google\guava\guava\28.1-jre\guava-28.1-jre.jar;D:\mvnRespo\com\google\guava\failureaccess\1.0.1\failureaccess-1.0.1.jar;D:\mvnRespo\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;D:\mvnRespo\com\google\code\findbugs\jsr305\3.0.2\jsr305-3.0.2.jar;D:\mvnRespo\org\checkerframework\checker-qual\2.8.1\checker-qual-2.8.1.jar;D:\mvnRespo\com\google\errorprone\error_prone_annotations\2.3.2\error_prone_annotations-2.3.2.jar;D:\mvnRespo\com\google\j2objc\j2objc-annotations\1.3\j2objc-annotations-1.3.jar;D:\mvnRespo\org\codehaus\mojo\animal-sniffer-annotations\1.18\animal-sniffer-annotations-1.18.jar;D:\mvnRespo\org\slf4j\slf4j-api\1.7.32\slf4j-api-1.7.32.jar;D:\mvnRespo\org\springframework\spring-core\5.3.9\spring-core-5.3.9.jar;D:\mvnRespo\org\springframework\spring-jcl\5.3.9\spring-jcl-5.3.9.jar;D:\mvnRespo\org\springframework\spring-aspects\5.2.7.RELEASE\spring-aspects-5.2.7.RELEASE.jar;D:\mvnRespo\org\aspectj\aspectjtools\1.9.5\aspectjtools-1.9.5.jar;D:\mvnRespo\aopalliance\aopalliance\1.0\aopalliance-1.0.jar;D:\mvnRespo\org\aspectj\aspectjweaver\1.9.0\aspectjweaver-1.9.0.jar;D:\mvnRespo\cglib\cglib\3.3.0\cglib-3.3.0.jar;D:\mvnRespo\org\ow2\asm\asm\7.1\asm-7.1.jar;D:\mvnRespo\io\netty\netty-all\4.1.66.Final\netty-all-4.1.66.Final.jar;D:\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar" com.example.springdemo.com.threadDemo.scheduled.ScheduledThreadPoolExecutorTest
Connected to the target VM, address: '127.0.0.1:52098', transport: 'socket'
pool-1-thread-1 ;start time2021-08-24T16:44:12.094
scheduleWithFixedDelay工作
pool-1-thread-1 ;end time2021-08-24T16:44:20.108
pool-1-thread-1 ;start time2021-08-24T16:44:25.122
scheduleWithFixedDelay工作
pool-2-thread-1 ;start time2021-08-24T16:44:32.029
scheduleAtFixedRate工作
pool-1-thread-1 ;end time2021-08-24T16:44:33.135
pool-2-thread-1 ;end time2021-08-24T16:44:40.032
pool-2-thread-1 ;start time2021-08-24T16:44:40.032
scheduleAtFixedRate工作
pool-2-thread-1 ;end time2021-08-24T16:44:48.046
pool-2-thread-2 ;start time2021-08-24T16:44:48.047
scheduleAtFixedRate工作
pool-2-thread-2 ;end time2021-08-24T16:44:56.059
Disconnected from the target VM, address: '127.0.0.1:52098', transport: 'socket'
Process finished with exit code 0
1.2 DelayQueue 示例:
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。下面是 java 常见的阻塞队列。
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
延迟阻塞队列就是在阻塞队列的基础上提供了延迟获取任务的功能。
package com.example.springdemo.com.threadDemo.scheduled;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @description:
* @author: zhangqing
* @date: 2021-08-24
*/
public class DelayQueueTest {
private static DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
static class DelayTask implements Delayed {
// 延迟时间
private final long delay;
// 到期时间
private final long expire;
// 数据
private final String msg;
// 创建时间
private final long now;
/**
* 初始化 DelayTask 对象
*
* @param delay 延迟时间 单位:微妙
* @param msg 业务信息
*/
DelayTask(long delay, String msg) {
this.delay = delay; // 延迟时间
this.msg = msg; // 业务信息
this.now = Instant.now().toEpochMilli();
this.expire = now + delay; // 到期时间 = 当前时间+延迟时间
}
/**
* 获取延迟时间
*
* @param unit 单位对象
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expire - Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS);
}
/**
* 比较器
* 比较规则:延迟时间越长的对象越靠后
*
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
if (o == this) // compare zero ONLY if same object
return 0;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayTask{" +
"delay=" + delay +
", expire=" + expire +
", msg='" + msg + '\'' +
", now=" + now +
'}';
}
}
/**
* 生产者线程
*
* @param args
*/
public static void main(String[] args) {
initConsumer();
try {
// 等待消费者初始化完毕
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
delayQueue.add(new DelayTask(1000, "Task1"));
delayQueue.add(new DelayTask(2000, "Task2"));
delayQueue.add(new DelayTask(3000, "Task3"));
delayQueue.add(new DelayTask(4000, "Task4"));
delayQueue.add(new DelayTask(5000, "Task5"));
}
/**
* 初始化消费者线程
*/
private static void initConsumer() {
Runnable task = () -> {
while (true) {
try {
System.out.println("尝试获取延迟队列中的任务。" + LocalDateTime.now());
System.out.println(delayQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread consumer = new Thread(task);
consumer.start();
}
}
输出:
尝试获取延迟队列中的任务。2021-08-24T17:02:55.725
DelayTask{delay=1000, expire=1629795779691, msg='Task1', now=1629795778691}
尝试获取延迟队列中的任务。2021-08-24T17:02:59.703
DelayTask{delay=2000, expire=1629795780691, msg='Task2', now=1629795778691}
尝试获取延迟队列中的任务。2021-08-24T17:03:00.701
DelayTask{delay=3000, expire=1629795781691, msg='Task3', now=1629795778691}
尝试获取延迟队列中的任务。2021-08-24T17:03:01.701
DelayTask{delay=4000, expire=1629795782691, msg='Task4', now=1629795778691}
尝试获取延迟队列中的任务。2021-08-24T17:03:02.699
DelayTask{delay=5000, expire=1629795783691, msg='Task5', now=1629795778691}
尝试获取延迟队列中的任务。2021-08-24T17:03:03.696
2 实现原理
2.1 DelayQueue 实现原理
2.1.1 take
//检索并移除此队列的头,如有必要,等待此队列上具有过期延迟的元素可用。
//返回:
//这个队列的头
public E take() throws InterruptedException {
// 获取锁。每个延迟队列内聚了一个重入锁。
final ReentrantLock lock = this.lock;
// 获取可中断的锁。
lock.lockInterruptibly();
try {
for (;;) {
// 尝试从优先级队列中获取队列头部元素
E first = q.peek();
if (first == null)
// 无元素,当前线程节点加入等待队列,并阻塞当前线程
available.await();
else {
// 通过延迟任务的 getDelay 方法获取延迟时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 延迟时间到期,获取并删除头部元素。
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 线程节点进入等待队列 x 纳秒。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 若还存在元素的话,则将等待队列头节点中的线程节点移动到同步队列中。
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
2.1.2 add
public boolean add(E e) {
return offer(e);
}
/**
* 将指定的元素插入此延迟队列
*/
public boolean offer(E e) {
// 获取到重入锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 添加成功元素
if (q.peek() == e) {
leader = null;
// 将等待队列中的头节点移动到同步队列。
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
DelayQueue 将实现了 Delayed 接口的对象添加到优先级队列中,通过在依赖内聚重入锁的 Condition 上调用 await(delayTime) 方法,实现了延迟获取阻塞队列中元素的功能。
2.2 ScheduledExecutorService
实现类:ScheduledThreadPoolExecutor
2.2.1 delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 判断线程池是否已经关闭。
if (isShutdown())
// 执行拒绝策略。
reject(task);
else {
// 添加封装好的延迟任务到阻塞队列中。
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
只要线程池正常运行,则将 DelayedTask 添加到 workQueue 中。注意,workQueue 是定义在 ThreadPoolExecutor 当中的用来保存工作任务的阻塞队列。
2.2.2 ensurePrestart
/**
* Same as prestartCoreThread except arranges that at least one
* thread is started even if corePoolSize is 0.
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 当前工作的线程是否超过核心线程数。
if (wc < corePoolSize)
// 调用 addWorker 方法,创建新的线程执行任务。
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
addWorker即回到ThreadPoolExecutor中的新建任务队列
2.3 ScheduledFutureTask
2.3.1 run
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 若无间隔时间,则直接调用 futureTask 的 run 方法。
else if (!periodic)
ScheduledFutureTask.super.run();
// 调用 runAndReset 方法执行任务。
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次执行时间
setNextRunTime();
// 将任务重新放回工作任务队列。
reExecutePeriodic(outerTask);
}
}
工作任务队列中的任务对象一旦被工作线程获取成功后,就会被从队列中移出。而其他之前阻塞在队列上,此时竞争到锁的工作者线程将会尝试获取任务队列中的下一个任务。
调用成功获取到的 ScheduledFutureTask 的 run 方法,执行业务逻辑以后 将重新计算对象的 delay 时间,再通过 runAndReset 方法将重新计算的后的对象重置回工作任务阻塞队列中。由于默认实现的 compareTo 方法,
这样,就实现了线程周期性的执行任务的功能。