背景
需求: 项目二期,需要对某一张表的字段进行扩展,为了兼容二期业务代码逻辑,现需要将一起数据,进行统一计算补充该字段,满足二期逻辑
数据量:50w
解决方案
二期上线初,使用lts -JOB定时任务,统一批量处理老数据,代码实现思想:考虑到,数据量比较大,同时原有系统中已经存在比较多的抛弃任务,故此次处理数据任务,为批量查询数据库,使用单独线程池配合有边界队列ArrayBlockingQueue提高处理数据的效率
ThreadPoolTaskExecutor 参数
ThreadPoolTaskExecutor
通常通过XML
方式配置,或者通过Executors
的工厂方法进行配置。
XML方式配置代码如下:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 线程池维护线程的最少数量 -->
<property name="corePoolSize" value="5" />
<!-- 允许的空闲时间 -->
<property name="keepAliveSeconds" value="200" />
<!-- 线程池维护线程的最大数量 -->
<property name="maxPoolSize" value="20" />
<!-- 缓存队列 -->
<property name="queueCapacity" value="20" />
<!-- 对拒绝task的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
注意:rejectedExecutionHandler字段用于配置拒绝策略,常用的拒绝策略如下:
- AbortPolicy:用于被拒绝任务的处理程序,它将抛出
RejectedExecutionException
- CallerRunsPolicy:用于被拒绝任务的处理程序,它直接在execute方法的调用线程中运行被拒绝的任务
- DiscardOldestPolicy:用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试
execute
。 - DiscardPolicy:用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务
自定义策略,只需要实现RejectedExecutionHandler
接口即可
执行任务两种方式
- 无返回值的任务使用
**execute(Runnable)**
- 有返回值的任务使用
**submit(Runnable)**
任务处理流程
简单总结一句话:
提交任务,线程池中的线程数可以增长至corePoolSize,之后继续提交任务将暂存至队列中,如果队列满,则看是否能继续增长线程数至maximumPoolSize,超出后将进行拒绝策略处理。显然,如果采用×××队列,那么maximumPoolSize将失效,线程池中的线程最多就是corePoolSize个线程工作。
关闭线程池
调用shutdown或者shutdownNow,两者都不会接受新的任务,而且通过调用要停止线程的interrupt方法来中断线程,有可能线程永远不会被中断,不同之处在于shutdownNow会首先将线程池的状态设置为STOP,然后尝试停止所有线程(有可能导致部分任务没有执行完)然后返回未执行任务的列表。而shutdown则只是将线程池的状态设置为shutdown,然后中断所有没有执行任务的线程,并将剩余的任务执行完
一般少有使用
监控线程池状态
常用状态:
- taskCount:线程需要执行的任务个数。
- completedTaskCount:线程池在运行过程中已完成的任务数。
- largestPoolSize:线程池曾经创建过的最大线程数量。
- getPoolSize:获取当前线程池的线程数量。
- getActiveCount:获取活动的线程的数量
通过继承线程池,重写beforeExecute,afterExecute和terminated方法来在线程执行任务前,线程执行任务结束,和线程终结前获取线程的运行情况,根据具体情况调整线程池的线程数量。
代码实现
xml文件
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 线程池维护线程的最少数量 -->
<property name="corePoolSize" value="5" />
<!-- 允许的空闲时间 -->
<property name="keepAliveSeconds" value="200" />
<!-- 线程池维护线程的最大数量 -->
<property name="maxPoolSize" value="20" />
<!-- 缓存队列 -->
<property name="queueCapacity" value="20" />
<!-- 对拒绝task的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
配置类
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class JobRunnerConfig {
// @Value("${test.statistical.SavedSum.corePoolSize}")
private int corePoolSize = 5;
//参数可在配置中心配置
//@Value("${test.statistical.SavedSum.maxPoolSize}")
private int maxPoolSize = 10;
/**
* 使用线程池 statisticalSavedSumForJob
* @return TaskExecutor
*/
@Bean
public ThreadPoolTaskExecutor statisticalSavedSumTaskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setKeepAliveSeconds(600);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setQueueCapacity(0);
executor.setThreadNamePrefix("付费会员 统计一期服务单已省金额-");//线程前缀
return executor;
}
}
业务实现类
@Service("testService")
public class testServiceImpl implements testService {
//注入线程池 注意通过名称查找
@Resource
private ThreadPoolTaskExecutor statisticalSavedSumTaskExecutor;
//@Value("${test.statistical.SavedSum.pageSize}")
private int pageSize = 100;
//@Value("${test.statistical.SavedSum.threadNum}")
private int threadNum = 5;
//可配置在配置中心 /配置文件中 @Value("${test.statistical.SavedSum.queueCapacity}")
private static int queueCapacity = 1000;
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);
@Override
public void statisticalSavedSum() {
try {
//分页查询所有的数据 这里直接创建一个集合不做查询了
List<String> testList = new ArrayList<>()
if (testList.size() == 0 || testList.isEmpty()) {
return;
}
for (String str : testList) {
//遍历查询出的数据放到队列中
queue.offer(str, 5, TimeUnit.SECONDS);
}
//统计执行失败的任务
List<Future<Integer>> failList = new ArrayList<>();
for (int i = 0; i < threadNum; i++) {
Future<Integer> failCount = statisticalSavedSumTaskExecutor
.submit(new TestStatisticalSavedSumThread(memberQueue)); //TestStatisticalSavedSumThread真正执行的类
failList.add(failCount);
}
int count;
count = failList.stream().mapToInt(this::getCount).sum();
logger.warn("统计失败数量:{}", count);
} catch (InterruptedException e) {
logger.error("发生中断异常:", e);
}
}
private int getCount(Future<Integer> fail) {
int n = 0;
try {
n = fail.get();
} catch (InterruptedException e) {
logger.error("发生中断异常:", e);
} catch (ExecutionException e) {
logger.error(" 线程返回结果异常:", e);
}
return n;
}
}
Thread执行类
public class TestStatisticalSavedSumThread implements Callable<Integer> {
//队列
private BlockingQueue<String> memberQueue;
//有参构造
public MemberStatisticalSavedSumThread(BlockingQueue<String> memberQueue) {
this.memberQueue = memberQueue;
}
@Override
public Integer Call() throws Exception {
int failCount = 0;
try {
//无限循环获取队列中的数据处理
while (memberQueue.size() > 0) {
// ArrayBlockingQueue 如果队列满了,或者是空的,那么都会在执行操作的时候,阻塞住
// 指定时间内获取不到,退出监听
String str = memberQueue.poll(5, TimeUnit.SECONDS);
if (Optional.ofNullable(str).isPresent()) {
//再次执行真正的业务方法
}
}
} catch (Exception e) {
Thread.currentThread().interrupt();
}
return failCount;
}
}
批量多线程高效处理任务