配置
- corePoolSize : 核心线程数,一旦创建将不会再释放。
- maximumPoolSize : 最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。
- keepAliveSeconds : 也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。
- handler : 当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序
示例:
package com.greatonce.tuya.biz.impl;
@EnableCaching
@Configuration
public class BizConfiguration {
@Value("${tuya.biz.task.corePoolSize:10}")
private int corePoolSize;
@Value("${tuya.biz.task.maxPoolSize:20}")
private int maxPoolSize;
@Value("${tuya.biz.task.queueCapacity:50}")
private int queueCapacity;
@Value("${tuya.biz.task.keepAliveSeconds:60}")
private int keepAliveSeconds;
@Bean
public ThreadPoolExecutorFactoryBean bizExecutor() {
ThreadPoolExecutorFactoryBean factoryBean = new ThreadPoolExecutorFactoryBean();
// 核心线程数,一直存活,图鸦中设置为1
factoryBean.setCorePoolSize(corePoolSize);
// 当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。
// 如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
// 图鸦中设置为10
factoryBean.setMaxPoolSize(maxPoolSize);
// 任务队列容量,图鸦中设置为20
factoryBean.setQueueCapacity(queueCapacity);
// 当线程空闲时间达到setKeepAliveSeconds,该线程会退出,直到线程数量等于corePoolSize。
// 图鸦中设置为60
factoryBean.setKeepAliveSeconds(keepAliveSeconds);
factoryBean.setThreadNamePrefix("biz-task");
//(1) 默认的ThreadPoolExecutor.AbortPolicy 处理程序遭到拒绝将抛出运行时RejectedExecutionException;
//(2) ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
//(3) ThreadPoolExecutor.DiscardPolicy 不能执行的任务将被删除;
//(4) ThreadPoolExecutor.DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
factoryBean.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return factoryBean;
}
使用
CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
示例:
@Autowired
private ExecutorService executorService;
/**
* 立即投放
*/
@Override
public void goPublish(PublishProduct publishProduct) {
Store store = storeService.getByKey(publishProduct.getStoreId());
String publishSetting = publishProduct.getPublishSetting();
List<PublishProductImage> publishProductImages = publishProduct.getPublishProductImages();
CountDownLatch countDownLatch = new CountDownLatch(publishProductImages.size());
publishProductImages.forEach(publishProductImage -> {
executorService.execute(() -> {
try {
publishProductImageService.publish(store, publishProductImage, publishSetting);
} catch (Exception e) {
publishLOGGER.error("投放图片失败", e);
} finally {
countDownLatch.countDown();
}
});
});
try {
countDownLatch.await();
publishAfter(publishProduct);
} catch (InterruptedException e) {
publishLOGGER.error("服务异常中断", e);
}
}
@Async
在Spring中,基于@Async标注的方法会异步执行, 使用方式如下:
- 启用异步, 在启动类上标注@EnableAsync
配置对应的业务线程池, 示例代码如下
/**
* 默认的业务线程池.
*/
@Bean
public AsyncTaskExecutor bizExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(bizCorePoolSize);
executor.setMaxPoolSize(bizMaxPoolSize);
executor.setQueueCapacity(bizQueueCapacity);
executor.setThreadNamePrefix("oms-biz-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setKeepAliveSeconds(bizKeepAliveSeconds);
return executor;
}
使用注解并指定线程池, 示例代码如下
@Override
@Async("bizExecutor")
public void asyncDownload(ProductMallMappingDownloadBO downloadBO) {
BizContext.setNickname(downloadBO.getOperator());
Store store = storeService.getByKey(downloadBO.getStoreId());
downloadBO.setStore(store);
download(downloadBO);
BIZ_LOGGER.log("铺货下载", "参数:{}", JsonUtil.toJson(downloadBO));
}