Java线程的创建非常昂贵,需要 JVM 和 OS 配合完成大量的工作:

  1. 必须为线程堆栈分配和初始化大量内存块,其中包含至少 1MB 的栈内存
  2. 需要进行系统调用,以便在 OS 中创建和注册本地线程

因此,Java 高并发频繁创建和销毁线程是非常低效的,为了更好的管理线程和提高性能,Java 使用了线程池。线程池主要解决了以下两个问题:

  1. 提升性能:线程池能独立负责线程的创建、维护和分配。在执行大量异步任务时,可以不需要自己创建线程,而是将任务交给线程池去调度。线程池能尽可能使用空闲的线程去执行异步任务,最大限度地对已经创建的线程进行复用,使得性能提升明显
  2. 线程管理:每个 Java 线程池会保持一些基本的线程统计信息,例如完成的任务数量、空闲时间等,以便对线程进行有效管理,使得能对所接收到的异步任务进行高效调度

线程池架构

image.png
图 - JUC 中线程池的类与接口的架构

Executor

  1. public interface Executor {
  2. void execute(Runnable command);
  3. }

Executor 是 Java 异步目标任务的“执行者”接口,其目标是执行目标任务。Executor 作为执行者的角色,其目的是提供一种将“任务提交者”与“任务执行者”分离开来的机制。

ExecutorService

  1. public interface ExecutorService extends Executor {
  2. void shutdown();
  3. List<Runnable> shutdownNow();
  4. boolean isShutdown();
  5. boolean isTerminated();
  6. boolean awaitTermination(long timeout, TimeUnit unit)
  7. throws InterruptedException;
  8. // 向线程池提交单个任务
  9. <T> Future<T> submit(Callable<T> task);
  10. <T> Future<T> submit(Runnable task, T result);
  11. Future<?> submit(Runnable task);
  12. // 向线程池提交批量任务
  13. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  14. throws InterruptedException;
  15. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  16. long timeout, TimeUnit unit)
  17. throws InterruptedException;
  18. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  19. throws InterruptedException, ExecutionException;
  20. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  21. long timeout, TimeUnit unit)
  22. throws InterruptedException, ExecutionException, TimeoutException;
  23. }

ExecutorService 继承于 Executor。它是 Java 异步目标任务的“执行者服务接”口,对外提供异步任务的接收服务。ExecutorService 提供了“接收异步任务并转交给执行者”的方法,如submit系列方法、invoke系列方法等。

AbstractExecutorService

AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 接口。AbstractExecutorService 存在的目的是为 ExecutorService 中的接口提供默认实现。

ThreadPoolExecutor

ThreadPoolExecutor 就是“线程池”实现类,它继承于 AbstractExecutorService 抽象类。ThreadPoolExecutor 是 JUC 线程池的核心实现类。线程的创建和终止需要很大的开销,线程池中预先提供了指定数量的可重用线程,所以使用线程池会节省系统资源,并且每个线程池都维护了一些基础的数据统计,方便线程的管理和监控。

ScheduledExecutorService

ScheduledExecutorService 是一个接口,它继承于 ExecutorService。它是一个可以完成“延时”和“周期性”任务的调度线程池接口,其功能和 Timer/TimerTask 类似。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor,它提供了 ScheduledExecutorService 线程池接口中“延时执行”和“周期执行”等抽象调度方法的具体实现。ScheduledThreadPoolExecutor 类似于 Timer,但是在高并发程序中,ScheduledThreadPoolExecutor 的性能要优于 Timer。

Executors

Executors 是一个静态工厂类,它通过静态工厂方法返回 ExecutorService、ScheduledExecutorService 等线程池示例对象,这些静态工厂方法可以理解为一些快捷的创建线程池的方法。

Executors 的4种快捷创建线程池的方法

newSingleThreadExecutor 创建“单线程化线程池”

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

该方法用于创建一个“单线程化线程池”,也就是只有一个线程的线程池,所创建的线程池用唯一的工作线程来执行任务,使用此方法创建的线程池能保证所有任务按照指定顺序(如FIFO)执行。

