1. 介绍
@Async 注解在 service 内的方法上(@EnableAsync开启异步),可以实现在 controller 的异步调用,调用的被 @Async 注解的方法会在一个单独线程内运行,适合即使返回,异步解耦,service 慢慢去处理
@Async 注解的方法只能 返回 void
或者 future
类型的返回值,其他值会使 注解无效,因为不能异步执行
被 @Async
的方法在独立线程调用,不能被 @ControllerAdvice
全局异常处理器捕获,所以需要自己设置异常处理
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @description:
* @author: qinyu
* @Date: 2019/8/22 21:09
*/
@Configuration
public class ThreadAsyncConfiguration implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//设置核心线程数
threadPool.setCorePoolSize(10);
//设置最大线程数
threadPool.setMaxPoolSize(100);
//线程池所使用的缓冲队列
threadPool.setQueueCapacity(10);
//等待任务在关机时完成--表明等待所有线程执行完
threadPool.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
threadPool.setAwaitTerminationSeconds(60);
// 线程名称前缀s
threadPool.setThreadNamePrefix("MyAsync-");
// 初始化线程
threadPool.initialize();
return threadPool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SpringAsyncExceptionHandler();
}
}
SpringAsyncExceptionHandler 类
@Slf4j
public class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
log.error("Exception occurs in async method", throwable.getMessage());
}
}
2. 使用实例一
springboot 使用异步注解 @Async
@Component
public class AsyncExceptionDemo {
private static final Logger log = LoggerFactory.getLogger(AsyncExceptionDemo.class);
/**
* 最简单的异步调用,返回值为void
*/
@Async
public void asyncInvokeSimplest() {
log.info("asyncSimplest");
}
/**
* 带参数的异步调用 异步方法可以传入参数
* 对于返回值是void,异常会被AsyncUncaughtExceptionHandler处理掉
* @param s
*/
@Async
public void asyncInvokeWithException(String s) {
log.info("asyncInvokeWithParameter, parementer={}", s);
throw new IllegalArgumentException(s);
}
/**
* 异常调用返回Future
* 对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
* 或者在调用方在调用Futrue.get时捕获异常进行处理
*
* @param i
* @return
*/
@Async
public Future<String> asyncInvokeReturnFuture(int i) {
log.info("asyncInvokeReturnFuture, parementer={}", i);
Future<String> future;
try {
Thread.sleep(1000 * 1);
future = new AsyncResult<String>("success:" + i);
throw new IllegalArgumentException("a");
} catch (InterruptedException e) {
future = new AsyncResult<String>("error");
} catch(IllegalArgumentException e){
future = new AsyncResult<String>("error-IllegalArgumentException");
}
return future;
}
}
启动异步调用
@SpringBootApplication
@EnableAsync // 启动异步调用
public class AsyncApplicationWithAsyncConfigurer {
private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithAsyncConfigurer.class);
public static void main(String[] args) {
log.info("Start AsyncApplication.. ");
SpringApplication.run(AsyncApplicationWithAsyncConfigurer.class, args);
}
}
3. 结合 CountDownLatch 异步获取结果
调用
@Override
public Result queryQuestionUseCountData(List<String> queIdList) throws ServiceException {
try {
if (ListUtils.isEmpty(queIdList)) {
throw new Exception("试题id不能为空");
}
CountDownLatch countDownLatch = new CountDownLatch(3);
//查询相似题数量
Future<Map> relativeQuestionCountMapFuture = questionAsyncService.queryRelativeQuestionCount(queIdList, countDownLatch);
//查询试题使用数据
Future<List> questionStateCountListFuture = questionAsyncService.queryQuestionStateCountList(queIdList, countDownLatch);
// 查询网校使用数量
Future<List> onlineSchoolPaperNumberFuture = questionAsyncService.queryOnlineSchoolUseNumList(queIdList, countDownLatch);
try {
countDownLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new ServiceException("子线程等待异常");
}
// 封装试题使用数据返回结果
Map<String, QueUseCountVo> returnMap = packageQuestionUseCountDataResult(queIdList, relativeQuestionCountMapFuture.get(), questionStateCountListFuture.get(), onlineSchoolPaperNumberFuture.get());
// 根据配置字参数对使用/作答/正确率三个参数的显示进行过滤
filterQuestionCountData(returnMap);
return Result.buildSuccessResult(returnMap);
} catch (Exception e) {
log.error("查询试题使用数据失败.", e);
throw new ServiceException(e);
}
}
结合 @Async
异步方法
/**
* @description: @Async 标注的方法和调用者不能在同一个类中,会产生代理绕过问题,因此创建此类放置异步方法
* @author: zcq
* @date: 2020/8/4 4:43 下午
*/
@Service
@Slf4j
public class QuestionServiceAsyncImpl implements IQuestionAsyncService {
@Autowired
private DataCenterRestClient dataCenterRestClient;
@Autowired
private EsRestClient esRestClient;
@Autowired
private EpAnalyRestClient epAnalyRestClient;
/**
* @description: 查询相似题数量
* @author: zcq
* @date: 2020/8/3 11:16 上午
*/
@Override
@Async
public Future<Map> queryRelativeQuestionCount(List<String> queIdList, CountDownLatch countDownLatch) throws Exception {
Map<String, Object> relativeQuestionCountMap = new HashMap<>();
try {
Result relativeQuestionCountResult = dataCenterRestClient.queryRelativeQuestionCount(queIdList);
if (relativeQuestionCountResult.getSuccess()) {
relativeQuestionCountMap = (Map<String, Object>) relativeQuestionCountResult.getData();
}
return new AsyncResult(relativeQuestionCountMap);
} catch (RestClientException e) {
log.error("通过DataCenterRestClient 查询相似题数量失败.", e);
throw new Exception(e);
} finally {
countDownLatch.countDown();
}
}
/**
* @description: 查询试题使用数据
* @author: zcq
* @date: 2020/8/3 11:16 上午
*/
@Override
@Async
public Future<List> queryQuestionStateCountList(List<String> queIdList, CountDownLatch countDownLatch) throws Exception {
List<QueUseCountVo> questionStateCountList = new ArrayList<>();
try {
Result questionStateCountResult = esRestClient.queryQuestionState(queIdList);
if (questionStateCountResult.getSuccess() && questionStateCountResult.getData() != null) {
questionStateCountList = JSON.parseArray(JSON.toJSONString(questionStateCountResult.getData()), QueUseCountVo.class);
}
return new AsyncResult(questionStateCountList);
} catch (RestClientException e) {
log.error("通过EsRestClient 获取试题统计信息失败.", e);
throw new Exception(e);
} finally {
countDownLatch.countDown();
}
}
/**
* @description: 查询试题网校的使用数据
* @author: zcq
* @date: 2020/8/3 11:16 上午
*/
@Override
@Async
public Future<List> queryOnlineSchoolUseNumList(List<String> queIdList, CountDownLatch countDownLatch) throws Exception {
List<QueUseCountVo> onlineSchoolUseNumList = new ArrayList<>();
try {
Result questionWxUseCountResult = epAnalyRestClient.queryOnlineSchoolUseNumList(queIdList);
if (questionWxUseCountResult.getSuccess() && questionWxUseCountResult.getData() != null) {
onlineSchoolUseNumList = JSON.parseArray(JSON.toJSONString(questionWxUseCountResult.getData()), QueUseCountVo.class);
}
return new AsyncResult(onlineSchoolUseNumList);
} catch (RestClientException e) {
log.error("调用接口查询网校试题使用数据失败.", e);
throw new Exception(e);
} finally {
countDownLatch.countDown();
}
}
}
注意:
@Async 异步方法不要和同步调用方法写在同一个类中,即异步方法和调用者不要再同一个类中,会产生代理绕过问题。