定义

什么是线程池:是一种基于池化思想管理线程的工具,如MYSQL。
线程过多会带来额外开销,如线程创建、销毁、调度等,同时降低计算机整体性能。线程池内维护多个线程,由管理者分配可并发执行的任务,这样一方面降低处理任务时由线程造成的额外开销,还可以避免线程数量激增导致的调度问题。
使用线程池的好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

    应用场景

    大量请求到达同一个查询等接口,造成长时间无法得到查询的数据等场景。

线程池基础

核心参数

进入ThreadPoolExecutor方法内,查看核心参数

  1. /*缓存队列。当请求线程数大于maximumPoolSize时,线程进入BlockingQueue阻塞队列。*/
  2. private final BlockingQueue<Runnable> workQueue;
  3. /**
  4. * 包含池中所有工作线程的集合。仅在持有 mainLock 时访问
  5. */
  6. private final HashSet<Worker> workers = new HashSet<Worker>();
  7. /**
  8. * 跟踪达到的最大池大小
  9. */
  10. private int largestPoolSize;
  11. /**
  12. * 已完成任务的计数器。仅在工作线程终止时更新.
  13. */
  14. private long completedTaskCount;
  15. /*
  16. * All user control parameters are declared as volatiles so that
  17. * ongoing actions are based on freshest values, but without need
  18. * for locking, since no internal invariants depend on them
  19. * changing synchronously with respect to other actions.
  20. */
  21. /**
  22. * 线程工厂。用来生产一组相同任务的线程。主要用于设置生成的线程名词前缀、是否
  23. * 为守护线程以及优先级等。设置有意义的名称前缀有利于在进行虚拟机分析时,知道线
  24. * 程是由哪个线程工厂创建的。
  25. */
  26. private volatile ThreadFactory threadFactory;
  27. /**
  28. * 拒绝策略
  29. */
  30. private volatile RejectedExecutionHandler handler;
  31. /**
  32. * 线程空闲时间。线程池中线程空闲时间达到keepAliveTime值时,线程会被销毁,
  33. * 只到剩下corePoolSize个线程为止。默认情况下,线程池的最大线程数大于corePoolSize时,
  34. * keepAliveTime才会起作用。如果allowCoreThreadTimeOut被设置为true,
  35. * 即使线程池的最大线程数等于corePoolSize,keepAliveTime也会起作用(回收超时的核心线程).
  36. */
  37. private volatile long keepAliveTime;
  38. /**
  39. * 如果为 false(默认),核心线程即使在空闲时也保持活动状态。如果为真,
  40. * 核心线程使用 keepAliveTime 超时等待工作.
  41. */
  42. private volatile boolean allowCoreThreadTimeOut;
  43. /**
  44. * 核心线程数。如果等于0,则任务执行完后,没有任务请求进入时销毁线程池中的线程。
  45. * 如果大于0,即使本地任务执行完毕,核心线程也不会被销毁。设置过大会浪费系统资源,
  46. * 设置过小导致线程频繁创建.
  47. */
  48. private volatile int corePoolSize;
  49. /**
  50. * 最大线程数。必须大于等于1,且大于等于corePoolSize。
  51. * 如果与corePoolSize相等,则线程池大小固定。
  52. * 如果大于corePoolSize,则最多创建maximumPoolSize个线程执行任务,
  53. * 其他任务加入到workQueue缓存队列中,
  54. * 当workQueue为空且执行任务数小于maximumPoolSize时,线程空闲时间超过keepAliveTime会被回收.
  55. */
  56. private volatile int maximumPoolSize;

阻塞队列

使用不同的队列可以实现不一样的任务存取策略
image.png

拒绝策略

可以通过实现**RejectedExecutionHandler**接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略
image.png

ThreadPoolExecutor设计

总体设计

以下为ThreadPoolExecutor的UML类图,
image.png
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
ExecutorService接口增加了一些能力:
(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
(2)提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor运行机制如下
image.png
线程池的内部将线程和任务解耦,达到复用线程和缓冲作用。

构造方法

查阅源码发现,本质调用的是下面的构造方法

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. if (workQueue == null || threadFactory == null || handler == null)
  14. throw new NullPointerException();
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. this.corePoolSize = corePoolSize;
  19. this.maximumPoolSize = maximumPoolSize;
  20. this.workQueue = workQueue;
  21. this.keepAliveTime = unit.toNanos(keepAliveTime);
  22. this.threadFactory = threadFactory;
  23. this.handler = handler;
  24. }

生命周期管理

线程池的运行状态无法显示控制,只能由内部来维护,线程池内部通过一个变量维护两个值:运行状态(runState)和线程数量 (workerCount),如下:

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

高3位保存runState,低29位保存workerCount
内部封装的获取生命周期状态、获取线程池线程数量的计算方法:

  1. private static int runStateOf(int c) { return c & ~CAPACITY; }
  2. private static int workerCountOf(int c) { return c & CAPACITY; }
  3. private static int ctlOf(int rs, int wc) { return rs | wc; }

ThreadPoolExecutor运行状态有以下五种

  1. private static final int RUNNING = -1 << COUNT_BITS;
  2. private static final int SHUTDOWN = 0 << COUNT_BITS;
  3. private static final int STOP = 1 << COUNT_BITS;
  4. private static final int TIDYING = 2 << COUNT_BITS;
  5. private static final int TERMINATED = 3 << COUNT_BITS;

image.png
生命周期转换如下:
image.png

任务执行

流程:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

来张流程图
image.png
执行的核心代码

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. if (workerCountOf(c) < corePoolSize) {
  6. // 则增加一个线程,并把该任务交给它去执行
  7. if (addWorker(command, true))
  8. return;
  9. // 创建核心线程失败,需要再次获取临时变量c
  10. c = ctl.get();
  11. }
  12. // 线程池的运行状态是RUNNING,并且尝试将新任务加入到阻塞队列,成功返回true
  13. if (isRunning(c) && workQueue.offer(command)) {
  14. // 已经向任务队列投放任务成功,再次获取线程池状态和有效线程数
  15. int recheck = ctl.get();
  16. // 如果线程池状态不是RUNNING(线程池异常终止了),将线程从工作队列中移除
  17. if (! isRunning(recheck) && remove(command))
  18. // 执行拒绝策略
  19. reject(command);
  20. // 线程池可能是running或移除失败。所有存活线程可能在最后一次检查后被回收,需要进行二次检查
  21. else if (workerCountOf(recheck) == 0)
  22. // 创建新线程并执行
  23. addWorker(null, false);
  24. }
  25. // 任务队列满,创建非核心线程 新建失败时执行拒绝策略
  26. else if (!addWorker(command, false))
  27. reject(command);
  28. }

worker线程管理

Worker线程

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
  2. // Worker持有的线程
  3. final Thread thread;
  4. // 初始化的任务,可以为null
  5. Runnable firstTask;
  6. }


Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。
Worker执行任务的模型:
image.png

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。 4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

在线程回收过程中就使用到了这种特性,回收过程如下图所示:
image.png

Worker线程增加

增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示:
image.png

Worker线程回收

线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

线程回收的工作是在processWorkerExit方法完成的。
image.png

事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。

Worker线程执行任务

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

  1. while循环不断地通过getTask()方法获取任务。
  2. getTask()方法从阻塞队列中取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  4. 执行任务。
  5. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

执行流程如下图所示:
image.png