一、SpringBoot异步任务

1.1 为什么要用异步框架,它解决什么问题?

在SpringBoot的日常开发中,一般都是同步调用的。但实际中有很多场景非常适合使用异步来处理,如:注册新用户,送100个积分;或下单成功,发送push消息等等。
就拿注册新用户这个用例来说,为什么要异步处理?

  • 第一个原因:容错性、健壮性,如果送积分出现异常,不能因为送积分而导致用户注册失败;因为用户注册是主要功能,送积分是次要功能,即使送积分异常也要提示用户注册成功,然后后面在针对积分异常做补偿处理。
  • 第二个原因:提升性能,例如注册用户花了20毫秒,送积分花费50毫秒,如果用同步的话,总耗时70毫秒,用异步的话,无需等待积分,故耗时20毫秒。

故,异步能解决2个问题,性能和容错性。

1.2 SpringBoot如何实现异步调用?

对于异步方法调用,从Spring3开始提供了@Async注解,只需要在方法上标注此注解,此方法即可实现异步调用。
当然,还需要一个配置类,通过Enable模块驱动注解@EnableAsync 来开启异步功能。

注:@Async 注解通常用在方法上,但是也可以用作类型上,当类被 @Async 注解时,表示该类中所有的方法都是异步执行的。

1.3实现异步调用

第一步:新建配置类,开启@Async功能支持

使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。这里选择使用单独的配置类AsyncConfiguration

  1. @Configuration
  2. @EnableAsync
  3. public class AsyncConfiguration {
  4. }

第二步:在方法上标记异步调用

