一 基础知识

1.1 线程池的优势

线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,如果线程超过了最大数量,超出的数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
他的主要特点为:线程复用控制最大并发数管理线程

  1. 降低资源消耗,通过重复利用自己创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度,当任务到达时,任务可以不需要等到线程和粗昂就爱你就能立即执行。
  3. 提高线程的可管理性,线程是稀缺资源,如果无限的创建,不仅会消耗资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。

    1.2 线程池的使用

    Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类。
    Executors类的几个创建线程的方法

  4. Executors.newScheduledThreadPool();

  5. Executors.newWorkStealingPool(n):java8新增,使用目前机器上可以的处理器作为他的并行级别
  6. Executors.newFixedThreadPool(n):创建有 n 个线程的线程池
  7. Executors.newSingleThreadExecutor():创建只有一个线程的线程池
  8. Executors.newCachedThreadPool():适用执行很多短期异步的小程序或者负载均衡或者负载较轻的服务器

    1. public class MyThreadPoolDemo {
    2. public static void main(String[] args) {
    3. // 一池5个处理线程
    4. ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
    5. // 一池只有一个线程
    6. ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
    7. // 一池N个线程,不固定,按需
    8. ExecutorService threadPool3 = Executors.newCachedThreadPool();
    9. try {
    10. for (int i = 0; i < 10; i++) {
    11. threadPool3.execute(() -> {
    12. System.out.println(Thread.currentThread().getName() + " 执行业务");
    13. });
    14. TimeUnit.MICROSECONDS.sleep(200);
    15. }
    16. } catch (Exception e) {
    17. e.printStackTrace();
    18. }finally {
    19. threadPool3.shutdown();
    20. }
    21. }
    22. }

    1.3 生命周期

    线程池从诞生到死亡,中间会经历runningshutdownstoptidyingterminated五个生命周期状态。

  9. RUNNING 表示线程池处于运行状态,能够接受新提交的任务且能对已添加的任务进行处理。RUNNING状态是线程池的初始化状态,线程池一旦被创建就处于RUNNING状态。

  10. SHUTDOWN 线程处于关闭状态,不接受新任务,但可以处理已添加的任务。RUNNING状态的线程池调用shutdown方法 后会进入SHUTDOWN状态。
  11. STOP 线程池处于停止状态,不接收任务,不处理已添加的任务,且会中断正在执行任务的线程。RUNNING状态的线程池调用了 shutdownNow方法 后会进入STOP状态。
  12. TIDYING 当所有任务已终止,且任务数量为0时,线程池会进入TIDYING。当线程池处于SHUTDOWN状态时,阻塞队列中的任务被执行完了,且线程池中没有正在执行的任务了,状态会由SHUTDOWN变为TIDYING。当线程处于STOP状态时,线程池中没有正在执行的任务时则会由STOP变为TIDYING。
  13. TERMINATED 线程终止状态。处于TIDYING状态的线程执行terminated()后进入TERMINATED状态。

image.png

