一、SpringBoot异步任务
1.1 为什么要用异步框架,它解决什么问题?
在SpringBoot的日常开发中,一般都是同步调用的。但实际中有很多场景非常适合使用异步来处理,如:注册新用户,送100个积分;或下单成功,发送push消息等等。
就拿注册新用户这个用例来说,为什么要异步处理?
- 第一个原因:容错性、健壮性,如果送积分出现异常,不能因为送积分而导致用户注册失败;因为用户注册是主要功能,送积分是次要功能,即使送积分异常也要提示用户注册成功,然后后面在针对积分异常做补偿处理。
- 第二个原因:提升性能,例如注册用户花了20毫秒,送积分花费50毫秒,如果用同步的话,总耗时70毫秒,用异步的话,无需等待积分,故耗时20毫秒。
故,异步能解决2个问题,性能和容错性。
1.2 SpringBoot如何实现异步调用?
对于异步方法调用,从Spring3开始提供了@Async
注解,只需要在方法上标注此注解,此方法即可实现异步调用。
当然,还需要一个配置类,通过Enable模块驱动注解@EnableAsync
来开启异步功能。
注:@Async 注解通常用在方法上,但是也可以用作类型上,当类被 @Async 注解时,表示该类中所有的方法都是异步执行的。
1.3实现异步调用
第一步:新建配置类,开启@Async
功能支持
使用@EnableAsync
来开启异步任务支持,@EnableAsync
注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。这里选择使用单独的配置类AsyncConfiguration
@Configuration
@EnableAsync
public class AsyncConfiguration {
}
第二步:在方法上标记异步调用
增加一个 AsyncServiceImpl 类,用来进行业务处理,同时添加 @Async
注解,代表该方法为异步处理
@Slf4j
@Service
public class AsyncServiceImpl implements AsyncService {
/**
* 测试使用SpringBoot默认线程池
*
* @return Future
* @throws InterruptedException
*/
@Override
@Async("otherTaskExecutor")
public Future<String> otherAsyncThreadPool() throws InterruptedException {
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
log.info("otherAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
return new AsyncResult<>("Test Async Thread Pool Done");
}
/**
* 测试使用SpringBoot定制线程池
*
* @throws InterruptedException
*/
@Async
@Override
public void customAsyncThreadPool() throws InterruptedException {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
log.info("customAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
}
}
上述代码使用 _@Async_
标记了两个异步执行方法,一个没有返回值的 _customAsyncThreadPool_
,另一个拥有返回值 _otherAsyncThreadPool_
,这里需要注意的一点时,被 _@Async_
注解的方法可以接受任意类型参数,但只能返回void或Future类型数据。所以当异步方法返回数据时,需要使用Future包装异步任务结果,上述代码使用AsyncResult 包装异步任务结果,AsyncResult间接继承Future,是 Spring 提供的一个可用于追踪异步方法执行结果的包装类。其他常用的Future类型还有 Spring 4.2 提供的ListenableFuture,或者 JDK 8 提供的CompletableFuture,这些类型可提供更丰富的异步任务操作
如果前端需要获取耗时任务结果,则异步任务方法应当返回一个Future类型数据,此时Controller相关接口需要调用该Future的get()方法获取异步任务结果,get()方法是一个阻塞方法,因此该操作相当于将异步任务转换为同步任务,浏览器同样会面临我们前面所讲的转圈等待过程,但是异步执行还是有他的好处的,因为我们可以控制get()方法的调用时序,因此可以先执行其他一些操作后,最后再调用get()方法
第三步:在Controller中进行异步方法调用
@Slf4j
@RestController
@RequestMapping("/async")
public class AsyncController {
@Autowired
AsyncService asyncService;
@RequestMapping("/testAsyncThreadPool")
public String test3() throws InterruptedException {
asyncService.otherAsyncThreadPool();
asyncService.customAsyncThreadPool();
return "test async thread pool done";
}
}
通过访问http://localhost:8080/async/testAsyncThreadPool查看控制台日志:
2022-02-20 01:03:54.310 INFO 21032 --- [ custom-async-1] c.g.c.service.impl.AsyncServiceImpl : customAsyncThreadPool: Thread = custom-async-1
2022-02-20 01:03:55.319 INFO 21032 --- [ other-async-1] c.g.c.service.impl.AsyncServiceImpl : otherAsyncThreadPool: Thread = other-async-1
Postman上是一瞬间响应
通过日志可以看到:主线程不需要等待异步方法执行完成,减少了响应时间,提高了接口性能。
通过上面三步就可以在SpringBoot中欢乐的使用异步方法来提高接口性能了,是不是很简单?
但是,上面的代码忽略了一个最大的问题,就是给@Async
异步框架自定义线程池。
1.4 为什么要给@Async
自定义线程池?
默认情况下,Spring 会自动搜索相关线程池定义:要么是一个唯一TaskExecutor Bean 实例,要么是一个名称为taskExecutor的Executor Bean 实例。如果这两个 Bean 实例都不存在,就会使用SimpleAsyncTaskExecutor来异步执行被@Async注解的方法。
综上,可以知道,默认情况下,Spring 使用的 Executor 是SimpleAsyncTaskExecutor,SimpleAsyncTaskExecutor每次调用都会创建一个新的线程,不会重用之前的线程。很多时候,这种实现方式不符合我们的业务场景,因此通常我们都会自定义一个 Executor 来替换SimpleAsyncTaskExecutor
使用@Async
注解,在默认情况下用的是SimpleAsyncTaskExecutor
线程池,该线程池不是真正意义上的线程池。
使用此线程池无法实现线程重用,每次调用都会新建一条线程。若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError
错误,关键代码如下:
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
//判断是否开启限流,默认为否
if (this.isThrottleActive() && startTimeout > 0L) {
//执行前置操作,进行限流
this.concurrencyThrottle.beforeAccess();
this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
} else {
//未限流的情况,执行线程任务
this.doExecute(taskToUse);
}
}
protected void doExecute(Runnable task) {
//不断创建线程
Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task);
thread.start();
}
//创建线程
public Thread createThread(Runnable runnable) {
//指定线程名,task-1,task-2...
Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName());
thread.setPriority(this.getThreadPriority());
thread.setDaemon(this.isDaemon());
return thread;
}
也可以直接通过上面的控制台日志观察,每次打印的线程名都是[task-1]、[task-2]、[task-3]、[task-4]…..递增的。
正因如此,所以在使用Spring中的@Async
异步框架时一定要自定义线程池,替代默认的SimpleAsyncTaskExecutor
。
Spring提供了多种线程池:
SimpleAsyncTaskExecutor
:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。SyncTaskExecutor
:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地ConcurrentTaskExecutor
:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类ThreadPoolTaskScheduler
:可以使用cron表达式ThreadPoolTaskExecutor
:最常使用,推荐。其实质是对java.util.concurrent.ThreadPoolExecutor的包装
1.5 为@Async
实现一个自定义线程池
@Configuration
@EnableAsync
public class SyncConfiguration {
@Bean(name = "asyncPoolTaskExecutor")
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(10);
//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
taskExecutor.setMaxPoolSize(100);
//缓存队列
taskExecutor.setQueueCapacity(50);
//许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
taskExecutor.setKeepAliveSeconds(200);
//异步方法内部线程名称
taskExecutor.setThreadNamePrefix("async-");
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
}
配置自定义线程池以后就可以大胆的使用@Async
提供的异步处理能力了。
1.6 多个线程池处理
在现实的互联网项目开发中,针对高并发的请求,一般的做法是高并发接口单独线程池隔离处理。
假设现在2个高并发接口:一个是修改用户信息接口,刷新用户redis缓存;一个是下订单接口,发送app push信息。往往会根据接口特征定义两个线程池,这时候在使用@Async
时就需要通过指定线程池名称进行区分。
为@Async
指定线程池名字
@SneakyThrows
@Async("asyncPoolTaskExecutor")
public void doTask1() {
long t1 = System.currentTimeMillis();
Thread.sleep(2000);
long t2 = System.currentTimeMillis();
log.info("task1 cost {} ms" , t2-t1);
}
当系统存在多个线程池时,也可以配置一个默认线程池,对于非默认的异步任务再通过@Async("otherTaskExecutor")
来指定线程池名称。
配置默认线程池
Spring 还提供了另一个功能更加强大的接口AsyncConfigurer,该接口主要是用于自定义一个Executor配置类,提供了应用层级Executor接口,以及对于@Async方法异常捕获功能。如果 Spring 检测到该接口实例,会优先采用该接口自定义的Executor
可以修改配置类让其实现AsyncConfigurer
,并重写getAsyncExecutor()
方法,指定默认线程池:
/**
* Springboot线程池定制配置
*
* @className: AsyncConfiguration
* @author GMF
* @date 2022/2/20
* @time 0:02
*/
@Slf4j
@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
@Bean(name = "asyncPoolTaskExecutor")
public ThreadPoolTaskExecutor customExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(2);
//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
taskExecutor.setMaxPoolSize(10);
//缓存队列
taskExecutor.setQueueCapacity(50);
//许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
taskExecutor.setKeepAliveSeconds(200);
//异步方法内部线程名称
taskExecutor.setThreadNamePrefix("custom-async-");
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
@Bean(name = "otherTaskExecutor")
public ThreadPoolTaskExecutor defaultExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//核心线程数
taskExecutor.setCorePoolSize(2);
//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
taskExecutor.setMaxPoolSize(10);
//缓存队列
taskExecutor.setQueueCapacity(50);
//许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
taskExecutor.setKeepAliveSeconds(200);
//异步方法内部线程名称
taskExecutor.setThreadNamePrefix("other-async-");
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
/**
* 指定默认线程池
*/
@Override
public Executor getAsyncExecutor() {
return customExecutor();
}
}
如下,customAsyncThreadPool()
方法使用默认使用线程池asyncPoolTaskExecutor
,otherAsyncThreadPool()
使用线程池otherTaskExecutor
,非常灵活
/**
* 测试使用SpringBoot默认线程池
*
* @throws InterruptedException
*/
@Async("otherTaskExecutor")
public void otherAsyncThreadPool() throws InterruptedException {
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
log.info("defaultAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
}
/**
* 测试使用SpringBoot定制线程池
*
* @throws InterruptedException
*/
@Async
public void customAsyncThreadPool() throws InterruptedException {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
log.info("customAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
}
1.7 异常处理
前文介绍过,对于被 @Async 注解的异步方法,只能返回 void 或者 Future 类型。对于返回 Future 类型数据,如果异步任务方法抛出异常,则很容易进行处理,因为 Future.get() 会重新抛出该异常,我们只需对其进行捕获即可。但是对于返回 void 的异步任务方法,异常不会传播到被调用者线程,因此我们需要自定义一个额外的异步任务异常处理器,捕获异步任务方法抛出的异常
在 AsyncConfiguration 中补充配置:
/**
* 自定义处理异步方法抛出的未捕获异常的策略
*
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@SneakyThrows
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.error("线程池执行任务发送未知错误,执行方法:{},异常信息:{}", method.getName(), ex.getMessage());
//保存错误日志
File errorLog = new File("src/main/resources/error.log");
FileOutputStream fileOutputStream = new FileOutputStream(errorLog, true);
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
ex.printStackTrace(pw);
String sStackTrace = sw.toString(); // stack trace as a string
fileOutputStream.write(("线程池执行任务发送未知错误,执行方法:" + method.getName() + ",异常信息:" + "\n" + sStackTrace + "\n").getBytes(StandardCharsets.UTF_8));
fileOutputStream.close();
}
};
}
如果在执行异步任务时抛出异常:
/**
* 测试使用SpringBoot定制线程池
*
* @throws InterruptedException
*/
@Async
public void customAsyncThreadPool() throws InterruptedException {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
log.info("customAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
throw new RuntimeException("Asyn execution failed");
}
1.8 ⭐异步任务相关限制
被@Async注解的异步任务方法存在相关限制:
- 被@Async注解的方法必须是public的,这样方法才可以被代理
- 不能在同一个类中调用@Async方法,因为同一个类中调用会绕过方法代理,调用的是实际的方法
- 被@Async注解的方法不能是static
@Async不能用于被@Configuration注解的类方法上。- @Async注解不能与 Bean 对象的生命周期回调函数(比如@PostConstruct)一起注解到同一个方法中。解决方法可参考:Spring - The @Async annotation
- 异步类必须注入到 Spring IOC 容器中(也即异步类必须被@Component/@Service等进行注解)
- 其他类中使用异步类对象必须通过@Autowired等方式进行注入,不能手动new对象
二、SpringBoot线程池
2.1 常用的的线程池对象
jdk原生的两个常用线程池对象
ThreadPoolExecutor、ScheduledThreadPoolExecutor,后者继承前者,主要增加了任务调度相关的一些方法
springboot自动装配的两个常用线程池对象
如果引入了spring-boot-autoconfigure这个依赖,则会自动装配两个线程池对象ThreadPoolTaskExecutor,ThreadPoolTaskScheduler(参考org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration)分别对应jdk的两个线程池,是静态代理增强,故ThreadPoolTaskScheduler的api是最丰富的
2.2 ThreadPoolTaskScheduler核心api测试
@Autowired
private ThreadPoolTaskScheduler taskScheduler;
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
三、@Retry(SpringBoot重试机制)
@Retryable是Spring提供的可重试注解,为了使用spring提供的重试机制,需要做如下操作:
3.1 在pom文件中添加相应的依赖
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
3.2 在启动类或者方法所在的类上添加注解@EnableRetry
3.3 在需要重试的方法上添加注解@Retryable,示例如下:
@Retryable(maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5))
public Customer getCustomer(String customerId) {
if (true) {
JSONArray data = retObj.getJSONArray("data");
if (data != null && !data.isEmpty()) {
return data.toJavaList(Customer.class).get(0);
}
} else {
log.error("异常,{}", customerId);
throw new RuntimeException("获数据失败");
}
return null;
}
@Retryable注解中的参数说明:
- maxAttempts:最大重试次数,默认为3,如果要设置的重试次数为3,可以不写;
- value:抛出指定异常才会重试
- include:和value一样,默认为空,当exclude也为空时,默认所以异常
- exclude:指定不处理的异常
backoff:重试等待策略,默认使用@Backoff@Backoff的value默认为1000L,我们设置为2000L。
@Backoff注解中的参数说明:
value:隔多少毫秒后重试,默认为1000L,我们设置为3000L;
- delay:和value一样,但是默认为0;
- multiplier:(指定延迟倍数)默认为0,表示固定暂停1秒后进行重试,如果把multiplier设置为1.5,则第一次重试为2秒,第二次为3秒,第三次为4.5秒。
3.4 可以在指定方法上标记@Recover来开启重试失败后调用的方法(注意,需跟重处理方法在同一个类中)
@Recover :当重试到达指定次数时,被注解的方法将被回调,可以在该方法中进行日志处理。需要注意的是发生的异常和入参类型一致时才会回调
/**
* 记录抛出的异常
*
* @param exception 异常信息
*/
@Recover
public void recover(RuntimeException exception){
log.error("重试达到最大次数:异常信息:{}", exception.getMessage());
}