一、定义线程池
将创建好的线程池注册为 Spring Bean,Bean 名称为 taskExecutor
package com.wells.demo.customer.pool;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Spring configuration that switches on {@code @Async} support and registers
 * the thread pool bean named {@code taskExecutor}.
 * Created by wells on 2020-07-15 21:18:04
 */
@Configuration
@EnableAsync
public class ThreadPoolConfig {

    /**
     * Builds the pool published under the bean name {@code taskExecutor}.
     *
     * @return a {@link ThreadPoolTaskExecutor} with 10 core threads, at most 20
     *         threads, a queue capacity of 200 and a 60 second keep-alive
     */
    @Bean("taskExecutor")
    public Executor createThreadPoolExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Worker threads are named "taskExecutor-<n>" so they are easy to spot in logs.
        pool.setThreadNamePrefix("taskExecutor-");

        // Sizing of the pool and of its work queue.
        pool.setQueueCapacity(200);
        pool.setMaxPoolSize(20);
        pool.setCorePoolSize(10);
        pool.setKeepAliveSeconds(60);

        // When the pool cannot accept a task, the submitting thread runs it itself.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        return pool;
    }
}
二、定义 CustomerAsyncController:调用入口
package com.wells.demo.customer.controller;
import com.wells.demo.customer.service.CustomerAsyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST entry point that triggers the tasks of {@link CustomerAsyncService}.
 * Created by wells on 2020-07-15 09:26:38
 */
@RestController
@RequestMapping(value = "customer/async")
public class CustomerAsyncController {
    // One shared, immutable logger per class instead of a mutable per-instance field.
    private static final Logger logger = LoggerFactory.getLogger(CustomerAsyncController.class);

    @Autowired
    private CustomerAsyncService customerAsyncService;

    /**
     * Calls both service tasks and logs how long the two calls took in this thread.
     *
     * <p>The logged cost only covers the time spent inside the two calls; when the
     * service methods are executed asynchronously this is the hand-off time, not
     * the duration of the tasks themselves.
     *
     * @return the literal string {@code "success"}
     * @throws InterruptedException declared because the service methods declare it
     */
    @GetMapping(value = "/execute/task")
    public String executeTask() throws InterruptedException {
        logger.info("customer async request start");
        long start = System.currentTimeMillis();
        customerAsyncService.executeTaskOne();
        customerAsyncService.executeTaskTwo();
        long end = System.currentTimeMillis();
        logger.info("customer async request end, cost:{}", (end - start));
        return "success";
    }
}
三、定义Service
package com.wells.demo.customer.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
 * Service whose methods are marked {@code @Async} so that they are run on the
 * executor bean named {@code taskExecutor}.
 * Created by wells on 2020-07-15 09:36:29
 */
@Service
public class CustomerAsyncService {
    // One shared, immutable logger per class instead of a mutable per-instance field.
    private static final Logger logger = LoggerFactory.getLogger(CustomerAsyncService.class);

    /**
     * First demo task: logs, sleeps for 10 seconds, logs again.
     *
     * @throws InterruptedException if the executing thread is interrupted while sleeping
     */
    @Async(value = "taskExecutor")
    public void executeTaskOne() throws InterruptedException {
        logger.info("executeTaskOne request start");
        // Simulates a long-running job.
        TimeUnit.SECONDS.sleep(10);
        logger.info("executeTaskOne request end");
    }

    /**
     * Second demo task: logs, sleeps for 10 seconds, logs again.
     *
     * @throws InterruptedException if the executing thread is interrupted while sleeping
     */
    @Async(value = "taskExecutor")
    public void executeTaskTwo() throws InterruptedException {
        logger.info("executeTaskTwo request start");
        // Simulates a long-running job.
        TimeUnit.SECONDS.sleep(10);
        logger.info("executeTaskTwo request end");
    }
}
通过 Controller 提供的 URL(GET /customer/async/execute/task)进行测试,并观察日志输出
四、优雅关闭线程池
通过 setAwaitTerminationSeconds 设置容器关闭时等待线程池终止的最长时间(单位:秒)
package com.wells.demo.customer.pool;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Spring configuration that enables {@code @Async} support and registers the
 * {@code taskExecutor} thread pool, configured to shut down gracefully.
 * Created by wells on 2020-07-15 21:18:04
 */
@EnableAsync
@Configuration
public class ThreadPoolConfig {

    /**
     * Builds the pool published under the bean name {@code taskExecutor}.
     *
     * @return a {@link ThreadPoolTaskExecutor} with 10 core threads, at most 20
     *         threads, a queue capacity of 200, a 60 second keep-alive, and a
     *         shutdown that waits up to 60 seconds for submitted tasks
     */
    @Bean(value = "taskExecutor")
    public Executor createThreadPoolExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("taskExecutor-");
        // Let running and queued tasks finish on shutdown. Without this flag the
        // executor is shut down immediately: running tasks are interrupted and
        // queued ones are discarded, so the await timeout below would only wait
        // for the interrupted tasks to stop.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // Upper bound, in seconds, for how long shutdown blocks waiting for the pool to terminate.
        executor.setAwaitTerminationSeconds(60);
        // When the pool cannot accept a task, the submitting thread runs it itself.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}