增加一个 AsyncServiceImpl 类,用来进行业务处理,同时添加 @Async 注解,代表该方法为异步处理

  1. @Slf4j
  2. @Service
  3. public class AsyncServiceImpl implements AsyncService {
  4. /**
  5. * 测试使用SpringBoot默认线程池
  6. *
  7. * @return Future
  8. * @throws InterruptedException
  9. */
  10. @Override
  11. @Async("otherTaskExecutor")
  12. public Future<String> otherAsyncThreadPool() throws InterruptedException {
  13. Thread.sleep(TimeUnit.SECONDS.toMillis(3));
  14. log.info("otherAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
  15. return new AsyncResult<>("Test Async Thread Pool Done");
  16. }
  17. /**
  18. * 测试使用SpringBoot定制线程池
  19. *
  20. * @throws InterruptedException
  21. */
  22. @Async
  23. @Override
  24. public void customAsyncThreadPool() throws InterruptedException {
  25. Thread.sleep(TimeUnit.SECONDS.toMillis(2));
  26. log.info("customAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
  27. }
  28. }

上述代码使用 _@Async_ 标记了两个异步执行方法,一个没有返回值的 _customAsyncThreadPool_,另一个拥有返回值 _otherAsyncThreadPool_,这里需要注意的一点时,被 _@Async_ 注解的方法可以接受任意类型参数,但只能返回void或Future类型数据。所以当异步方法返回数据时,需要使用Future包装异步任务结果,上述代码使用AsyncResult 包装异步任务结果,AsyncResult间接继承Future,是 Spring 提供的一个可用于追踪异步方法执行结果的包装类。其他常用的Future类型还有 Spring 4.2 提供的ListenableFuture,或者 JDK 8 提供的CompletableFuture,这些类型可提供更丰富的异步任务操作

如果前端需要获取耗时任务结果,则异步任务方法应当返回一个Future类型数据,此时Controller相关接口需要调用该Future的get()方法获取异步任务结果,get()方法是一个阻塞方法,因此该操作相当于将异步任务转换为同步任务,浏览器同样会面临我们前面所讲的转圈等待过程,但是异步执行还是有他的好处的,因为我们可以控制get()方法的调用时序,因此可以先执行其他一些操作后,最后再调用get()方法

第三步:在Controller中进行异步方法调用

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("/async")
  4. public class AsyncController {
  5. @Autowired
  6. AsyncService asyncService;
  7. @RequestMapping("/testAsyncThreadPool")
  8. public String test3() throws InterruptedException {
  9. asyncService.otherAsyncThreadPool();
  10. asyncService.customAsyncThreadPool();
  11. return "test async thread pool done";
  12. }
  13. }

通过访问http://localhost:8080/async/testAsyncThreadPool查看控制台日志:

  1. 2022-02-20 01:03:54.310 INFO 21032 --- [ custom-async-1] c.g.c.service.impl.AsyncServiceImpl : customAsyncThreadPool: Thread = custom-async-1
  2. 2022-02-20 01:03:55.319 INFO 21032 --- [ other-async-1] c.g.c.service.impl.AsyncServiceImpl : otherAsyncThreadPool: Thread = other-async-1

Postman上是一瞬间响应
image.png

通过日志可以看到:主线程不需要等待异步方法执行完成,减少了响应时间,提高了接口性能。
通过上面三步就可以在SpringBoot中欢乐的使用异步方法来提高接口性能了,是不是很简单?
但是,上面的代码忽略了一个最大的问题,就是给@Async异步框架自定义线程池。

1.4 为什么要给@Async自定义线程池?

默认情况下,Spring 会自动搜索相关线程池定义:要么是一个唯一TaskExecutor Bean 实例,要么是一个名称为taskExecutor的Executor Bean 实例。如果这两个 Bean 实例都不存在,就会使用SimpleAsyncTaskExecutor来异步执行被@Async注解的方法。
综上,可以知道,默认情况下,Spring 使用的 Executor 是SimpleAsyncTaskExecutor,SimpleAsyncTaskExecutor每次调用都会创建一个新的线程,不会重用之前的线程。很多时候,这种实现方式不符合我们的业务场景,因此通常我们都会自定义一个 Executor 来替换SimpleAsyncTaskExecutor

使用@Async注解,在默认情况下用的是SimpleAsyncTaskExecutor线程池,该线程池不是真正意义上的线程池。
使用此线程池无法实现线程重用,每次调用都会新建一条线程。若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误,关键代码如下:

  1. public void execute(Runnable task, long startTimeout) {
  2. Assert.notNull(task, "Runnable must not be null");
  3. Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
  4. //判断是否开启限流,默认为否
  5. if (this.isThrottleActive() && startTimeout > 0L) {
  6. //执行前置操作,进行限流
  7. this.concurrencyThrottle.beforeAccess();
  8. this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
  9. } else {
  10. //未限流的情况,执行线程任务
  11. this.doExecute(taskToUse);
  12. }
  13. }
  14. protected void doExecute(Runnable task) {
  15. //不断创建线程
  16. Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task);
  17. thread.start();
  18. }
  19. //创建线程
  20. public Thread createThread(Runnable runnable) {
  21. //指定线程名,task-1,task-2...
  22. Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName());
  23. thread.setPriority(this.getThreadPriority());
  24. thread.setDaemon(this.isDaemon());
  25. return thread;
  26. }

也可以直接通过上面的控制台日志观察,每次打印的线程名都是[task-1]、[task-2]、[task-3]、[task-4]…..递增的。
正因如此,所以在使用Spring中的@Async异步框架时一定要自定义线程池,替代默认的SimpleAsyncTaskExecutor
Spring提供了多种线程池:

  • SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
  • SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地
  • ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
  • ThreadPoolTaskScheduler:可以使用cron表达式
  • ThreadPoolTaskExecutor :最常使用,推荐。其实质是对java.util.concurrent.ThreadPoolExecutor的包装

1.5 为@Async实现一个自定义线程池