单线程化线程池有如下特点:

  1. 单线程化的线程池中的任务是按照提交的次序顺序执行的
  2. 池中的唯一线程的存活时间是无限的
  3. 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列中,并且其阻塞队列是无界的(使用过程中注意 **OOM** 问题

总体来说,单线程化的线程池所适用的场景是:任务按照提交次序,一个任务一个任务地逐个执行的场景

以上用例在最后调用 shutdown() 方法来关闭线程池。执行 shutdown() 方法后,线程池状态变为 SHUTDOWN,此时线程池将拒绝新任务,不能再往线程池中添加新任务,否则会抛出 RejectedExecutionException 异常。此时,线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成才会退出。还有一个与 shutdown() 类似的方法,叫作 shutdownNow(),执行 shutdownNow() 方法后,线程池状态会立刻变成 STOP,并试图停止所有正在执行的线程,并且不再处理还在阻塞队列中等待的任务,会返回那些未执行的任务。

newFixedThreadPool 创建“固定数量的线程池”

  1. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>(),
  5. threadFactory);
  6. }

该方法用于创建一个“固定数量的线程池”,其唯一的参数用于设置池中线程的“固定数量”。

固定数量的线程池的有如下特点:

  1. 如果线程数没有达到“固定数量”,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定的数量
  2. 线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程
  3. 在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列,注意 **OOM** 问题

固定数量线程池的使用场景如下:需要任务长期执行的场景。“固定数量的线程池”的线程数能够比较稳定地保证一个数,能够避免频繁回收线程和创建线程,故适用于处理 CPU 密集型的任务,在 CPU 被工作线程长时间占用的情况下,能确保尽可能少地分配线程。

newCachedThreadPool 创建“可缓存线程池”

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

该方法用于创建一个“可缓存线程池”,如果线程池内的某些线程无事可干成为空闲线程,“可缓存线程池”可灵活回收这些空闲线程。

可缓存线程池的特点如下:

  1. 在接收新的异步任务 target 执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务
  2. 此线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说 JVM )能够创建的最大线程大小(注意 **OOM** 问题
  3. 如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务)线程

    可缓存线程池的使用场景:需要快速处理突发性强、耗时较短的任务场景,如 Netty 的 NIO 处理场景、 REST API 接口的瞬时削峰场景。“可缓存线程池”的线程数量不固定,只要有空闲线程就会被回收;接收到的新异步任务执行目标,查看是否有线程处于空闲状态,如果没有就直接创建新的线程。

newScheduledThreadPool 创建“可调度线程池”

  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2. return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }
  4. public ScheduledThreadPoolExecutor(int corePoolSize) {
  5. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  6. new DelayedWorkQueue());
  7. }

该方法用于创建一个“可调度线程池”,即一个提供“延时”和“周期性”任务调度功能的 ScheduledExecutorService 类型的线程池。ScheduleExecutorService 接口中有多个重要的接收被调目标任务的方法,其中 scheduleAtFixedRatescheduleWithFixedDelay 使用得比较多。

**scheduleAtFixedRate** :

  1. public ScheduledFuture<?> scheduleAtFixedRate(
  2. Runnable command, // 异步任务 target 执行目标实例
  3. long initialDelay, // 首次执行延时
  4. long period, // 两次开始执行最小间隔时间
  5. TimeUnit unit) { // 时间单位
  6. ......
  7. }

**scheduleWithFixedDelay**

  1. public ScheduledFuture<?> scheduleWithFixedDelay(
  2. Runnable command, // 异步任务 target 执行目标实例
  3. long initialDelay, // 首次执行延时
  4. long delay, // 前一次执行结束到下一次开始执行的间隔时间
  5. TimeUnit unit) { // 时间单位
  6. ......
  7. }

当被调任务的执行时间大于指定的间隔时间时,ScheduleExecutorService 并不会创建一个新的线程去并发执行这个任务,而是等待前一次调度执行完毕。

可调度线程池的使用场景:周期性地执行任务的场景。

