示例

  1. @Configuration
  2. @EnableAsync
  3. public class AsyncConfig implements AsyncConfigurer {
  4. @Override
  5. public Executor getAsyncExecutor() {
  6. //线程池设置
  7. ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  8. //核心线程
  9. taskExecutor.setCorePoolSize(8);
  10. //最大线程
  11. taskExecutor.setMaxPoolSize(16);
  12. //队列大小
  13. taskExecutor.setQueueCapacity(40);
  14. taskExecutor.setThreadNamePrefix("async-service-");
  15. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
  16. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
  17. taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  18. taskExecutor.initialize();
  19. return taskExecutor;
  20. }
  21. @Override
  22. public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
  23. //直接返回 祖籍 AsyncUncaughtExceptionHandler 对象
  24. return AsyncConfigurer.super.getAsyncUncaughtExceptionHandler();
  25. }
  26. }

参数说明

corePoolSize: 线程池维护线程最小数量
maxPoolSize: 线程池维护线程最大数量
keepAliveSeconds: (maxPoolSize-corePoolSize)部分线程空闲最大存活时间
queueCapacity: 阻塞任务队列(任务超过corePoolSize部分就会放到queueCapacity)
AllowCoreThreadTimeOut: 设置为true的话,keepAliveSeconds参数设置的有效时间对corePoolSize线程也有效
RejectedExecutionHandler: 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理。

拒绝策略RejectedExecutionHandler

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

线程创建流程:

  • 当线程数量小于corePoolSize时,创建线程,不管线程是不是闲置的
  • 当线程数量大于等于corePoolSize时,把任务放到queueCapacity队列
  • 当queueCapacity满了,就创建新的线程来执行,直到线程池大小超过maxPoolSize
  • 当线程数量大于等于maxPoolSize时,根据RejectedExecutionHandler设置的策略来处理新加入的任务

ThreadPoolTaskExecutor 会配合@Async 使用,在方法上加了此注解,说明此方法是异步方法。

流程图

ThreadPoolTaskExecutor 线程池使用详解 - 图1

关闭线程池

调用shutdown或者shutdownNow,两者都不会接受新的任务,而且通过调用要停止线程的interrupt方法来中断线程,有可能线程永远不会被中断,不同之处在于shutdownNow会首先将线程池的状态设置为STOP,然后尝试停止所有线程(有可能导致部分任务没有执行完)然后返回未执行任务的列表。而shutdown则只是将线程池的状态设置为shutdown,然后中断所有没有执行任务的线程,并将剩余的任务执行完。

配置线程个数

  • 如果是CPU密集型任务,那么线程池的线程个数应该尽量少一些,一般为CPU的个数+1条线程。
  • 如果是IO密集型任务,那么线程池的线程可以放的很大,如2*CPU的个数。
  • 对于混合型任务,如果可以拆分的话,通过拆分成CPU密集型和IO密集型两种来提高执行效率;如果不能拆分的的话就可以根据实际情况来调整线程池中线程的个数。

监控线程池状态

常用状态:

  • taskCount:线程需要执行的任务个数。
  • completedTaskCount:线程池在运行过程中已完成的任务数。
  • largestPoolSize:线程池曾经创建过的最大线程数量。
  • getPoolSize获取当前线程池的线程数量。
  • getActiveCount:获取活动的线程的数量

多线程池

不推荐一个项目配置一个线程池,这样若是某些业务出现异常时,会影响到整个项目的健壮性。故我们可以根据业务,为不同的业务配置不同参数的数据库连接池。
示例

  1. @Configuration
  2. @EnableAsync
  3. @Slf4j
  4. public class ExecutorConfig {
  5. @Bean
  6. public Executor asyncServiceExecutor() {
  7. ThreadPoolTaskExecutor threadPoolTaskExecutor = new VisiableThreadPoolTaskExecutor();
  8. //核心线程数
  9. threadPoolTaskExecutor.setCorePoolSize(5);
  10. threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
  11. //最大线程数
  12. threadPoolTaskExecutor.setMaxPoolSize(5);
  13. //配置队列大小
  14. threadPoolTaskExecutor.setQueueCapacity(50);
  15. //配置线程池前缀
  16. threadPoolTaskExecutor.setThreadNamePrefix("async-service-");
  17. //拒绝策略
  18. // threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
  19. threadPoolTaskExecutor.setRejectedExecutionHandler(new PrintingPolicy());
  20. threadPoolTaskExecutor.initialize();
  21. return threadPoolTaskExecutor;
  22. }
  23. @Bean
  24. public Executor customServiceExecutor(){
  25. ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
  26. //线程核心数目
  27. threadPoolTaskExecutor.setCorePoolSize(10);
  28. threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
  29. //最大线程数
  30. threadPoolTaskExecutor.setMaxPoolSize(10);
  31. //配置队列大小
  32. threadPoolTaskExecutor.setQueueCapacity(50);
  33. //配置线程池前缀
  34. threadPoolTaskExecutor.setThreadNamePrefix("custom-service-");
  35. //配置拒绝策略
  36. threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
  37. //数据初始化
  38. threadPoolTaskExecutor.initialize();
  39. return threadPoolTaskExecutor;
  40. }
  41. }

如何选择使用线程池,例如:@Async(“asyncServiceExecutor”),这样就选择了想使用的线程池。

具体详解看这篇文章