在开发中,有时会遇到批量处理的业务。如果单线程处理,速度会非常慢,可能会导致上游超时。这是就需要使用多线程开发。

可以使用J.U.C提供的线程池:ThreadPoolExecutor类。在Spring框架中,也可以使用ThreadPoolTaskExecutor类。ThreadPoolTaskExecutor其实是对ThreadPoolExecutor的一种封装。

一、线程池

使用线程池可以带来一系列好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

1、线程池生命周期

image.png

2、任务调度

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

image.png

二、在Spring框架中使用ThreadPoolTaskExecutor类

为了应对大多数业务场景,配合Spring Boot框架,我们可以使用ThreadPoolTaskExecutor创建线程池,并把它注入到ioc容器中,全局都可以使用

1、首先,配置线程池参数

配置文件

  1. thread-pool:
  2. core-pool-size: 4
  3. max-pool-size: 16
  4. queue-capacity: 80
  5. keep-alive-seconds: 120

线程池实体类

  1. @Data
  2. @Component
  3. @ConfigurationProperties(prefix = "thread-pool")
  4. public class ThreadPoolProperties {
  5. private int corePoolSize;
  6. private int maxPoolSize;
  7. private int queueCapacity;
  8. private int keepAliveSeconds;
  9. }

配置类

这里@EnableAsync是与@Async配合使用,用于执行异步任务

  1. @EnableAsync
  2. @Configuration
  3. public class ThreadPoolConfig {
  4. private final ThreadPoolProperties threadPoolProperties;
  5. @Autowired
  6. public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
  7. this.threadPoolProperties = threadPoolProperties;
  8. }
  9. @Bean(name = "threadPoolTaskExecutor")
  10. public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
  11. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  12. executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
  13. executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
  14. executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
  15. executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
  16. executor.setThreadNamePrefix("thread-pool-");
  17. return executor;
  18. }
  19. }

在业务类中通过自定义SpringUtils类获取bean或使用@Async,来使用线程池。

使用情况

  1. @Override
  2. public String singleProcess(String input) {
  3. log.info("Processing...");
  4. int i = Integer.parseInt(input) * 1000 / 24 + 3;
  5. return "success" + i;
  6. }
  7. /**
  8. * 批量处理
  9. *
  10. * @param inputList 输入对象列表
  11. * @return 输出对象列表
  12. */
  13. @Override
  14. public List<String> multiProcess(List<String> inputList) {
  15. //创建线程池
  16. ThreadPoolTaskExecutor executor = SpringUtils.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
  17. //创建并使用CountDownLatch等待所有线程执行结束:
  18. CountDownLatch latch = new CountDownLatch(inputList.size());
  19. //每个线程把执行结果添加到线程安全的List中。这里List应当使用SynchronizedList
  20. List<String> outputList = Collections.synchronizedList(new ArrayList<>(inputList.size()));
  21. for (String input : inputList) {
  22. //分配线程进行执行
  23. executor.execute(() -> {
  24. try {
  25. //执行单个线程方法进行执行
  26. String output = singleProcess(input);
  27. outputList.add(output);
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. } finally {
  31. latch.countDown();
  32. }
  33. });
  34. }
  35. try {
  36. //等待所有线程执行完毕
  37. latch.await();
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. return outputList;
  42. }
  43. /**
  44. * 异步处理
  45. *
  46. * @param input 输入对象
  47. * @return 输出Future对象
  48. */
  49. @Async("threadPoolTaskExecutor")
  50. @Override
  51. public Future<String> asyncProcess(String input) {
  52. return new AsyncResult<>(singleProcess(input));
  53. }

当数据量较小时,单线程的执行效率远远高于多线程

附录:

连接数 = ((核心数 * 2) + 有效磁盘数)

异步线程

异步线程的调度,主线程执行结束就结束,异步线程不干扰主线程的执行过程

案例:

@RestController
@RequestMapping("async")
@Slf4j
public class AsyncController {

    @Autowired
    IAsyncService asyncService;

    @GetMapping
    public String test(String msg) {
        log.info("主线程start*****************");
        asyncService.sentMsg(msg);
        log.info("主线程end*****************");
        return "success";
    }
}
public interface IAsyncService {
    void sentMsg(String msg);
}



@Service
@Slf4j
public class AsyncServiceImpl implements IAsyncService{

    @Override
    @Async
    public void sentMsg(String msg) {
    @Override
    @Async
    public void sentMsg(String msg) {
        log.info("子线程发送短信");
        int i = 0;
        while (i<10) {
            i++;
            System.out.println("短信发送"+i);
        }
        log.info("子线程发送完毕");
    }
}

执行结果:
image.png