ThreadPoolTaskExecutor是spring core包中的。
ThreadPoolExecutor是JDK中的JUC。
ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。

image.png

1.自定义线程池

  1. import java.util.concurrent.ThreadFactory;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. public class NamedThreadFactory implements ThreadFactory {
  4. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  5. private final AtomicInteger threadNumber = new AtomicInteger(1);
  6. private String namePrefix;
  7. private final ThreadGroup group;
  8. public NamedThreadFactory( String name ) {
  9. this.namePrefix = namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
  10. SecurityManager s = System.getSecurityManager();
  11. group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
  12. }
  13. @Override
  14. public Thread newThread(Runnable r) {
  15. Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
  16. if (t.isDaemon())
  17. t.setDaemon(false);
  18. if (t.getPriority() != Thread.NORM_PRIORITY)
  19. t.setPriority(Thread.NORM_PRIORITY);
  20. return t;
  21. }
  22. }
  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.scheduling.annotation.EnableAsync;
  4. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  5. import java.util.concurrent.ThreadPoolExecutor;
  6. @Configuration
  7. public class ThreadPoolConfig {
  8. @Bean
  9. public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() {
  10. ThreadPoolTaskExecutor myThreadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  11. //线程池维护线程的最少数量
  12. /*
  13. *
  14. * N为cpu个数
  15. * 如果是CPU密集型应用,则线程池大小设置为N+1
  16. * 如果是IO密集型应用,则线程池大小设置为2N+1
  17. * 此方法返回可用处理器的虚拟机的最大数量; 不小于1
  18. * Runtime.getRuntime().availableProcessors();
  19. * linux查询方式:
  20. * cat /proc/cpuinfo| grep "processor"| wc -l
  21. */
  22. //配置核心线程数
  23. myThreadPoolTaskExecutor.setCorePoolSize(5);
  24. //配置最大线程数
  25. myThreadPoolTaskExecutor.setMaxPoolSize(6);
  26. //配置队列大小
  27. myThreadPoolTaskExecutor.setQueueCapacity(200);
  28. //设置保活秒数
  29. myThreadPoolTaskExecutor.setKeepAliveSeconds(60);
  30. //配置线程池中的线程的名称前缀
  31. myThreadPoolTaskExecutor.setThreadNamePrefix("Async-Thread-");
  32. //rejection-policy:设置拒绝策略,当pool已经达到max size的时候,如何处理新任务
  33. //CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
  34. myThreadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  35. //设置等待任务完成时关机
  36. myThreadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  37. //设置等待终止时间
  38. myThreadPoolTaskExecutor.setAwaitTerminationSeconds(60);
  39. //执行初始化
  40. myThreadPoolTaskExecutor.initialize();
  41. // 设置线程池中的线程的名称前缀(创建线程或线程池时请指定有意义的线程名称,方便出错时回溯)
  42. myThreadPoolTaskExecutor.setThreadFactory(new NamedThreadFactory("Async-ThreadPool-"));
  43. return myThreadPoolTaskExecutor;
  44. }
  45. }

2.某个实现类中的某个方法使用线程池

  1. import com.xqny.logtest.service.ExecutorService;
  2. import lombok.SneakyThrows;
  3. import org.springframework.scheduling.annotation.Async;
  4. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  5. import org.springframework.stereotype.Service;
  6. import javax.annotation.Resource;
  7. import java.util.concurrent.CompletableFuture;
  8. /**
  9. * @author Raint
  10. * @date 2022年04月13日 11:37
  11. */
  12. @Service("ExecutorService")
  13. public class ExecutorServiceImpl implements ExecutorService {
  14. @Resource
  15. private ThreadPoolTaskExecutor myThreadPoolTaskExecutor; // myThreadPoolTaskExecutor 为beanName
  16. @Override
  17. @SneakyThrows
  18. public void testasync(){
  19. CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
  20. try {
  21. Thread.sleep(2000);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println("业务线程1");
  26. }, myThreadPoolTaskExecutor);
  27. CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
  28. try {
  29. Thread.sleep(5000);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. System.out.println("业务线程2");
  34. }, myThreadPoolTaskExecutor);
  35. // 等待两个线程执行完
  36. CompletableFuture.allOf(completableFuture1, completableFuture2).get();
  37. return;
  38. }
  39. }

第一个线程睡眠2秒,第二个现场睡眠5秒,异步执行时间为5秒,而不是7秒