线程池的标准创建方式

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. }
  1. corePoolSize:核心线程数
  2. maximumPoolSize:最大线程数
  3. keepAliveTime:空闲线程(默认情况下不包括核心线程)存活时间,超过该时间对应线程会被回收
  4. unit:空闲线程存活时间单位
  5. workQueue:任务队列,用于暂时接收异步任务。如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中
  6. threadFactory:线程工厂
  7. handler:拒绝策略

默认情况下,空闲线程超时策略仅适用于存在超过 corePoolSize 线程的情况。但若调用了 allowCoreThreadTimeOut(boolean) 方法,并且传入了参数 true,则 keepAliveTime 参数所设置的超时策略也将被应用于核心线程。

线程池工作原理


  1. 如果当前工作线程数量小于 corePoolSize,执行器总是优先创建一个任务线程,而不是从线程队列中获取一个空闲线程
  2. 如果线程池中总的任务数量大于 corePoolSize,新接收的任务将被加入阻塞队列中,一直到阻塞队列已满。在核心线程池数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程
  3. 当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光
  4. 在核心线程池数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程),并且立即开始执行新任务
  5. 在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出 maximumPoolSize。如果线程池的线程总数超过 maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略

拒绝策略

提交任务到线程池被拒绝有两种情况:

  1. 线程池已经被关闭
  2. 工作队列已满且 maximumPoolSize 已满

在任务被决绝之后会触发拒绝策略,也就是 RejectedExecutionHandler 实例的 rejectedExecution() 方法。默认的拒绝策略有以下几种:

拒绝策略(AbortPolicy)

使用该策略时,如果线程池队列满了,新任务就会被拒绝,并且抛出 RejectedExecutionException 异常。该策略是线程池默认的拒绝策略。

抛弃策略(DiscardPolicy)

如果线程池队列满了,新任务就会直接被丢掉,并且不会有任何异常抛出。

抛弃最老任务策略(DiscardOldestPolicy)

如果队列满了,就会将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列。因为队列是队尾进队头出,队头元素是最老的,所以每次都是移除队头元素后再尝试入队。

调用者执行策略(CallerRunsPolicy)

在新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务。

线程池提交任务的方式

execute 方法

  1. public void execute(Runnable command) {
  2. }

execute() 方法只能接收 Runnable 类型的参数,且提交任务之后没有返回值。

submit 方法

  1. <T> Future<T> submit(Callable<T> task);
  2. <T> Future<T> submit(Runnable task, T result);
  3. Future<?> submit(Runnable task);

submit() 方法可以接收 Callable、Runnable 两种类型的参数,且 submit() 在启动之后会返回 Future 对象,代表一个异步执行实例,可以通过该异步执行实例去获取结果。另外,返回的 Future 对象(异步执行实例),可以在异步执行过程中进行异常捕获。

:::info Callable 是 JDK 1.5 加入的执行目标接口,作为 Runnable 的一种补充,允许有返回值,允许抛出异常。 Runnable 和 Callable 的主要区别为:

  1. Callable 允许有返回值,Runnable 不允许有返回值;
  2. Runnable 不允许抛出异常,Callable 允许抛出异常。 :::

事实上,submit() 最后调用的还是 execute()

调度器的钩子方法

ThreadPoolExecutor 线程池调度器为每个任务执行前后都提供了钩子方法。ThreadPoolExecutor 类提供了三个钩子方法,这三个钩子方法一般用作被子类重写,具体如下:

  1. // 任务执行之前回调
  2. protected void beforeExecute(Thread t, Runnable r) { }
  3. // 任务执行之后回调
  4. protected void afterExecute(Runnable r, Throwable t) { }
  5. // 线程终止后回调
  6. protected void terminated() { }


线程池的状态

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. // runState is stored in the high-order bits
  3. private static final int RUNNING = -1 << COUNT_BITS;
  4. private static final int SHUTDOWN = 0 << COUNT_BITS;
  5. private static final int STOP = 1 << COUNT_BITS;
  6. private static final int TIDYING = 2 << COUNT_BITS;
  7. private static final int TERMINATED = 3 << COUNT_BITS;
  8. }