@Configuration
@EnableAsync
public class SyncConfiguration {
    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //核心线程数
        taskExecutor.setCorePoolSize(10);
        //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        taskExecutor.setMaxPoolSize(100);
        //缓存队列
        taskExecutor.setQueueCapacity(50);
        //许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        taskExecutor.setKeepAliveSeconds(200);
        //异步方法内部线程名称
        taskExecutor.setThreadNamePrefix("async-");
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

配置自定义线程池以后就可以大胆的使用@Async提供的异步处理能力了。

1.6 多个线程池处理

在现实的互联网项目开发中,针对高并发的请求,一般的做法是高并发接口单独线程池隔离处理。
假设现在2个高并发接口:一个是修改用户信息接口,刷新用户redis缓存;一个是下订单接口,发送app push信息。往往会根据接口特征定义两个线程池,这时候在使用@Async时就需要通过指定线程池名称进行区分。

@Async指定线程池名字

@SneakyThrows
@Async("asyncPoolTaskExecutor")
public void doTask1() {
    long t1 = System.currentTimeMillis();
    Thread.sleep(2000);
    long t2 = System.currentTimeMillis();
    log.info("task1 cost {} ms" , t2-t1);
}

当系统存在多个线程池时,也可以配置一个默认线程池,对于非默认的异步任务再通过@Async("otherTaskExecutor")来指定线程池名称。

配置默认线程池

Spring 还提供了另一个功能更加强大的接口AsyncConfigurer,该接口主要是用于自定义一个Executor配置类,提供了应用层级Executor接口,以及对于@Async方法异常捕获功能。如果 Spring 检测到该接口实例,会优先采用该接口自定义的Executor
可以修改配置类让其实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池:

/**
 * Springboot线程池定制配置
 *
 * @className: AsyncConfiguration
 * @author GMF
 * @date 2022/2/20
 * @time 0:02
*/
@Slf4j
@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor customExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //核心线程数
        taskExecutor.setCorePoolSize(2);
        //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        taskExecutor.setMaxPoolSize(10);
        //缓存队列
        taskExecutor.setQueueCapacity(50);
        //许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        taskExecutor.setKeepAliveSeconds(200);
        //异步方法内部线程名称
        taskExecutor.setThreadNamePrefix("custom-async-");
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean(name = "otherTaskExecutor")
    public ThreadPoolTaskExecutor defaultExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //核心线程数
        taskExecutor.setCorePoolSize(2);
        //线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        taskExecutor.setMaxPoolSize(10);
        //缓存队列
        taskExecutor.setQueueCapacity(50);
        //许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        taskExecutor.setKeepAliveSeconds(200);
        //异步方法内部线程名称
        taskExecutor.setThreadNamePrefix("other-async-");
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    /**
     * 指定默认线程池
     */
    @Override
    public Executor getAsyncExecutor() {
        return customExecutor();
    }
}

如下,customAsyncThreadPool()方法使用默认使用线程池asyncPoolTaskExecutorotherAsyncThreadPool()使用线程池otherTaskExecutor,非常灵活

/**
 * 测试使用SpringBoot默认线程池
 *
 * @throws InterruptedException
 */