1.4 七大参数

  1. // JDK8源码:ThreadPoolExecutor
  2. public class ThreadPoolExecutor extends AbstractExecutorService {
  3. public ThreadPoolExecutor(int corePoolSize,
  4. int maximumPoolSize,
  5. long keepAliveTime,
  6. TimeUnit unit,
  7. BlockingQueue<Runnable> workQueue,
  8. ThreadFactory threadFactory,
  9. RejectedExecutionHandler handler) {
  10. if (corePoolSize < 0 ||
  11. maximumPoolSize <= 0 ||
  12. maximumPoolSize < corePoolSize ||
  13. keepAliveTime < 0)
  14. throw new IllegalArgumentException();
  15. if (workQueue == null || threadFactory == null || handler == null)
  16. throw new NullPointerException();
  17. this.acc = System.getSecurityManager() == null ?
  18. null :
  19. AccessController.getContext();
  20. this.corePoolSize = corePoolSize;
  21. this.maximumPoolSize = maximumPoolSize;
  22. this.workQueue = workQueue;
  23. this.keepAliveTime = unit.toNanos(keepAliveTime);
  24. this.threadFactory = threadFactory;
  25. this.handler = handler;
  26. }
  27. }
  1. corePoolSize:线程池中的常驻核心线程数。在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近是理解为今日当值线程。当线程池中的线程数目达到corePoolSize后,就会把到达的任务放入到缓存队列当中。
  2. maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值大于等于1。
  3. keepAliveTime:多余的空闲线程存活时间,当空间时间达到keepAliveTime值时,多余的线程会被销毁直到只剩下corePoolSize个线程为止。只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用,直到线程池中的线程数不大于corepoolSIze。
  4. unit:keepAliveTime的单位。
  5. workQueue:任务阻塞队列,被提交但尚未被执行的任务。
  6. threadFactory:表示生成线程池中工作线程的线程工厂,用户创建新线程,一般用默认即可。
  7. handler:拒绝策略,表示当线程队列满了并且工作线程大于等于线程池的最大显示数(maxnumPoolSize)时如何来拒绝。参考

    二 工作机制

    2.1 工作流程

    线程池提交任务是从 executor 方法开始的:

  8. 在创建了线程池之后,等待提交过来的任务请求。

  9. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断。
    1. 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务如果正在运行的线程数量大于等于corePoolSize,那么就把这个任务放到队列里面。
    2. 如果这时候队列满了,且正在运行的线程数量还小于maxnumPoolSize,那么还是需要创建非核心线程立刻运行这个任务。
    3. 如果队列满了,且正在运行的线程大于等于maxnumPoolSize,那么线程池会启动饱和拒绝策略来执行。
  10. 当一个线程完成任务后,它会从队列中取出下一个任务来执行。
  11. 当一个线程无事可做超过一定时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于corePoolSize,那么这个线程就会被停掉销毁。

    2.2 拒绝策略

    等待队列也已经排满了,再也塞不下新的任务了。同时,线程池的maxnumPoolSize也到达了,无法继续为新任务服务,这时我们需要拒绝策略机制合理的处理这个问题。JDK内置了4种拒绝策略。
    在JDK中提供了RejectedExecutionHandler接口来执行拒绝操作。实现RejectedExecutionHandler的类有四个,对应了四种拒绝策略。

  12. AbortPolicy(默认):直接抛出RejectedException异常阻止系统正常运行

  13. CallerRunPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
  14. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
  15. DiscardPolicy:接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种拒绝策略方案。

    三 手写线程池

    1)你在工作中单一的/固定数的/可变你的三种创建线程池的方法,你用哪个多?超级大坑!
  • 答案是一个都不用,我们生产上只能使用自定义的线程池,也即手写线程池。

2)Executors中JDK给你提供了,为什么不用?

  • 在《阿里巴巴Java开发手册》中提到:线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
  • Executors 返回的线程池对象的弊端如下:

    • FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
    • CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

      1. public class MyThreadPoolDemo {
      2. public static void main(String[] args) {
      3. // 手写线程池
      4. ThreadPoolExecutor threadPool4 = new ThreadPoolExecutor(
      5. 2, // 核心线程线
      6. 5, // 最大线程数
      7. 1L, // 多余线程存活时间
      8. TimeUnit.SECONDS, // 时间单位
      9. new LinkedBlockingDeque<>(3), // 阻塞队列,存放需要执行的任务
      10. Executors.defaultThreadFactory(), // 线程工厂
      11. new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
      12. );
      13. try {
      14. for (int i = 0; i < 10; i++) {
      15. threadPool4.execute(() -> {
      16. System.out.println(Thread.currentThread().getName() + " 执行业务");
      17. });
      18. }
      19. } catch (Exception e) {
      20. e.printStackTrace();
      21. }finally {
      22. threadPool4.shutdown();
      23. }
      24. }
      25. }

      3.1 配置线程池参数

      Runtime.getRuntime().availableProcessor():获取CPU核数

1)CPU密集型
CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。CPU密集任务只有真正的多核CPU才可能得到加速(通过多线程方式)。如果是单核CPU,那么开几个模拟的多线程任务都不可能得到加速,因为CPU的运算能力就只有这点。
CPU密集型任务配置只可能少的线程数量:CPU核数+1个线程的线程池