线程池总共有5种状态:

  1. RUNNING:线程池创建之后的初始状态,这种状态下可以执行任务
  2. SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行完毕
  3. STOP:该状态下线程池不再接受新任务,也不会处理工作队列中的剩余任务,并且将会中断所有工作线程
  4. TIDYING:该状态下所有任务都已终止或者处理完成,将会执行 terminated() 钩子方
  5. TERMINATED:执行完 terminated() 钩子方法之后的状态

线程池状态之间的转换关系如下:

  1. 线程池创建之后状态为 RUNNING
  2. 执行线程池的 shutdown() 实例方法,会使线程池状态从 RUNNING 转变为 SHUTDOWN
  3. 执行线程池的 shutdownNow() 实例方法,会使线程池状态从 RUNNING 转变为 STOP
  4. 当线程池处于 SHUTDOWN 状态时,执行其 shutdownNow() 方法会将其状态转变为 STOP
  5. 等待线程池的所有工作线程停止,工作队列清空之后,线程池状态会从 STOP 转变为 TIDYING
  6. 执行完 terminated() 钩子方法之后,线程池状态从 TIDYING 转变为 TERMINATED

image.png
图 - 线程池的状态转换规则

如何优雅地关闭线程池

线程池提供了3个关闭线程池的方法:

  1. **shutdown**:是 JUC 提供的一个有序关闭线程池的方法,此方法会等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为 SHUTDOWN,线程池不会再接收新的任务
  2. **shutdownNow**:是 JUC 提供的一个立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务
  3. **awaitTermination**:等待线程池完成关闭。在调用线程池的 shutdown()shutdownNow() 方法时,当前线程会立即返回,不会一直等待直到线程池完成关闭。如果需要等到线程池关闭完成,可以调用 awaitTermination() 方法

shutdown 方法

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. // 加锁
  4. mainLock.lock();
  5. try {
  6. // 检查线程池关闭的 Java Security 权限
  7. checkShutdownAccess();
  8. // 更新线程池状态。SHUTDOWN 状态下还继续提交任务就会触发拒绝策略
  9. advanceRunState(SHUTDOWN);
  10. // 中断空闲线程。中断只是设置了中断状态,并不会立即结束线程,需要用户线程主动配合中断操作结束线程
  11. interruptIdleWorkers();
  12. // 钩子函数,主要用于清理一些资源
  13. onShutdown(); // hook for ScheduledThreadPoolExecutor
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. tryTerminate();
  18. }

shutdownNow 方法

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. // 加锁
  5. mainLock.lock();
  6. try {
  7. // 检查线程池关闭的 Java Security 权限
  8. checkShutdownAccess();
  9. // 更新线程池状态。STOP 状态下还继续提交任务就会触发拒绝策略
  10. advanceRunState(STOP);
  11. // 中断所有线程,包括空闲线程以及工作线程。中断只是设置了中断状态,并不会立即结束线程,需要用户线程主动配合中断操作结束线程
  12. interruptWorkers();
  13. // 丢弃工作队列中的剩余任务
  14. tasks = drainQueue();
  15. } finally {
  16. mainLock.unlock();
  17. }
  18. tryTerminate();
  19. return tasks;
  20. }

awaitTermination 方法

调用了 shutdown()shutdownNow() 方法之后,用户程序并不会等待线程池关闭完成。需要调用该方法进行主动等待:

  1. threadPool.shutdown();
  2. try {
  3. while(!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
  4. System.out.println("线程池任务还未执行完毕");
  5. }
  6. } catch(InterruptedException e) {
  7. }

