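Business background
A scheduled job calls a third-party API to fetch data. The third-party interface does fuzzy search, so the business side puts the large number of keywords to query into a plain text file in advance, one keyword per line. In the project, a scheduled task processes this file: it reads the keywords, calls the API with each one, and loads the returned data into the local DB.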
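- The business side uploads the file to a fixed path on the file server
- The scheduled task fires and downloads the file to the local service, and the project reads and loads it
- The API is called and the returned data is written to the database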
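The three steps above can be sketched in plain Java as follows. This is a minimal, hypothetical sketch: KeywordPipeline is an illustrative name, and the third-party API and the DB are stubbed with a Function and an in-memory List, whereas the real project uses its own HTTP client and persistence layer.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical sketch of the job: read keywords (one per line),
 * call the fuzzy-search API per keyword, persist the results.
 */
final class KeywordPipeline {

    /** Reads one keyword per line, trimming whitespace and skipping blank lines. */
    static List<String> readKeywords(BufferedReader reader) throws IOException {
        List<String> keywords = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            String keyword = line.trim();
            if (!keyword.isEmpty()) {
                keywords.add(keyword);
            }
        }
        return keywords;
    }

    /** Calls the (stubbed) API for each keyword and stores every returned record. */
    static int run(BufferedReader reader,
                   Function<String, List<String>> api,
                   List<String> store) throws IOException {
        int saved = 0;
        for (String keyword : readKeywords(reader)) {
            List<String> records = api.apply(keyword); // stand-in for the third-party HTTP call
            store.addAll(records);                     // stand-in for the DB insert
            saved += records.size();
        }
        return saved;
    }

    public static void main(String[] args) throws IOException {
        List<String> db = new ArrayList<>();
        BufferedReader file = new BufferedReader(new StringReader("apple\n\n banana \n"));
        int n = run(file, kw -> List.of(kw + "-result"), db);
        System.out.println(n + " " + db); // 2 [apple-result, banana-result]
    }
}
```

Everything in this loop runs on one thread, which is exactly the bottleneck described next.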
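Problems in the actual implementation
When there are many keywords to search, processing is slow, likely because the file is read on a single thread, so the keywords cannot be handled and persisted quickly enough.
Solution: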
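- Add a dedicated thread pool to the Spring Boot project for batch jobs, kept separate from the threads that serve core business, so that a heavy batch job cannot starve core requests and the two do not affect each other
- Read the local, already downloaded file with multiple threads [implementation below]. This is not recommended for small files, where the threading overhead can make it slower rather than faster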
Implementation
1. Spring Boot configuration, with configurable thread counts: application.properties
```properties
# Thread pool settings
async.film-job.core-pool-size=20
async.film-job.max-pool-size=100
async.film-job.keep-alive-seconds=10
async.film-job.queue-capacity=200
async.film-job.thread-name-prefix=async-Thread-film-service-
# Number of threads used to read a file
file.thread.num=5
```
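Entity class
```java
import lombok.Data;
import lombok.experimental.Accessors;

import java.io.InputStream;
import java.util.List;

@Data
@Accessors(chain = true)
public class FileThreadVO<T> {
    private InputStream is;      // stream the segment is read from
    private Integer start_line;  // first line of the segment (1-based)
    private Integer end_line;    // last line of the segment (inclusive)
    private List<T> result;      // lines read by the thread
}
```
2. Thread pool configuration class AsyncFilmServiceConfig: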
```java
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Thread pool configuration; use it with @Async("asyncFilmService").
 * @EnableAsync turns on support for asynchronous methods, and @ConfigurationProperties
 * binds a group of related settings onto this class.
 * The prefix attribute of @ConfigurationProperties is the key prefix used in application.properties;
 * Lombok's @Setter generates the setters through which the configured values are injected.
 */
@Setter
@ConfigurationProperties(prefix = "async.film-job")
@EnableAsync
@Configuration
public class AsyncFilmServiceConfig {

    /** Core (default) number of threads */
    private int corePoolSize;

    /** Maximum number of threads */
    private int maxPoolSize;

    /** How long an idle thread may live, in seconds */
    private int keepAliveSeconds;

    /** Capacity of the task queue */
    private int queueCapacity;

    /** Thread name prefix */
    private String threadNamePrefix;

    @Bean
    public ThreadPoolTaskExecutor asyncFilmService() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
        // Rejection policy: when threads and queue are exhausted, the submitting thread runs the task itself
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // On shutdown, wait for running and queued tasks to finish (default: false)
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // Let idle core threads time out and exit (default: false)
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}
```
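To see what this bean amounts to underneath, here is a rough plain-JDK equivalent, assuming queue-capacity is greater than 0 (in that case ThreadPoolTaskExecutor backs the pool with a LinkedBlockingQueue of that capacity). FilmPoolFactory is a hypothetical name; a factory like this can be handy in unit tests that run without a Spring context. Note that setWaitForTasksToCompleteOnShutdown(true) roughly corresponds to stopping the pool with shutdown() rather than shutdownNow().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical plain-JDK counterpart of the asyncFilmService bean. */
final class FilmPoolFactory {

    static ThreadPoolExecutor create(int corePoolSize, int maxPoolSize, int keepAliveSeconds,
                                     int queueCapacity, String threadNamePrefix) {
        AtomicInteger counter = new AtomicInteger(1);
        // Name threads "<prefix>1", "<prefix>2", ... like the configured prefix does
        ThreadFactory factory = r -> new Thread(r, threadNamePrefix + counter.getAndIncrement());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize,
                keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),   // bounded queue (capacity > 0 assumed)
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy()); // same rejection policy as the bean
        executor.allowCoreThreadTimeOut(true);              // same as setAllowCoreThreadTimeOut(true)
        return executor;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = create(20, 100, 10, 200, "async-Thread-film-service-");
        pool.submit(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();                                    // let queued work finish
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```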
2.2 The @Async annotation
Note: @Async and the thread pool
- @Async has a single String attribute, value, which names a bean of type Executor or TaskExecutor; the asynchronous task then runs on that specific thread pool. For example: @Async("asyncFilmService")
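Common reasons @Async does not take effect:
- @EnableAsync is missing. In a Spring Boot project it must be declared on the startup class or on a @Configuration class (AsyncFilmServiceConfig above does this)
- **The caller is in the same class as the @Async method** (a call through this bypasses the Spring proxy)
- The async class lacks @Component (or a similar annotation), so Spring cannot scan it
- The bean must be injected with @Autowired, @Resource or a similar annotation; **do not create it yourself with new**
- The async method is non-public or static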
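The same-class rule follows from how Spring applies @Async: calls are intercepted by a proxy wrapped around the bean, and a call made through this never reaches that proxy. The sketch below illustrates the effect with a plain JDK dynamic proxy rather than Spring itself (Spring may use a JDK or a CGLIB proxy, but self-invocation is bypassed either way). FilmJob, FilmJobImpl and SelfInvocationDemo are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

interface FilmJob {
    void outer();
    void inner();
}

final class FilmJobImpl implements FilmJob {
    @Override
    public void outer() {
        inner(); // self-invocation: this.inner(), the proxy never sees it
    }

    @Override
    public void inner() { }
}

final class SelfInvocationDemo {

    /** Wraps the target in a JDK proxy that records every call passing through it. */
    static FilmJob wrap(FilmJob target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // this is where Spring would hand the call to the executor
            return method.invoke(target, args);
        };
        return (FilmJob) Proxy.newProxyInstance(
                FilmJob.class.getClassLoader(), new Class<?>[] {FilmJob.class}, handler);
    }

    public static void main(String[] args) {
        List<String> intercepted = new ArrayList<>();
        FilmJob job = wrap(new FilmJobImpl(), intercepted);
        job.outer();
        System.out.println(intercepted); // [outer]: inner() bypassed the proxy
    }
}
```

Moving the @Async method into a separate bean and injecting it, as the list above recommends, makes the call go through the proxy again.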
2.3 How the thread pool executes submitted tasks (diagram)
[Figure: ThreadPoolExecutor task submission flow]
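Since the diagram is not reproduced here, the following sketch shows the standard ThreadPoolExecutor submission order on a deliberately tiny pool (core 1, max 2, queue capacity 1, values chosen only for illustration): a new task first gets a core thread, then waits in the queue, then gets a non-core thread once the queue is full, and is handed to the rejection policy when both are exhausted. SubmitOrderDemo is a hypothetical name, and the demo uses AbortPolicy so the rejection is visible; with the CallerRunsPolicy configured above, the fourth task would instead run on the submitting thread, which slows submission down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SubmitOrderDemo {

    /** Submits four blocking tasks to a tiny pool and records where each one went. */
    static List<String> run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy()); // throws on rejection
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {                   // keeps its thread busy until released
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocking);
                log.add("task" + i + ": threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("task" + i + ": rejected");
            }
        }
        release.countDown();                          // let the running tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

Running main prints task1: threads=1 queued=0, task2: threads=1 queued=1, task3: threads=2 queued=1, task4: rejected, one per line. With the project's settings (core 20, queue 200, max 100), the pool therefore only grows beyond 20 threads after 200 tasks are already waiting.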


3. Utility class for reading a file in segments: ReadFileThread
```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Description: reads one segment of a file (lines start_index..end_index, 1-based, inclusive).
 */
public class ReadFileThread implements Callable<List<String>> {

    private static final Logger logger = LogManager.getLogger(ReadFileThread.class);

    private final Integer start_index; // first line to read
    private final Integer end_index;   // last line to read (inclusive)
    private final InputStream is;      // a fresh stream over the whole file

    public ReadFileThread(int start_index, int end_index, InputStream is) {
        this.start_index = start_index;
        this.end_index = end_index;
        this.is = is;
    }

    /**
     * Skips to start_index, then reads up to and including end_index.
     *
     * @return the trimmed lines of this segment
     * @throws Exception if the stream cannot be read
     */
    @Override
    public List<String> call() throws Exception {
        long start = System.currentTimeMillis();
        List<String> resultList = new ArrayList<>();
        // try-with-resources closes the reader and, through it, the input stream
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
            int loc = 1;
            // Skip the lines that belong to earlier segments
            while (loc < start_index && reader.readLine() != null) {
                loc++;
            }
            String line;
            // Read start_index..end_index; stop early instead of throwing if the file is shorter
            while (loc <= end_index && (line = reader.readLine()) != null) {
                resultList.add(line.trim());
                loc++;
            }
        }
        logger.info("Thread={} finished reading, took {} ms, read {} lines",
                Thread.currentThread().getName(), (System.currentTimeMillis() - start), resultList.size());
        return resultList;
    }
}
```
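If the file is already on local disk, the same 1-based, inclusive range can be read more compactly with java.nio.file.Files.lines. The sketch below is an alternative under that assumption, not the author's code; LineRangeReader is a hypothetical name. Like ReadFileThread, it still skips earlier lines one by one, so every segment rescans the start of the file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LineRangeReader {

    /** Returns lines startLine..endLine (1-based, inclusive), trimmed; stops early at end of file. */
    static List<String> readRange(Path file, int startLine, int endLine) throws IOException {
        if (startLine < 1 || endLine < startLine) {
            throw new IllegalArgumentException("invalid range " + startLine + ".." + endLine);
        }
        // The stream holds an open file handle, so close it with try-with-resources
        try (Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
            return lines.skip(startLine - 1L)
                        .limit(endLine - startLine + 1L)
                        .map(String::trim)
                        .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("keywords", ".txt");
        Files.write(tmp, List.of("apple", " banana ", "cherry"), StandardCharsets.UTF_8);
        System.out.println(readRange(tmp, 2, 3)); // [banana, cherry]
        Files.delete(tmp);
    }
}
```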
4. Service implementation class FileService
```java
import com.zy.website.code.ApiReturnCode;
import com.zy.website.exception.WebsiteBusinessException;
import com.zy.website.model.vo.FileThreadVO;
import com.zy.website.utils.multi.ReadFileThread;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

@Service("fileService")
public class FileService {

    private static final Logger logger = LogManager.getLogger(FileService.class);

    @Value("${file.thread.num}")
    private Integer threadNum; // number of reader threads

    @Resource(name = "asyncFilmService")
    private ThreadPoolTaskExecutor executor; // dedicated batch thread pool

    /**
     * Reads the file in segments with several threads.
     * PS: if the file has fewer lines than threadNum, one task is started per line, which wastes threads.
     * Suited to files that hold one record per line.
     */
    public List<String> uploadByThread(MultipartFile file) throws Exception {
        if (file.isEmpty()) {
            return null;
        }
        int lines = getLineNum(file.getInputStream()); // total number of lines
        if (lines == 0) {
            return new ArrayList<>();
        }
        int chunks = Math.min(lines, threadNum); // never start more tasks than there are lines
        int base = lines / chunks;               // lines per segment; the last one also takes the remainder
        List<FileThreadVO<String>> threadVOS = new ArrayList<>(chunks);
        List<Future<List<String>>> futures = new ArrayList<>(chunks);
        int start_line = 1;
        for (int i = 1; i <= chunks; i++) {
            int end_line = (i == chunks) ? lines : start_line + base - 1;
            InputStream stream = file.getInputStream(); // each task gets its own stream
            threadVOS.add(new FileThreadVO<String>().setStart_line(start_line).setEnd_line(end_line).setIs(stream));
            // Submit every task first; calling get() right here would run the segments one after another
            futures.add(executor.submit(new ReadFileThread(start_line, end_line, stream)));
            start_line = end_line + 1;
        }
        List<String> resultCompleteList = new ArrayList<>(); // merged results of all threads
        for (int i = 0; i < futures.size(); i++) {
            List<String> result = futures.get(i).get(); // futures are in segment order
            threadVOS.get(i).setResult(result);
            resultCompleteList.addAll(result);
        }
        // Integrity check: records are line-based here, so only the line count is compared (bytes could be checked too)
        if (resultCompleteList.size() != lines) {
            logger.error(">>>>>====uploadByThread====>>>>>>File integrity check failed!");
            // custom exception and error code
            throw new WebsiteBusinessException("The file is incomplete!", ApiReturnCode.HTTP_ERROR.getCode());
        }
        return resultCompleteList;
    }

    /** Counts the lines in the stream, then closes it. */
    public int getLineNum(InputStream is) throws IOException {
        int line = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
            while (reader.readLine() != null) {
                line++;
            }
        }
        return line;
    }
}
```
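The partitioning in uploadByThread and the submit-then-collect pattern can be isolated into a small, framework-free helper that is easy to unit-test. The sketch below is one possible simplification, not the project's code: LinePartitioner, partition and runAll are hypothetical names, and unlike the service it spreads the remainder over the first segments instead of giving it all to the last one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

/** Hypothetical helper: split lines into ranges and read the ranges concurrently. */
final class LinePartitioner {

    /** Splits lines 1..totalLines into at most threadNum contiguous, inclusive {start, end} ranges. */
    static List<int[]> partition(int totalLines, int threadNum) {
        List<int[]> ranges = new ArrayList<>();
        int chunks = Math.min(totalLines, threadNum); // never create empty ranges
        if (chunks <= 0) {
            return ranges;
        }
        int base = totalLines / chunks;
        int extra = totalLines % chunks;              // the first `extra` ranges get one more line
        int start = 1;
        for (int i = 0; i < chunks; i++) {
            int size = base + (i < extra ? 1 : 0);
            ranges.add(new int[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }

    /** Submits every range first, then waits for all of them, so the reads overlap in time. */
    static <T> List<T> runAll(ExecutorService pool, List<int[]> ranges,
                              BiFunction<Integer, Integer, List<T>> readRange)
            throws InterruptedException, ExecutionException {
        List<Future<List<T>>> futures = new ArrayList<>();
        for (int[] r : ranges) {
            Callable<List<T>> task = () -> readRange.apply(r[0], r[1]);
            futures.add(pool.submit(task));
        }
        List<T> all = new ArrayList<>();
        for (Future<List<T>> f : futures) {
            all.addAll(f.get()); // futures are kept in range order, so line order is preserved
        }
        return all;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<String> lines = runAll(pool, partition(10, 3), (start, end) -> {
                List<String> out = new ArrayList<>();
                for (int i = start; i <= end; i++) {
                    out.add("line" + i); // stand-in for reading line i from the file
                }
                return out;
            });
            System.out.println(lines.size()); // 10
        } finally {
            pool.shutdown();
        }
    }
}
```

For 10 lines and 3 threads, partition yields the ranges 1..4, 5..7 and 8..10. Submitting all tasks before calling Future.get() is what lets the segments run concurrently; calling get() right after each submit makes the pool execute them one at a time.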
5. Using it in a method

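This method only reads the file with multiple threads from the pool; it does not write anything yet, because the main business does not need that for now. To be continued...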
