背景
需求: 项目二期,需要对某一张表的字段进行扩展,为了兼容二期业务代码逻辑,现需要将一起数据,进行统一计算补充该字段,满足二期逻辑
数据量:50w
解决方案
二期上线初,使用lts -JOB定时任务,统一批量处理老数据,代码实现思想:考虑到,数据量比较大,同时原有系统中已经存在比较多的抛弃任务,故此次处理数据任务,为批量查询数据库,使用单独线程池配合有边界队列ArrayBlockingQueue提高处理数据的效率
ThreadPoolTaskExecutor 参数
ThreadPoolTaskExecutor通常通过XML方式配置,或者通过Executors的工厂方法进行配置。
XML方式配置代码如下:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><!-- 线程池维护线程的最少数量 --><property name="corePoolSize" value="5" /><!-- 允许的空闲时间 --><property name="keepAliveSeconds" value="200" /><!-- 线程池维护线程的最大数量 --><property name="maxPoolSize" value="20" /><!-- 缓存队列 --><property name="queueCapacity" value="20" /><!-- 对拒绝task的处理策略 --><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /></property></bean>
注意:rejectedExecutionHandler字段用于配置拒绝策略,常用的拒绝策略如下:
- AbortPolicy:用于被拒绝任务的处理程序,它将抛出
RejectedExecutionException - CallerRunsPolicy:用于被拒绝任务的处理程序,它直接在execute方法的调用线程中运行被拒绝的任务
- DiscardOldestPolicy:用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试
execute。 - DiscardPolicy:用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务
自定义策略,只需要实现RejectedExecutionHandler接口即可
执行任务两种方式
- 无返回值的任务使用
**execute(Runnable)** - 有返回值的任务使用
**submit(Runnable)**
任务处理流程
简单总结一句话:
提交任务,线程池中的线程数可以增长至corePoolSize,之后继续提交任务将暂存至队列中,如果队列满,则看是否能继续增长线程数至maximumPoolSize,超出后将进行拒绝策略处理。显然,如果采用×××队列,那么maximumPoolSize将失效,线程池中的线程最多就是corePoolSize个线程工作。

关闭线程池
调用shutdown或者shutdownNow,两者都不会接受新的任务,而且通过调用要停止线程的interrupt方法来中断线程,有可能线程永远不会被中断,不同之处在于shutdownNow会首先将线程池的状态设置为STOP,然后尝试停止所有线程(有可能导致部分任务没有执行完)然后返回未执行任务的列表。而shutdown则只是将线程池的状态设置为shutdown,然后中断所有没有执行任务的线程,并将剩余的任务执行完
一般少有使用
监控线程池状态
常用状态:
- taskCount:线程需要执行的任务个数。
- completedTaskCount:线程池在运行过程中已完成的任务数。
- largestPoolSize:线程池曾经创建过的最大线程数量。
- getPoolSize:获取当前线程池的线程数量。
- getActiveCount:获取活动的线程的数量
通过继承线程池,重写beforeExecute,afterExecute和terminated方法来在线程执行任务前,线程执行任务结束,和线程终结前获取线程的运行情况,根据具体情况调整线程池的线程数量。
代码实现
xml文件
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><!-- 线程池维护线程的最少数量 --><property name="corePoolSize" value="5" /><!-- 允许的空闲时间 --><property name="keepAliveSeconds" value="200" /><!-- 线程池维护线程的最大数量 --><property name="maxPoolSize" value="20" /><!-- 缓存队列 --><property name="queueCapacity" value="20" /><!-- 对拒绝task的处理策略 --><property name="rejectedExecutionHandler"><bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /></property></bean>
配置类
import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;@Configurationpublic class JobRunnerConfig {// @Value("${test.statistical.SavedSum.corePoolSize}")private int corePoolSize = 5;//参数可在配置中心配置//@Value("${test.statistical.SavedSum.maxPoolSize}")private int maxPoolSize = 10;/*** 使用线程池 statisticalSavedSumForJob* @return TaskExecutor*/@Beanpublic ThreadPoolTaskExecutor statisticalSavedSumTaskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setKeepAliveSeconds(600);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setQueueCapacity(0);executor.setThreadNamePrefix("付费会员 统计一期服务单已省金额-");//线程前缀return executor;}}
业务实现类
@Service("testService")public class testServiceImpl implements testService {//注入线程池 注意通过名称查找@Resourceprivate ThreadPoolTaskExecutor statisticalSavedSumTaskExecutor;//@Value("${test.statistical.SavedSum.pageSize}")private int pageSize = 100;//@Value("${test.statistical.SavedSum.threadNum}")private int threadNum = 5;//可配置在配置中心 /配置文件中 @Value("${test.statistical.SavedSum.queueCapacity}")private static int queueCapacity = 1000;private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);@Overridepublic void statisticalSavedSum() {try {//分页查询所有的数据 这里直接创建一个集合不做查询了List<String> testList = new ArrayList<>()if (testList.size() == 0 || testList.isEmpty()) {return;}for (String str : testList) {//遍历查询出的数据放到队列中queue.offer(str, 5, TimeUnit.SECONDS);}//统计执行失败的任务List<Future<Integer>> failList = new ArrayList<>();for (int i = 0; i < threadNum; i++) {Future<Integer> failCount = statisticalSavedSumTaskExecutor.submit(new TestStatisticalSavedSumThread(memberQueue)); //TestStatisticalSavedSumThread真正执行的类failList.add(failCount);}int count;count = failList.stream().mapToInt(this::getCount).sum();logger.warn("统计失败数量:{}", count);} catch (InterruptedException e) {logger.error("发生中断异常:", e);}}private int getCount(Future<Integer> fail) {int n = 0;try {n = fail.get();} catch (InterruptedException e) {logger.error("发生中断异常:", e);} catch (ExecutionException e) {logger.error(" 线程返回结果异常:", e);}return n;}}
Thread执行类
public class TestStatisticalSavedSumThread implements Callable<Integer> {//队列private BlockingQueue<String> memberQueue;//有参构造public MemberStatisticalSavedSumThread(BlockingQueue<String> memberQueue) {this.memberQueue = memberQueue;}@Overridepublic Integer Call() throws Exception {int failCount = 0;try {//无限循环获取队列中的数据处理while (memberQueue.size() > 0) {// ArrayBlockingQueue 如果队列满了,或者是空的,那么都会在执行操作的时候,阻塞住// 指定时间内获取不到,退出监听String str = memberQueue.poll(5, TimeUnit.SECONDS);if (Optional.ofNullable(str).isPresent()) {//再次执行真正的业务方法}}} catch (Exception e) {Thread.currentThread().interrupt();}return failCount;}}
批量多线程高效处理任务