如果线程池完成关闭,awaitTermination() 方法将会返回 true,否则当等待时间超过指定时间后将会返回 false。如果需要调用 awaitTermination(),建议不是永久等待,而是设置一定重试次数。

  1. public void shutdownThreadPool() {
  2. if (!threadPoolExecutor.isTerminated()) {
  3. try {
  4. // 循环1000次,每次等待10ms
  5. for (int i = 0; i < 1000; i++) {
  6. if (threadPoolExecutor.awaitTermination(10, TimeUnit.MILLISECONDS)) {
  7. break;
  8. }
  9. threadPoolExecutor.shutdownNow()
  10. }
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }

线程池优雅关闭示例

  1. 执行 shutdown() 方法,拒绝新任务的提交,并等待所有任务有序地执行完毕
  2. 执行 awaitTermination(long, TimeUnit) 方法,指定超时时间,判断是否线程池关闭完成
  3. 如果 awaitTermination() 方法返回 false,或者被中断,就调用 shutdownNow() 方法立即关闭线程池所有任务
  4. 补充执行 awaitTermination(long, TimeUnit) 方法,判断线程池是否关闭完成。如果超时,就可以进入循环关闭,循环一定的次数(如1000次),不断关闭线程池,直到其关闭或者循环结束
  1. public void shutdownThreadPool(ExecutorService threadPool) {
  2. if (threadPool.isTerminated()) {
  3. return;
  4. }
  5. // 关闭线程池,拒绝接受新任务
  6. threadPool.shutdownNow();
  7. try {
  8. if (!threadPool.awaitTermination(60, TimeUnit.MILLISECONDS)) {
  9. // 取消正在执行的任务
  10. threadPool.shutdownNow();
  11. // 等待一定的时间之后还未关闭完成,进入循环关闭缓解
  12. if (!threadPool.awaitTermination(60, TimeUnit.MILLISECONDS)) {
  13. System.out.println("线程池任务还未结束完毕");
  14. }
  15. }
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. if (!threadPool.isTerminated()) {
  20. try {
  21. for (int i = 0; i < 1000; i++) {
  22. if (threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
  23. break;
  24. }
  25. threadPool.shutdownNow();
  26. }
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }

注册 JVM 钩子函数自动关闭线程池

如果使用了线程池,可以在 JVM 中注册一个钩子函数,在 JVM 进程关闭之前,由钩子函数自动将线程池优雅地关闭,以确保资源正常释放。

  1. static {
  2. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  3. @Override
  4. public void run() {
  5. shutdownThreadPool(threadPoolExecutor);
  6. }
  7. }, "自动释放线程池线程"));
  8. }

确定线程池的线程数

IO 密集型

此类任务主要是执行 IO 操作。由于执行 IO 操作的时间较长,导致 CPU 的利用率不高,这类任务 CPU 常处于空闲状态。

由于 IO 密集型任务的 CPU 使用率较低,导致线程空余时间很多,因此通常需要开 CPU 核心数两倍的线程。当 IO 线程空闲时,可以启用其他线程继续使用 CPU,以提高 CPU 的使用率。

  1. private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2;

CPU 密集型

CPU 密集型任务也叫计算密集型任务,其特点是要进行大量计算而需要消耗 CPU 资源,比如计算圆周率、对视频进行高清解码等。CPU 密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低,所以要最高效地利用 CPU,CPU 密集型任务并行执行的数量应当等于 CPU 的核心数。

  1. private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();

混合性任务

此类任务既要执行逻辑计算,又要进行 IO 操作(如 RPC 调用、数据库访问)。相对来说,由于执行 IO 操作的耗时较长(一次网络往返往往在数百毫秒级别),这类任务的 CPU 利用率也不是太高。Web 服务器的 HTTP 请求处理操作为此类任务的典型例子。

在为混合线任务确定线程数时,业界有一个比较成熟的公式:

:::info 最佳线程数 = ((线程等待时间 + 线程 CPU 时间)/ 线程 CPU 时间) CPU核心数 ::: 通过公式可以看出:*等待时间所占的比例越高,需要的线程就越多;CPU 耗时所占的比例越高,需要的线程就越少。

例如,在 Web 服务器处理 HTTP 请求时,假设平均线程 CPU 运行时间为 100ms,而线程等待时间(比如包括 DB 操作、RPC 操作、缓存操作等)为 900ms,如果 CPU 核数为 8,那么根据上面这个公式,估算如下:

((900 + 100) / 100)* 8 = 80