介绍
线程池主要解决两个问题:一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接 new 一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。二是线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。每个 ThreadPoolExecutor 也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。
另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情境的需要,程序员可以使用更方便的 Executors 的工厂方法,比如 newCachedThreadPool(线程池线程个数最多可达 Integer.MAX_VALUE,线程自动回收)、newFixedThreadPool(固定大小的线程池)和 newSingleThreadExecutor(单个线程)等来创建线程池,当然用户还可以自定义。
类图介绍
在如图 8-1 所示的类图中,Executors 其实是个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程池实例。ThreadPoolExecutor
继承了 AbstractExecutorService
,成员变量 ctl 是一个 Integer 的原子变量,用来记录线程池状态和线程池中线程个数,类似于 ReentrantReadWriteLock 使用一个变量来保存两种信息。
这里假设 Integer 类型是 32 位二进制表示,则其中高 3 位用来表示线程池状态,后面 29 位用来记录线程池线程个数。
//(高 3 位)用来表示线程池状态,(低 29 位)用来表示线程个数
//默认是 RUNNING 状态,线程个数为 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数掩码位数,并不是所有平台的 int 类型都是 32 位的,所以准确地说,是具体平台下 Integer 的二进制
// 位数-3 后的剩余位数所表示的数才是线程的个数
private static final int COUNT_BITS = Integer.SIZE -3;
//线程最大个数(低 29 位)00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) -1;
线程池状态:
//(高 3 位):11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//(高 3 位):00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//(高 3 位):00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//(高 3 位):01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//(高 3 位):01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取高 3 位(运行状态)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取低 29 位(线程个数)
private static int workerCountOf(int c) { return c & CAPACITY; }
//计算 ctl 新值(线程状态与线程个数)
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池状态含义如下。
● RUNNING:接受新任务并且处理阻塞队列里的任务。
● SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。
● STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务。
● TIDYING:所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为 0,将要调用 terminated 方法。
● TERMINATED:终止状态。terminated 方法调用完成以后的状态。
线程池状态转换列举如下。
● RUNNING -> SHUTDOWN :显式调用 shutdown()方法,或者隐式调用了 finalize()方法里面的 shutdown()方法。
● RUNNING 或 SHUTDOWN)-> STOP :显式调用 shutdownNow()方法时。
● SHUTDOWN -> TIDYING :当线程池和任务队列都为空时。
● STOP -> TIDYING :当线程池为空时。
● TIDYING -> TERMINATED:当 terminated()hook 方法执行完成时。
线程池参数如下。
● corePoolSize:线程池核心线程个数。
● maximunPoolSize:线程池最大线程数量。
● keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。
● TimeUnit:存活时间的时间单位。
● workQueue:用于保存等待执行的任务的阻塞队列,比如基于数组的有界 ArrayBlockingQueue、基于链表的无界 LinkedBlockingQueue、最多只有一个元素的同步队列 SynchronousQueue 及优先级队列 PriorityBlockingQueue 等。
● ThreadFactory:创建线程的工厂。
● RejectedExecutionHandler:饱和策略,当队列满并且线程个数达到 maximunPoolSize 后采取的策略,比如 AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用者所在线程来运行任务)、DiscardOldestPolicy(调用 poll 丢弃一个任务,执行当前任务)及 DiscardPolicy(默默丢弃,不抛出异常)
线程池类型如下。
● newFixedThreadPool :创建一个核心线程个数和最大线程个数都为 nThreads 的线程池,并且阻塞队列长度为 Integer.MAX_VALUE。keeyAliveTime=0 说明只要线程个数比核心线程个数多并且当前空闲则回收。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//使用自定义线程创建工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
● newSingleThreadExecutor:创建一个核心线程个数和最大线程个数都为 1 的线程池,并且阻塞队列长度为 Integer.MAX_VALUE。keeyAliveTime=0 说明只要线程个数比核心线程个数多并且当前空闲则回收。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//使用自己的线程工厂
public static ExecutorService newSingleThreadExecutor(ThreadFactory
threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
● newCachedThreadPool :创建一个按需创建线程的线程池,初始线程个数为 0,最多线程个数为 Integer.MAX_VALUE,并且阻塞队列为同步队列。keeyAliveTime=60
说明只要当前线程在 60s 内空闲则回收。这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//使用自定义的线程工厂
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
如上 ThreadPoolExecutor 类图所示,其中 mainLock 是独占锁,用来控制新增 Worker 线程操作的原子性。termination 是该锁对应的条件队列,在线程调用 awaitTermination 时用来存放阻塞的线程。
Worker 继承 AQS 和 Runnable 接口,是具体承载任务的对象。Worker 继承了 AQS,自己实现了简单不可重入独占锁,其中 state=0 表示锁未被获取状态,state=1 表示锁已经被获取的状态,state=-1 是创建 Worker 时默认的状态,创建时状态设置为-1 是为了避免该线程在运行 runWorker()方法前被中断,下面会具体讲解。其中变量 firstTask 记录该工作线程执行的第一个任务,thread 是具体执行任务的线程。
DefaultThreadFactory 是线程工厂,newThread 方法是对线程的一个修饰。其中 poolNumber 是个静态的原子变量,用来统计线程工厂的个数,threadNumber 用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。