背景

需求: 项目二期,需要对某一张表的字段进行扩展,为了兼容二期业务代码逻辑,现需要将一起数据,进行统一计算补充该字段,满足二期逻辑
数据量:50w

解决方案

二期上线初,使用lts -JOB定时任务,统一批量处理老数据,代码实现思想:考虑到,数据量比较大,同时原有系统中已经存在比较多的抛弃任务,故此次处理数据任务,为批量查询数据库,使用单独线程池配合有边界队列ArrayBlockingQueue提高处理数据的效率

ThreadPoolTaskExecutor 参数

ThreadPoolTaskExecutor通常通过XML方式配置,或者通过Executors的工厂方法进行配置。
XML方式配置代码如下:

  1. <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  2. <!-- 线程池维护线程的最少数量 -->
  3. <property name="corePoolSize" value="5" />
  4. <!-- 允许的空闲时间 -->
  5. <property name="keepAliveSeconds" value="200" />
  6. <!-- 线程池维护线程的最大数量 -->
  7. <property name="maxPoolSize" value="20" />
  8. <!-- 缓存队列 -->
  9. <property name="queueCapacity" value="20" />
  10. <!-- 对拒绝task的处理策略 -->
  11. <property name="rejectedExecutionHandler">
  12. <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
  13. </property>
  14. </bean>

注意:rejectedExecutionHandler字段用于配置拒绝策略,常用的拒绝策略如下:

  • AbortPolicy:用于被拒绝任务的处理程序,它将抛出RejectedExecutionException
  • CallerRunsPolicy:用于被拒绝任务的处理程序,它直接在execute方法的调用线程中运行被拒绝的任务
  • DiscardOldestPolicy:用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试execute
  • DiscardPolicy:用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务

自定义策略,只需要实现RejectedExecutionHandler接口即可

执行任务两种方式

  • 无返回值的任务使用**execute(Runnable)**
  • 有返回值的任务使用**submit(Runnable)**

任务处理流程

简单总结一句话:

提交任务,线程池中的线程数可以增长至corePoolSize,之后继续提交任务将暂存至队列中,如果队列满,则看是否能继续增长线程数至maximumPoolSize,超出后将进行拒绝策略处理。显然,如果采用×××队列,那么maximumPoolSize将失效,线程池中的线程最多就是corePoolSize个线程工作。

线程池 ThreadPoolTaskExecutor - 图1

关闭线程池

调用shutdown或者shutdownNow,两者都不会接受新的任务,而且通过调用要停止线程的interrupt方法来中断线程,有可能线程永远不会被中断,不同之处在于shutdownNow会首先将线程池的状态设置为STOP,然后尝试停止所有线程(有可能导致部分任务没有执行完)然后返回未执行任务的列表。而shutdown则只是将线程池的状态设置为shutdown,然后中断所有没有执行任务的线程,并将剩余的任务执行完

一般少有使用

监控线程池状态

常用状态:

  • taskCount:线程需要执行的任务个数。
  • completedTaskCount:线程池在运行过程中已完成的任务数。
  • largestPoolSize:线程池曾经创建过的最大线程数量。
  • getPoolSize:获取当前线程池的线程数量。
  • getActiveCount:获取活动的线程的数量

    通过继承线程池,重写beforeExecuteafterExecuteterminated方法来在线程执行任务前,线程执行任务结束,和线程终结前获取线程的运行情况,根据具体情况调整线程池的线程数量。

代码实现

xml文件

  1. <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  2. <!-- 线程池维护线程的最少数量 -->
  3. <property name="corePoolSize" value="5" />
  4. <!-- 允许的空闲时间 -->
  5. <property name="keepAliveSeconds" value="200" />
  6. <!-- 线程池维护线程的最大数量 -->
  7. <property name="maxPoolSize" value="20" />
  8. <!-- 缓存队列 -->
  9. <property name="queueCapacity" value="20" />
  10. <!-- 对拒绝task的处理策略 -->
  11. <property name="rejectedExecutionHandler">
  12. <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
  13. </property>
  14. </bean>

