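Business background

A scheduled task starts up and calls a third-party API to fetch data. The API is a fuzzy-search endpoint, so the business side prepares the large set of keywords to query in advance and puts them in a text file, one keyword per line. The project uses the scheduled task to process that file: it reads the keywords, calls the API, and loads the returned data into the local DB.

  1. The business side uploads the file to a fixed path on the file server.
  2. The scheduled task fires and downloads the file to the local service, where the project reads and loads it.
  3. The API is called and the returned data is persisted to the database.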

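Problem encountered in the actual implementation

When there are many keywords to search, the file gets large. Reading it on a single thread can then be slow, so the data cannot be processed and persisted quickly.

Solution:

  1. In the Spring Boot project, add a separate thread pool dedicated to batch tasks. Keeping it apart from the core business threads means the two do not affect each other, which makes the service safer.
  2. Read the local, already-downloaded file with multiple threads (implementation below). This is not recommended for small files, where the threading overhead can make the read slower instead of faster.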

Project practice

1. Spring Boot configuration, with configurable thread counts: application.properties

    # Thread pool settings
    async.film-job.core-pool-size=20
    async.film-job.max-pool-size=100
    async.film-job.keep-alive-seconds=10
    async.film-job.queue-capacity=200
    async.film-job.thread-name-prefix=async-Thread-film-service-
    # Number of threads used to read the file
    file.thread.num=5

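Entity class

    import lombok.Data;
    import lombok.experimental.Accessors;
    import java.io.InputStream;
    import java.util.List;

    @Data
    @Accessors(chain = true)
    public class FileThreadVO<T> {
        private InputStream is;      // stream the task reads from
        private Integer start_line;  // first line of the segment (1-based)
        private Integer end_line;    // last line of the segment (inclusive)
        private List<T> result;      // lines read by the task
    }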

2. AsyncFilmServiceConfig, the thread pool configuration class:

    import lombok.Setter;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import java.util.concurrent.ThreadPoolExecutor;

    /**
     * Thread pool configuration, referenced as @Async("asyncFilmService").
     * @EnableAsync turns on support for asynchronous tasks.
     * @ConfigurationProperties binds the related settings to this class; its prefix
     * attribute is the prefix of the entries in the configuration file, and Lombok's
     * @Setter provides the setters through which the values are injected.
     */
    @Setter
    @ConfigurationProperties(prefix = "async.film-job")
    @EnableAsync
    @Configuration
    public class AsyncFilmServiceConfig {

        /** Core pool size (the default number of threads). */
        private int corePoolSize;

        /** Maximum pool size. */
        private int maxPoolSize;

        /** How long an idle thread is kept alive, in seconds. */
        private int keepAliveSeconds;

        /** Capacity of the task queue. */
        private int queueCapacity;

        /** Prefix for the names of the pool's threads. */
        private String threadNamePrefix;

        @Bean
        public ThreadPoolTaskExecutor asyncFilmService() {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
            threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
            threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
            threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
            threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
            // Rejection policy: a task the pool cannot accept runs on the submitting thread
            threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // On shutdown, wait for submitted tasks to finish (default: false)
            threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
            // Let idle core threads time out and exit as well (default: false)
            threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
            threadPoolTaskExecutor.initialize();
            return threadPoolTaskExecutor;
        }
    }

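2.2 The @Async annotation

One thing to note when using @Async with a thread pool:

  1. @Async has a single attribute, a String named value. It names a bean of type Executor or TaskExecutor, and the asynchronous task runs on that pool, for example @Async("asyncFilmService").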

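When @Async does not take effect:

  1. @EnableAsync is missing. In a Spring Boot project it has to be present on the startup class or on a @Configuration class.
  2. The calling method and the async method are in the same class. This is the one that is easiest to miss.
  3. The class containing the async method is not annotated with @Component (or a similar stereotype annotation), so Spring never scans it.
  4. The object was created by hand with new. It has to be injected with @Autowired, @Resource, or similar.
  5. The async method is not public, or it is static.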
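The same-class rule follows from how the annotation is applied: @Async works through a proxy that wraps the bean, and a call made from inside the bean goes straight to `this` without passing through that proxy. The sketch below reproduces the effect with a plain JDK dynamic proxy. It is only an analogy under stated assumptions: `SelfInvocationDemo`, `ReportService`, and the method names are hypothetical, and Spring's own proxies are more involved than this.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationDemo {

    interface ReportService {
        void generate();
        void export();
    }

    static class ReportServiceImpl implements ReportService {
        @Override
        public void generate() {
            export(); // plain this.export(): the proxy never sees this call
        }

        @Override
        public void export() {
            // imagine this method carrying an async annotation
        }
    }

    /** Returns the names of the calls that actually passed through the proxy. */
    static List<String> interceptedCalls() {
        List<String> intercepted = new ArrayList<>();
        ReportService target = new ReportServiceImpl();
        InvocationHandler handler = (proxy, method, args) -> {
            // An async interceptor would hand the call to a thread pool at this point.
            intercepted.add(method.getName());
            return method.invoke(target, args);
        };
        ReportService proxied = (ReportService) Proxy.newProxyInstance(
                ReportService.class.getClassLoader(),
                new Class<?>[] {ReportService.class},
                handler);
        proxied.generate(); // intercepted, but the nested export() inside it is not
        proxied.export();   // intercepted, because it is called through the proxy
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls());
    }
}
```

Three method bodies run, but only two calls are recorded by the handler: the `export()` invoked from inside `generate()` bypasses it. Moving the async method to another bean and injecting that bean is the usual way around this.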

2.3 Diagram: how the thread pool accepts and runs submitted tasks

ThreadPoolExecutor

springboot large file processing - Figure 1

springboot large file processing - Figure 2
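Since the diagrams are not reproduced here, the order in which java.util.concurrent.ThreadPoolExecutor handles submissions can be observed with a deliberately tiny pool: core threads first, then the queue, then extra threads up to the maximum, and finally the rejection handler. This is a minimal sketch, not the project's code; `PoolFlowDemo` and the sizes (1 core thread, 2 maximum, queue of 1) are assumptions chosen to keep the trace short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolFlowDemo {

    /** Submits four tasks to a tiny pool and records the pool state after each one. */
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        List<String> events = new ArrayList<>(); // only touched by the caller thread

        for (int i = 1; i <= 4; i++) {
            final int id = i;
            pool.execute(() -> {
                if (Thread.currentThread() == caller) {
                    // Rejected by the pool and run by CallerRunsPolicy instead.
                    events.add("task " + id + " ran in caller");
                    return;
                }
                try {
                    release.await(); // keep pool threads busy until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            events.add("after task " + id + ": threads=" + pool.getPoolSize()
                    + ", queued=" + pool.getQueue().size());
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, task 2 waits in the queue, task 3 finds the queue full and gets a second thread, and task 4 is rejected and runs on the submitting thread. With the settings from application.properties the same rules apply at a larger scale: threads beyond the 20 core threads are only created once the 200-slot queue is full.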

3. Segmented file reader utility: ReadFileThread

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;

    /**
     * Reads one segment of a file: lines startIndex..endIndex (1-based, inclusive).
     */
    public class ReadFileThread implements Callable<List<String>> {

        private static final Logger logger = LogManager.getLogger(ReadFileThread.class);

        private final int startIndex;  // first line to read
        private final int endIndex;    // last line to read
        private final InputStream is;  // input stream owned by this task

        public ReadFileThread(int startIndex, int endIndex, InputStream is) {
            this.startIndex = startIndex;
            this.endIndex = endIndex;
            this.is = is;
        }

        /**
         * Reads the segment and returns its lines, trimmed.
         *
         * @return the lines of the segment
         * @throws Exception if the stream cannot be read
         */
        @Override
        public List<String> call() throws Exception {
            long start = System.currentTimeMillis();
            List<String> resultList = new ArrayList<>();
            // Closing the reader also closes the underlying stream.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
                String line;
                int loc = 1;
                // Skip the lines before the segment.
                while (loc < startIndex && reader.readLine() != null) {
                    loc++;
                }
                // Read the segment; stop early if the stream ends, instead of failing on null.
                while (loc <= endIndex && (line = reader.readLine()) != null) {
                    resultList.add(line.trim());
                    loc++;
                }
            }
            logger.info("thread={} finished reading, elapsed={} ms, lines read={}",
                    Thread.currentThread().getName(), (System.currentTimeMillis() - start), resultList.size());
            return resultList;
        }
    }

4. FileService implementation class

    import com.zy.website.code.ApiReturnCode;
    import com.zy.website.exception.WebsiteBusinessException;
    import com.zy.website.model.vo.FileThreadVO;
    import com.zy.website.utils.multi.ReadFileThread;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.stereotype.Service;
    import org.springframework.web.multipart.MultipartFile;
    import javax.annotation.Resource;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;

    @Service("fileService")
    public class FileService {

        private static final Logger logger = LogManager.getLogger(FileService.class);

        @Value("${file.thread.num}")
        private Integer threadNum; // number of reader threads

        @Resource(name = "asyncFilmService")
        private ThreadPoolTaskExecutor executor; // dedicated thread pool

        /**
         * Reads the file in segments on several threads.
         * Suited to files where each line is one record. If the file has fewer
         * lines than threadNum, one task per line is used so no thread is wasted.
         *
         * @return every line of the file, trimmed, in file order; null if the file is empty
         */
        public List<String> uploadByThread(MultipartFile file) throws Exception {
            if (file.isEmpty()) {
                return null;
            }
            int lines = getLineNum(file.getInputStream());          // total number of lines
            int workers = Math.max(1, Math.min(threadNum, lines));  // never more tasks than lines
            int base = lines / workers;                             // lines per task; the last task also takes the remainder

            List<FileThreadVO<String>> threadVOS = new ArrayList<>(workers);
            List<Future<List<String>>> futures = new ArrayList<>(workers);
            int startLine = 1;
            for (int i = 1; i <= workers; i++) {
                int endLine = (i == workers) ? lines : startLine + base - 1;
                InputStream stream = file.getInputStream(); // each task reads from its own stream
                threadVOS.add(new FileThreadVO<String>()
                        .setIs(stream).setStart_line(startLine).setEnd_line(endLine));
                // Submit only. Calling get() here would block and run the segments one after another.
                futures.add(executor.submit(new ReadFileThread(startLine, endLine, stream)));
                startLine = endLine + 1;
            }

            // Merge the results in segment order once every task has been submitted.
            List<String> resultCompleteList = new ArrayList<>(lines);
            for (int i = 0; i < workers; i++) {
                List<String> part = futures.get(i).get();
                threadVOS.get(i).setResult(part);
                resultCompleteList.addAll(part);
            }

            // Integrity check. Lines are the unit of work in this project, so only
            // the line count is compared; the byte count could be checked as well.
            if (resultCompleteList.size() != lines) {
                logger.error(">>>>>====uploadByThread====>>>>>> file integrity check failed!");
                // project-specific exception and error code
                throw new WebsiteBusinessException("The file is incomplete!", ApiReturnCode.HTTP_ERROR.getCode());
            }
            return resultCompleteList;
        }

        /**
         * Counts the lines in the stream and closes it.
         *
         * @param is the stream to read
         * @return the number of lines
         * @throws IOException if the stream cannot be read
         */
        public int getLineNum(InputStream is) throws IOException {
            int line = 0;
            // Same charset as ReadFileThread; closing the reader also closes the stream.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
                while (reader.readLine() != null) {
                    line++;
                }
            }
            return line;
        }
    }
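The line-range arithmetic and the submit-then-collect pattern in FileService can be tried without Spring. The sketch below uses a plain ExecutorService and an in-memory byte array in place of the uploaded file. `SegmentedReadDemo` and its method names are hypothetical, and the remainder lines go to the last segment, which is one possible split rather than the only one.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SegmentedReadDemo {

    /** Splits lines 1..totalLines into at most `threads` contiguous [start, end] ranges. */
    static List<int[]> partition(int totalLines, int threads) {
        List<int[]> ranges = new ArrayList<>();
        if (totalLines <= 0 || threads <= 0) {
            return ranges;
        }
        int workers = Math.min(threads, totalLines);
        int base = totalLines / workers;
        int start = 1;
        for (int i = 1; i <= workers; i++) {
            int end = (i == workers) ? totalLines : start + base - 1; // last range takes the remainder
            ranges.add(new int[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    /** Reads lines start..end (1-based, inclusive) through its own reader over the data. */
    static List<String> readSegment(byte[] data, int start, int end) throws IOException {
        List<String> out = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(data), StandardCharsets.UTF_8))) {
            String line;
            int loc = 1;
            while (loc < start && reader.readLine() != null) {
                loc++;
            }
            while (loc <= end && (line = reader.readLine()) != null) {
                out.add(line.trim());
                loc++;
            }
        }
        return out;
    }

    /** Submits every segment first, then collects the results in segment order. */
    static List<String> readAll(byte[] data, int totalLines, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int[] r : partition(totalLines, threads)) {
                futures.add(pool.submit(() -> readSegment(data, r[0], r[1])));
            }
            List<String> merged = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                merged.addAll(f.get()); // blocks only after all segments are already running
            }
            return merged;
        } catch (Exception e) {
            throw new IllegalStateException("segment read failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Collecting each Future only after all tasks have been submitted is what lets the segments run in parallel; calling get() right after each submit makes every segment wait for the previous one. Every task still skips to its segment from the start of the stream, so the gain comes from the per-line work rather than from the raw read.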

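5. Using it in a method

springboot large file processing - Figure 3


This approach only covers reading the file with multiple threads from the thread pool. Writing is not covered, because the main business flow does not need it yet. More to come in a later update.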