2)IO密集型
有两种方式:

  1. CPU核数 * 2
  2. CPU核数 /(1 - 阻塞系数),一般阻塞系数在0.8-0.9之间。

    四 Spring与线程池

    在SpringBoot中,可以使用 @EnableAsync@Async 这两个注解搭配使用,这样可以快速使用线程池异步调用方法。

    4.1 小案例

    配置SpringBoot使用的自定义线程池

    1. public class MyAsyncConfigurer implements AsyncConfigurer {
    2. /**
    3. * 返回一个线程池
    4. */
    5. @Override
    6. public Executor getAsyncExecutor() {
    7. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    8. executor.setCorePoolSize(5);
    9. executor.setMaxPoolSize(50);
    10. executor.setKeepAliveSeconds(60);
    11. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    12. executor.initialize();
    13. return executor;
    14. }
    15. /**
    16. * 在使用void返回类型的异步方法执行期间抛出异常时要使用的实例
    17. */
    18. @Override
    19. public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    20. return new SimpleAsyncUncaughtExceptionHandler();
    21. }
    22. }

    调用异步方法
    当在程序中定义了多个线程池时,@Async注解可以指定使用那个线程池,使用value属性指定线程池名

    1. @Component
    2. public class AsyncMethod {
    3. @Async
    4. public void fun1(String str) {
    5. System.out.println("I am another thread: " + str);
    6. }
    7. }

    4.2 Spring实现的线程池

  3. SimpleAsyncTaskExecutor:如果没有自定义线程池,Spring默认使用这个。注意:这个类并不是一个真正的线程池,在它调用executor方法时,会创建一个线程来执行任务,但是并不是维护了一个线程池存储线程。

  4. SyncTaskExecutor:这也不是一个线程池,而且是同步执行。
  5. ConcurrentTaskExecutor:不推荐使用,只要当 ThreadPoolTaskExecutor 不满需求时,才考虑使用这个。
  6. ThreadPoolTaskExecutor:最常推荐使用,本质是对 ThreadPoolExecutor 的封装。

在SpringBoot中配置多个线程池,@Async注解的value属性可以指定使用那一个线程。这样做的好处是:当项目中有多个逻辑需要使用到多线程,那么可以让不同的业务使用不同的线程池,防止一个线程池阻塞时,影响到其他的业务

  1. @Configuration
  2. public class ThreadPoolConfig {
  3. @Bean(value = "primaryExecutor")
  4. public Executor executor1() {
  5. return getExecutor(10, 60);
  6. }
  7. @Bean(value = "secondExecutor")
  8. public Executor executor2() {
  9. return getExecutor(5, 30);
  10. }
  11. private Executor getExecutor(int min, int max) {
  12. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  13. executor.setCorePoolSize(min);
  14. executor.setMaxPoolSize(max);
  15. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  16. executor.setKeepAliveSeconds(60);
  17. executor.setThreadNamePrefix("apps-thread-");
  18. executor.initialize();
  19. return executor;
  20. }
  21. }

4.3 @Async修饰的异步方法

被@Async修饰的方法有三种:

  1. 无返回值 void 类型的方法
  2. 返回值为:Future类型
  3. 返回值为:CompletableFuture类型

    五 源码分析

    4.1 executor方法

    向线程池提交任务的方法是execute方法,execute方法是ThreadPoolExecutor的核心方法
    1. public void execute(Runnable command) {
    2. if (command == null)
    3. throw new NullPointerException();
    4. // 获取ctl的值
    5. int c = ctl.get();
    6. // 1.线程数小于corePoolSize
    7. if (workerCountOf(c) < corePoolSize) {
    8. // 线程池中线程数小于核心线程数,则尝试创建核心线程执行任务
    9. if (addWorker(command, true))
    10. return;
    11. c = ctl.get();
    12. }
    13. // 2.到此处说明线程池中线程数大于核心线程数或者创建线程失败
    14. if (isRunning(c) && workQueue.offer(command)) {
    15. // 如果线程是运行状态并且可以使用offer将任务加入阻塞队列未满,offer是非阻塞操作。
    16. int recheck = ctl.get();
    17. // 重新检查线程池状态,因为上次检测后线程池状态可能发生改变,如果非运行状态就移除任务并执行拒绝策略
    18. if (! isRunning(recheck) && remove(command))
    19. reject(command);
    20. // 如果是运行状态,并且线程数是0,则创建线程
    21. else if (workerCountOf(recheck) == 0)
    22. // 线程数是0,则创建非核心线程,且不指定首次执行任务,这里的第二个参数其实没有实际意义
    23. addWorker(null, false);
    24. }
    25. // 3.阻塞队列已满,创建非核心线程执行任务
    26. else if (!addWorker(command, false))
    27. // 如果失败,则执行拒绝策略
    28. reject(command);
    29. }
    未完待续

参考资料:
1丶https://juejin.cn/post/6983213662383112206/#heading-9
2丶https://juejin.cn/post/6971334859717345311#heading-0