配置类

  1. import org.springframework.beans.factory.annotation.Value;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  5. import java.util.concurrent.ThreadPoolExecutor;
  6. @Configuration
  7. public class JobRunnerConfig {
  8. // @Value("${test.statistical.SavedSum.corePoolSize}")
  9. private int corePoolSize = 5;
  10. //参数可在配置中心配置
  11. //@Value("${test.statistical.SavedSum.maxPoolSize}")
  12. private int maxPoolSize = 10;
  13. /**
  14. * 使用线程池 statisticalSavedSumForJob
  15. * @return TaskExecutor
  16. */
  17. @Bean
  18. public ThreadPoolTaskExecutor statisticalSavedSumTaskExecutor(){
  19. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  20. executor.setCorePoolSize(corePoolSize);
  21. executor.setMaxPoolSize(maxPoolSize);
  22. executor.setKeepAliveSeconds(600);
  23. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  24. executor.setQueueCapacity(0);
  25. executor.setThreadNamePrefix("付费会员 统计一期服务单已省金额-");//线程前缀
  26. return executor;
  27. }
  28. }

业务实现类

  1. @Service("testService")
  2. public class testServiceImpl implements testService {
  3. //注入线程池 注意通过名称查找
  4. @Resource
  5. private ThreadPoolTaskExecutor statisticalSavedSumTaskExecutor;
  6. //@Value("${test.statistical.SavedSum.pageSize}")
  7. private int pageSize = 100;
  8. //@Value("${test.statistical.SavedSum.threadNum}")
  9. private int threadNum = 5;
  10. //可配置在配置中心 /配置文件中 @Value("${test.statistical.SavedSum.queueCapacity}")
  11. private static int queueCapacity = 1000;
  12. private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);
  13. @Override
  14. public void statisticalSavedSum() {
  15. try {
  16. //分页查询所有的数据 这里直接创建一个集合不做查询了
  17. List<String> testList = new ArrayList<>()
  18. if (testList.size() == 0 || testList.isEmpty()) {
  19. return;
  20. }
  21. for (String str : testList) {
  22. //遍历查询出的数据放到队列中
  23. queue.offer(str, 5, TimeUnit.SECONDS);
  24. }
  25. //统计执行失败的任务
  26. List<Future<Integer>> failList = new ArrayList<>();
  27. for (int i = 0; i < threadNum; i++) {
  28. Future<Integer> failCount = statisticalSavedSumTaskExecutor
  29. .submit(new TestStatisticalSavedSumThread(memberQueue)); //TestStatisticalSavedSumThread真正执行的类
  30. failList.add(failCount);
  31. }
  32. int count;
  33. count = failList.stream().mapToInt(this::getCount).sum();
  34. logger.warn("统计失败数量:{}", count);
  35. } catch (InterruptedException e) {
  36. logger.error("发生中断异常:", e);
  37. }
  38. }
  39. private int getCount(Future<Integer> fail) {
  40. int n = 0;
  41. try {
  42. n = fail.get();
  43. } catch (InterruptedException e) {
  44. logger.error("发生中断异常:", e);
  45. } catch (ExecutionException e) {
  46. logger.error(" 线程返回结果异常:", e);
  47. }
  48. return n;
  49. }
  50. }

Thread执行类

  1. public class TestStatisticalSavedSumThread implements Callable<Integer> {
  2. //队列
  3. private BlockingQueue<String> memberQueue;
  4. //有参构造
  5. public MemberStatisticalSavedSumThread(BlockingQueue<String> memberQueue) {
  6. this.memberQueue = memberQueue;
  7. }
  8. @Override
  9. public Integer Call() throws Exception {
  10. int failCount = 0;
  11. try {
  12. //无限循环获取队列中的数据处理
  13. while (memberQueue.size() > 0) {
  14. // ArrayBlockingQueue 如果队列满了,或者是空的,那么都会在执行操作的时候,阻塞住
  15. // 指定时间内获取不到,退出监听
  16. String str = memberQueue.poll(5, TimeUnit.SECONDS);
  17. if (Optional.ofNullable(str).isPresent()) {
  18. //再次执行真正的业务方法
  19. }
  20. }
  21. } catch (Exception e) {
  22. Thread.currentThread().interrupt();
  23. }
  24. return failCount;
  25. }
  26. }

批量多线程高效处理任务