@Async("otherTaskExecutor")
public void otherAsyncThreadPool() throws InterruptedException {
    Thread.sleep(TimeUnit.SECONDS.toMillis(3));
    log.info("defaultAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
}

/**
 * 测试使用SpringBoot定制线程池
 *
 * @throws InterruptedException
 */
@Async
public void customAsyncThreadPool() throws InterruptedException {
    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
    log.info("customAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
}

1.7 异常处理

前文介绍过,对于被 @Async 注解的异步方法,只能返回 void 或者 Future 类型。对于返回 Future 类型数据,如果异步任务方法抛出异常,则很容易进行处理,因为 Future.get() 会重新抛出该异常,我们只需对其进行捕获即可。但是对于返回 void 的异步任务方法,异常不会传播到被调用者线程,因此我们需要自定义一个额外的异步任务异常处理器,捕获异步任务方法抛出的异常

AsyncConfiguration 中补充配置:

/**
 * 自定义处理异步方法抛出的未捕获异常的策略
 *
 * @return
 */
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return new AsyncUncaughtExceptionHandler() {
        @SneakyThrows
        @Override
        public void handleUncaughtException(Throwable ex, Method method, Object... params) {
            log.error("线程池执行任务发送未知错误,执行方法:{},异常信息:{}", method.getName(), ex.getMessage());

            //保存错误日志
            File errorLog = new File("src/main/resources/error.log");
            FileOutputStream fileOutputStream = new FileOutputStream(errorLog, true);
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            ex.printStackTrace(pw);
            String sStackTrace = sw.toString(); // stack trace as a string
            fileOutputStream.write(("线程池执行任务发送未知错误,执行方法:" + method.getName() + ",异常信息:" + "\n" + sStackTrace + "\n").getBytes(StandardCharsets.UTF_8));
            fileOutputStream.close();
        }
    };
}

如果在执行异步任务时抛出异常:

/**
 * 测试使用SpringBoot定制线程池
 *
 * @throws InterruptedException
 */
@Async
public void customAsyncThreadPool() throws InterruptedException {
    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
    log.info("customAsyncThreadPool: Thread = {}", Thread.currentThread().getName());
    throw new RuntimeException("Asyn execution failed");
}

1.8 ⭐异步任务相关限制

被@Async注解的异步任务方法存在相关限制:

  • 被@Async注解的方法必须是public的,这样方法才可以被代理
  • 不能在同一个类中调用@Async方法,因为同一个类中调用会绕过方法代理,调用的是实际的方法
  • 被@Async注解的方法不能是static
  • @Async不能用于被@Configuration注解的类方法上。
  • @Async注解不能与 Bean 对象的生命周期回调函数(比如@PostConstruct)一起注解到同一个方法中。解决方法可参考:Spring - The @Async annotation
  • 异步类必须注入到 Spring IOC 容器中(也即异步类必须被@Component/@Service等进行注解)
  • 其他类中使用异步类对象必须通过@Autowired等方式进行注入,不能手动new对象

二、SpringBoot线程池

2.1 常用的的线程池对象

jdk原生的两个常用线程池对象

ThreadPoolExecutor、ScheduledThreadPoolExecutor,后者继承前者,主要增加了任务调度相关的一些方法

springboot自动装配的两个常用线程池对象

如果引入了spring-boot-autoconfigure这个依赖,则会自动装配两个线程池对象ThreadPoolTaskExecutor,ThreadPoolTaskScheduler(参考org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration)分别对应jdk的两个线程池,是静态代理增强,故ThreadPoolTaskScheduler的api是最丰富的

2.2 ThreadPoolTaskScheduler核心api测试

@Autowired
private ThreadPoolTaskScheduler taskScheduler;
@Autowired
private ThreadPoolTaskExecutor taskExecutor;

三、@Retry(SpringBoot重试机制)

@Retryable是Spring提供的可重试注解,为了使用spring提供的重试机制,需要做如下操作:

3.1 在pom文件中添加相应的依赖

<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>

3.2 在启动类或者方法所在的类上添加注解@EnableRetry

3.3 在需要重试的方法上添加注解@Retryable,示例如下:

@Retryable(maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5))
public Customer getCustomer(String customerId) {
        if (true) {
            JSONArray data = retObj.getJSONArray("data");
            if (data != null && !data.isEmpty()) {
                return data.toJavaList(Customer.class).get(0);
            }
        } else {
            log.error("异常,{}", customerId);
            throw new RuntimeException("获数据失败");
        }
        return null;
}

@Retryable注解中的参数说明:
  • maxAttempts:最大重试次数,默认为3,如果要设置的重试次数为3,可以不写;
  • value:抛出指定异常才会重试
  • include:和value一样,默认为空,当exclude也为空时,默认所以异常
  • exclude:指定不处理的异常
  • backoff:重试等待策略,默认使用@Backoff@Backoff的value默认为1000L,我们设置为2000L。

    @Backoff注解中的参数说明:
  • value:隔多少毫秒后重试,默认为1000L,我们设置为3000L;

  • delay:和value一样,但是默认为0;
  • multiplier:(指定延迟倍数)默认为0,表示固定暂停1秒后进行重试,如果把multiplier设置为1.5,则第一次重试为2秒,第二次为3秒,第三次为4.5秒。

3.4 可以在指定方法上标记@Recover来开启重试失败后调用的方法(注意,需跟重处理方法在同一个类中)

@Recover :当重试到达指定次数时,被注解的方法将被回调,可以在该方法中进行日志处理。需要注意的是发生的异常和入参类型一致时才会回调

/**
 * 记录抛出的异常
 *
 * @param exception 异常信息
 */
@Recover
public void recover(RuntimeException exception){
    log.error("重试达到最大次数:异常信息:{}", exception.getMessage());
}