介绍

线程池主要解决两个问题:一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接 new 一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。二是线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。每个 ThreadPoolExecutor 也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。

另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情境的需要,程序员可以使用更方便的 Executors 的工厂方法,比如 newCachedThreadPool(线程池线程个数最多可达 Integer.MAX_VALUE,线程自动回收)、newFixedThreadPool(固定大小的线程池)和 newSingleThreadExecutor(单个线程)等来创建线程池,当然用户还可以自定义。

类图介绍

在如图 8-1 所示的类图中,Executors 其实是个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程池实例。ThreadPoolExecutor 继承了 AbstractExecutorService,成员变量 ctl 是一个 Integer 的原子变量,用来记录线程池状态和线程池中线程个数,类似于 ReentrantReadWriteLock 使用一个变量来保存两种信息。

线程池介绍 - 图1

这里假设 Integer 类型是 32 位二进制表示,则其中高 3 位用来表示线程池状态,后面 29 位用来记录线程池线程个数。

  1. //(高 3 位)用来表示线程池状态,(低 29 位)用来表示线程个数
  2. //默认是 RUNNING 状态,线程个数为 0
  3. private final AtomicInteger ctl = new AtomicIntegerctlOf(RUNNING, 0));
  4. //线程个数掩码位数,并不是所有平台的 int 类型都是 32 位的,所以准确地说,是具体平台下 Integer 的二进制
  5. // 位数-3 后的剩余位数所表示的数才是线程的个数
  6. private static final int COUNT_BITS = Integer.SIZE -3
  7. //线程最大个数(低 29 位)00011111111111111111111111111111
  8. private static final int CAPACITY = 1 << COUNT_BITS -1

线程池状态:

  1. //(高 3 位):11100000000000000000000000000000
  2. private static final int RUNNING = -1 << COUNT_BITS
  3. //(高 3 位):00000000000000000000000000000000
  4. private static final int SHUTDOWN = 0 << COUNT_BITS
  5. //(高 3 位):00100000000000000000000000000000
  6. private static final int STOP = 1 << COUNT_BITS
  7. //(高 3 位):01000000000000000000000000000000
  8. private static final int TIDYING = 2 << COUNT_BITS
  9. //(高 3 位):01100000000000000000000000000000
  10. private static final int TERMINATED = 3 << COUNT_BITS
  11. // 获取高 3 位(运行状态)
  12. private static int runStateOfint c { return c & CAPACITY }
  13. //获取低 29 位(线程个数)
  14. private static int workerCountOfint c { return c & CAPACITY }
  15. //计算 ctl 新值(线程状态与线程个数)
  16. private static int ctlOfint rs, int wc { return rs | wc }

线程池状态含义如下。

● RUNNING:接受新任务并且处理阻塞队列里的任务。

● SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务

● STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务

● TIDYING:所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为 0,将要调用 terminated 方法

● TERMINATED:终止状态。terminated 方法调用完成以后的状态。

线程池状态转换列举如下。

● RUNNING -> SHUTDOWN :显式调用 shutdown()方法,或者隐式调用了 finalize()方法里面的 shutdown()方法。

● RUNNING 或 SHUTDOWN)-> STOP :显式调用 shutdownNow()方法时。

● SHUTDOWN -> TIDYING :当线程池和任务队列都为空时。

● STOP -> TIDYING :当线程池为空时。

● TIDYING -> TERMINATED:当 terminated()hook 方法执行完成时。

线程池参数如下。

● corePoolSize:线程池核心线程个数。

● maximunPoolSize:线程池最大线程数量。

● keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。

● TimeUnit:存活时间的时间单位。

● workQueue:用于保存等待执行的任务的阻塞队列,比如基于数组的有界 ArrayBlockingQueue、基于链表的无界 LinkedBlockingQueue、最多只有一个元素的同步队列 SynchronousQueue 及优先级队列 PriorityBlockingQueue 等。

● ThreadFactory:创建线程的工厂。

● RejectedExecutionHandler:饱和策略,当队列满并且线程个数达到 maximunPoolSize 后采取的策略,比如 AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用者所在线程来运行任务)、DiscardOldestPolicy(调用 poll 丢弃一个任务,执行当前任务)及 DiscardPolicy(默默丢弃,不抛出异常)

线程池类型如下。

● newFixedThreadPool :创建一个核心线程个数和最大线程个数都为 nThreads 的线程池,并且阻塞队列长度为 Integer.MAX_VALUE。keeyAliveTime=0 说明只要线程个数比核心线程个数多并且当前空闲则回收。

  1. public static ExecutorService newFixedThreadPoolint nThreads {
  2. return new ThreadPoolExecutornThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  6. //使用自定义线程创建工厂
  7. public static ExecutorService newFixedThreadPoolint nThreads, ThreadFactory threadFactory {
  8. return new ThreadPoolExecutornThreads, nThreads,
  9. 0L, TimeUnit.MILLISECONDS,
  10. new LinkedBlockingQueue<Runnable>(),
  11. threadFactory);
  12. }

● newSingleThreadExecutor:创建一个核心线程个数和最大线程个数都为 1 的线程池,并且阻塞队列长度为 Integer.MAX_VALUE。keeyAliveTime=0 说明只要线程个数比核心线程个数多并且当前空闲则回收。

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }
  7. //使用自己的线程工厂
  8. public static ExecutorService newSingleThreadExecutorThreadFactory
  9. threadFactory {
  10. return new FinalizableDelegatedExecutorService
  11. new ThreadPoolExecutor(1, 1,
  12. 0L, TimeUnit.MILLISECONDS,
  13. new LinkedBlockingQueue<Runnable>(),
  14. threadFactory));
  15. }

● newCachedThreadPool :创建一个按需创建线程的线程池,初始线程个数为 0,最多线程个数为 Integer.MAX_VALUE,并且阻塞队列为同步队列。keeyAliveTime=60 说明只要当前线程在 60s 内空闲则回收。这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务。

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  6. //使用自定义的线程工厂
  7. public static ExecutorService newCachedThreadPoolThreadFactory threadFactory {
  8. return new ThreadPoolExecutor0, Integer.MAX_VALUE,
  9. 60L, TimeUnit.SECONDS,
  10. new SynchronousQueue<Runnable>(),
  11. threadFactory);
  12. }

如上 ThreadPoolExecutor 类图所示,其中 mainLock 是独占锁,用来控制新增 Worker 线程操作的原子性。termination 是该锁对应的条件队列,在线程调用 awaitTermination 时用来存放阻塞的线程。

Worker 继承 AQS 和 Runnable 接口,是具体承载任务的对象。Worker 继承了 AQS,自己实现了简单不可重入独占锁,其中 state=0 表示锁未被获取状态,state=1 表示锁已经被获取的状态,state=-1 是创建 Worker 时默认的状态,创建时状态设置为-1 是为了避免该线程在运行 runWorker()方法前被中断,下面会具体讲解。其中变量 firstTask 记录该工作线程执行的第一个任务,thread 是具体执行任务的线程。

DefaultThreadFactory 是线程工厂,newThread 方法是对线程的一个修饰。其中 poolNumber 是个静态的原子变量,用来统计线程工厂的个数,threadNumber 用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。