Java线程的创建非常昂贵,需要 JVM 和 OS 配合完成大量的工作:
- 必须为线程堆栈分配和初始化大量内存块,其中包含至少
1MB
的栈内存 - 需要进行系统调用,以便在 OS 中创建和注册本地线程
因此,Java 高并发频繁创建和销毁线程是非常低效的,为了更好的管理线程和提高性能,Java 使用了线程池。线程池主要解决了以下两个问题:
- 提升性能:线程池能独立负责线程的创建、维护和分配。在执行大量异步任务时,可以不需要自己创建线程,而是将任务交给线程池去调度。线程池能尽可能使用空闲的线程去执行异步任务,最大限度地对已经创建的线程进行复用,使得性能提升明显
- 线程管理:每个 Java 线程池会保持一些基本的线程统计信息,例如完成的任务数量、空闲时间等,以便对线程进行有效管理,使得能对所接收到的异步任务进行高效调度
线程池架构
图 - JUC 中线程池的类与接口的架构
Executor
public interface Executor {
void execute(Runnable command);
}
Executor 是 Java 异步目标任务的“执行者”接口,其目标是执行目标任务。Executor 作为执行者的角色,其目的是提供一种将“任务提交者”与“任务执行者”分离开来的机制。
ExecutorService
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 向线程池提交单个任务
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 向线程池提交批量任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService 继承于 Executor。它是 Java 异步目标任务的“执行者服务接”口,对外提供异步任务的接收服务。ExecutorService 提供了“接收异步任务并转交给执行者”的方法,如submit系列方法、invoke系列方法等。
AbstractExecutorService
AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 接口。AbstractExecutorService 存在的目的是为 ExecutorService 中的接口提供默认实现。
ThreadPoolExecutor
ThreadPoolExecutor 就是“线程池”实现类,它继承于 AbstractExecutorService 抽象类。ThreadPoolExecutor 是 JUC 线程池的核心实现类。线程的创建和终止需要很大的开销,线程池中预先提供了指定数量的可重用线程,所以使用线程池会节省系统资源,并且每个线程池都维护了一些基础的数据统计,方便线程的管理和监控。
ScheduledExecutorService
ScheduledExecutorService 是一个接口,它继承于 ExecutorService。它是一个可以完成“延时”和“周期性”任务的调度线程池接口,其功能和 Timer/TimerTask
类似。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor,它提供了 ScheduledExecutorService 线程池接口中“延时执行”和“周期执行”等抽象调度方法的具体实现。ScheduledThreadPoolExecutor 类似于 Timer,但是在高并发程序中,ScheduledThreadPoolExecutor 的性能要优于 Timer。
Executors
Executors 是一个静态工厂类,它通过静态工厂方法返回 ExecutorService、ScheduledExecutorService 等线程池示例对象,这些静态工厂方法可以理解为一些快捷的创建线程池的方法。
Executors 的4种快捷创建线程池的方法
newSingleThreadExecutor 创建“单线程化线程池”
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
该方法用于创建一个“单线程化线程池”,也就是只有一个线程的线程池,所创建的线程池用唯一的工作线程来执行任务,使用此方法创建的线程池能保证所有任务按照指定顺序(如FIFO)执行。
单线程化线程池有如下特点:
- 单线程化的线程池中的任务是按照提交的次序顺序执行的
- 池中的唯一线程的存活时间是无限的
- 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列中,并且其阻塞队列是无界的(使用过程中注意
**OOM**
问题)
总体来说,单线程化的线程池所适用的场景是:任务按照提交次序,一个任务一个任务地逐个执行的场景。
以上用例在最后调用 shutdown()
方法来关闭线程池。执行 shutdown()
方法后,线程池状态变为 SHUTDOWN
,此时线程池将拒绝新任务,不能再往线程池中添加新任务,否则会抛出 RejectedExecutionException 异常。此时,线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成才会退出。还有一个与 shutdown()
类似的方法,叫作 shutdownNow()
,执行 shutdownNow()
方法后,线程池状态会立刻变成 STOP
,并试图停止所有正在执行的线程,并且不再处理还在阻塞队列中等待的任务,会返回那些未执行的任务。
newFixedThreadPool 创建“固定数量的线程池”
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
该方法用于创建一个“固定数量的线程池”,其唯一的参数用于设置池中线程的“固定数量”。
固定数量的线程池的有如下特点:
- 如果线程数没有达到“固定数量”,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定的数量
- 线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程
- 在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列,注意
**OOM**
问题)
固定数量线程池的使用场景如下:需要任务长期执行的场景。“固定数量的线程池”的线程数能够比较稳定地保证一个数,能够避免频繁回收线程和创建线程,故适用于处理 CPU 密集型的任务,在 CPU 被工作线程长时间占用的情况下,能确保尽可能少地分配线程。
newCachedThreadPool 创建“可缓存线程池”
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
该方法用于创建一个“可缓存线程池”,如果线程池内的某些线程无事可干成为空闲线程,“可缓存线程池”可灵活回收这些空闲线程。
可缓存线程池的特点如下:
- 在接收新的异步任务 target 执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务
- 此线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说 JVM )能够创建的最大线程大小(注意
**OOM**
问题) 如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务)线程
可缓存线程池的使用场景:需要快速处理突发性强、耗时较短的任务场景,如 Netty 的 NIO 处理场景、 REST API 接口的瞬时削峰场景。“可缓存线程池”的线程数量不固定,只要有空闲线程就会被回收;接收到的新异步任务执行目标,查看是否有线程处于空闲状态,如果没有就直接创建新的线程。
newScheduledThreadPool 创建“可调度线程池”
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
该方法用于创建一个“可调度线程池”,即一个提供“延时”和“周期性”任务调度功能的 ScheduledExecutorService
类型的线程池。ScheduleExecutorService 接口中有多个重要的接收被调目标任务的方法,其中 scheduleAtFixedRate
和 scheduleWithFixedDelay
使用得比较多。
**scheduleAtFixedRate**
:
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, // 异步任务 target 执行目标实例
long initialDelay, // 首次执行延时
long period, // 两次开始执行最小间隔时间
TimeUnit unit) { // 时间单位
......
}
**scheduleWithFixedDelay**
:
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, // 异步任务 target 执行目标实例
long initialDelay, // 首次执行延时
long delay, // 前一次执行结束到下一次开始执行的间隔时间
TimeUnit unit) { // 时间单位
......
}
当被调任务的执行时间大于指定的间隔时间时,ScheduleExecutorService 并不会创建一个新的线程去并发执行这个任务,而是等待前一次调度执行完毕。
可调度线程池的使用场景:周期性地执行任务的场景。
线程池的标准创建方式
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- keepAliveTime:空闲线程(默认情况下不包括核心线程)存活时间,超过该时间对应线程会被回收
- unit:空闲线程存活时间单位
- workQueue:任务队列,用于暂时接收异步任务。如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中
- threadFactory:线程工厂
- handler:拒绝策略
默认情况下,空闲线程超时策略仅适用于存在超过 corePoolSize 线程的情况。但若调用了 allowCoreThreadTimeOut(boolean)
方法,并且传入了参数 true,则 keepAliveTime 参数所设置的超时策略也将被应用于核心线程。
线程池工作原理
- 如果当前工作线程数量小于 corePoolSize,执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程
- 如果线程池中总的任务数量大于 corePoolSize,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。在核心线程池数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程
- 当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光
- 在核心线程池数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程),并且立即开始执行新任务
- 在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出 maximumPoolSize。如果线程池的线程总数超过 maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略
拒绝策略
提交任务到线程池被拒绝有两种情况:
- 线程池已经被关闭
- 工作队列已满且
maximumPoolSize
已满
在任务被决绝之后会触发拒绝策略,也就是 RejectedExecutionHandler 实例的 rejectedExecution()
方法。默认的拒绝策略有以下几种:
拒绝策略(AbortPolicy)
使用该策略时,如果线程池队列满了,新任务就会被拒绝,并且抛出 RejectedExecutionException 异常。该策略是线程池默认的拒绝策略。
抛弃策略(DiscardPolicy)
如果线程池队列满了,新任务就会直接被丢掉,并且不会有任何异常抛出。
抛弃最老任务策略(DiscardOldestPolicy)
如果队列满了,就会将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列。因为队列是队尾进队头出,队头元素是最老的,所以每次都是移除队头元素后再尝试入队。
调用者执行策略(CallerRunsPolicy)
在新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务。
线程池提交任务的方式
execute 方法
public void execute(Runnable command) {
}
execute()
方法只能接收 Runnable 类型的参数,且提交任务之后没有返回值。
submit 方法
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
submit()
方法可以接收 Callable、Runnable 两种类型的参数,且 submit()
在启动之后会返回 Future 对象,代表一个异步执行实例,可以通过该异步执行实例去获取结果。另外,返回的 Future 对象(异步执行实例),可以在异步执行过程中进行异常捕获。
:::info Callable 是 JDK 1.5 加入的执行目标接口,作为 Runnable 的一种补充,允许有返回值,允许抛出异常。 Runnable 和 Callable 的主要区别为:
- Callable 允许有返回值,Runnable 不允许有返回值;
- Runnable 不允许抛出异常,Callable 允许抛出异常。 :::
事实上,submit()
最后调用的还是 execute()
。
调度器的钩子方法
ThreadPoolExecutor 线程池调度器为每个任务执行前后都提供了钩子方法。ThreadPoolExecutor 类提供了三个钩子方法,这三个钩子方法一般用作被子类重写,具体如下:
// 任务执行之前回调
protected void beforeExecute(Thread t, Runnable r) { }
// 任务执行之后回调
protected void afterExecute(Runnable r, Throwable t) { }
// 线程终止后回调
protected void terminated() { }
线程池的状态
public class ThreadPoolExecutor extends AbstractExecutorService {
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
}
线程池总共有5种状态:
- RUNNING:线程池创建之后的初始状态,这种状态下可以执行任务
- SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行完毕
- STOP:该状态下线程池不再接受新任务,也不会处理工作队列中的剩余任务,并且将会中断所有工作线程
- TIDYING:该状态下所有任务都已终止或者处理完成,将会执行
terminated()
钩子方 - TERMINATED:执行完
terminated()
钩子方法之后的状态
线程池状态之间的转换关系如下:
- 线程池创建之后状态为 RUNNING
- 执行线程池的
shutdown()
实例方法,会使线程池状态从 RUNNING 转变为 SHUTDOWN - 执行线程池的
shutdownNow()
实例方法,会使线程池状态从 RUNNING 转变为 STOP - 当线程池处于 SHUTDOWN 状态时,执行其
shutdownNow()
方法会将其状态转变为 STOP - 等待线程池的所有工作线程停止,工作队列清空之后,线程池状态会从 STOP 转变为 TIDYING
- 执行完
terminated()
钩子方法之后,线程池状态从 TIDYING 转变为 TERMINATED
图 - 线程池的状态转换规则
如何优雅地关闭线程池
线程池提供了3个关闭线程池的方法:
**shutdown**
:是 JUC 提供的一个有序关闭线程池的方法,此方法会等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为 SHUTDOWN,线程池不会再接收新的任务**shutdownNow**
:是 JUC 提供的一个立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务**awaitTermination**
:等待线程池完成关闭。在调用线程池的shutdown()
与shutdownNow()
方法时,当前线程会立即返回,不会一直等待直到线程池完成关闭。如果需要等到线程池关闭完成,可以调用awaitTermination()
方法
shutdown 方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// 检查线程池关闭的 Java Security 权限
checkShutdownAccess();
// 更新线程池状态。SHUTDOWN 状态下还继续提交任务就会触发拒绝策略
advanceRunState(SHUTDOWN);
// 中断空闲线程。中断只是设置了中断状态,并不会立即结束线程,需要用户线程主动配合中断操作结束线程
interruptIdleWorkers();
// 钩子函数,主要用于清理一些资源
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow 方法
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// 检查线程池关闭的 Java Security 权限
checkShutdownAccess();
// 更新线程池状态。STOP 状态下还继续提交任务就会触发拒绝策略
advanceRunState(STOP);
// 中断所有线程,包括空闲线程以及工作线程。中断只是设置了中断状态,并不会立即结束线程,需要用户线程主动配合中断操作结束线程
interruptWorkers();
// 丢弃工作队列中的剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
awaitTermination 方法
调用了 shutdown()
与 shutdownNow()
方法之后,用户程序并不会等待线程池关闭完成。需要调用该方法进行主动等待:
threadPool.shutdown();
try {
while(!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
System.out.println("线程池任务还未执行完毕");
}
} catch(InterruptedException e) {
}
如果线程池完成关闭,awaitTermination()
方法将会返回 true,否则当等待时间超过指定时间后将会返回 false。如果需要调用 awaitTermination()
,建议不是永久等待,而是设置一定重试次数。
public void shutdownThreadPool() {
if (!threadPoolExecutor.isTerminated()) {
try {
// 循环1000次,每次等待10ms
for (int i = 0; i < 1000; i++) {
if (threadPoolExecutor.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
threadPoolExecutor.shutdownNow()
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
线程池优雅关闭示例
- 执行
shutdown()
方法,拒绝新任务的提交,并等待所有任务有序地执行完毕 - 执行
awaitTermination(long, TimeUnit)
方法,指定超时时间,判断是否线程池关闭完成 - 如果
awaitTermination()
方法返回 false,或者被中断,就调用shutdownNow()
方法立即关闭线程池所有任务 - 补充执行
awaitTermination(long, TimeUnit)
方法,判断线程池是否关闭完成。如果超时,就可以进入循环关闭,循环一定的次数(如1000次),不断关闭线程池,直到其关闭或者循环结束
public void shutdownThreadPool(ExecutorService threadPool) {
if (threadPool.isTerminated()) {
return;
}
// 关闭线程池,拒绝接受新任务
threadPool.shutdownNow();
try {
if (!threadPool.awaitTermination(60, TimeUnit.MILLISECONDS)) {
// 取消正在执行的任务
threadPool.shutdownNow();
// 等待一定的时间之后还未关闭完成,进入循环关闭缓解
if (!threadPool.awaitTermination(60, TimeUnit.MILLISECONDS)) {
System.out.println("线程池任务还未结束完毕");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!threadPool.isTerminated()) {
try {
for (int i = 0; i < 1000; i++) {
if (threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
threadPool.shutdownNow();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注册 JVM 钩子函数自动关闭线程池
如果使用了线程池,可以在 JVM 中注册一个钩子函数,在 JVM 进程关闭之前,由钩子函数自动将线程池优雅地关闭,以确保资源正常释放。
static {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
shutdownThreadPool(threadPoolExecutor);
}
}, "自动释放线程池线程"));
}
确定线程池的线程数
IO 密集型
此类任务主要是执行 IO 操作。由于执行 IO 操作的时间较长,导致 CPU 的利用率不高,这类任务 CPU 常处于空闲状态。
由于 IO 密集型任务的 CPU 使用率较低,导致线程空余时间很多,因此通常需要开 CPU 核心数两倍的线程。当 IO 线程空闲时,可以启用其他线程继续使用 CPU,以提高 CPU 的使用率。
private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2;
CPU 密集型
CPU 密集型任务也叫计算密集型任务,其特点是要进行大量计算而需要消耗 CPU 资源,比如计算圆周率、对视频进行高清解码等。CPU 密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低,所以要最高效地利用 CPU,CPU 密集型任务并行执行的数量应当等于 CPU 的核心数。
private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
混合性任务
此类任务既要执行逻辑计算,又要进行 IO 操作(如 RPC 调用、数据库访问)。相对来说,由于执行 IO 操作的耗时较长(一次网络往返往往在数百毫秒级别),这类任务的 CPU 利用率也不是太高。Web 服务器的 HTTP 请求处理操作为此类任务的典型例子。
在为混合线任务确定线程数时,业界有一个比较成熟的公式:
:::info 最佳线程数 = ((线程等待时间 + 线程 CPU 时间)/ 线程 CPU 时间) CPU核心数 ::: 通过公式可以看出:*等待时间所占的比例越高,需要的线程就越多;CPU 耗时所占的比例越高,需要的线程就越少。
例如,在 Web 服务器处理 HTTP 请求时,假设平均线程 CPU 运行时间为 100ms,而线程等待时间(比如包括 DB 操作、RPC 操作、缓存操作等)为 900ms,如果 CPU 核数为 8,那么根据上面这个公式,估算如下:
((900 + 100) / 100)* 8 = 80