在 HotSpot VM 的线程模型中,Java 线程被一对一映射为内核线程。Java 在使用线程执行程序时,需要创建一个内核线程;当该 Java 线程被终止时,这个内核线程也会被回收。因此 Java 线程的创建与销毁会消耗一定的计算机资源,从而增加系统的性能开销。此外,大量创建线程同样会给系统带来性能问题,因为内存和 CPU 资源都将被线程抢占,如果处理不当,就会发生内存溢出、CPU 使用率超负荷等问题。

为了解决上述两类问题,Java 提供了线程池概念,对于频繁创建线程的业务场景,线程池可以创建固定的线程数量,并且在操作系统底层,轻量级进程将会把这些线程映射到内核。
ThreadPoolExecutor.png

核心参数

Java 并发包里提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,它强调的是 Executor 而不是一般意义上的池化资源。ThreadPoolExecutor 的构造函数非常复杂,最完备的构造函数有 7 个参数。

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler)

corePoolSize:核心线程数

线程池在完成初始化之后,默认情况下,线程池中不会有任何线程,线程池会等待任务提交到线程池的时候再去创建线程。即使有其他空闲线程能够执行新提交的任务也会创建线程,当线程池中已创建的线程数大于核心线程数时就不会再创建了。核心线程创建出来后即使超出了配置的线程存活时间也不会销毁,核心线程只要创建就永驻了,一直等待新任务来进行处理。如果调用了线程池 prestartAllCoreThreads() 方法,线程池会在没有任务到来之前提前创建并启动 corePoolSize 个线程。

maximumPoolSize:最大线程数

当核心线程忙不过来且工作队列已满时,如果此时已创建的线程数小于最大线程数,当有新任务进来的话,线程池会再创建新的线程执行任务,当线程数达到 maximumPoolSize 后就不会产生新线程了。

**keepAliveTime:线程存活时间、

如果线程池当前的线程数多于 corePoolSize 指定的数量,则当多余线程的空闲时间超过 keepAliveTime 及 unit 指定的时间后,这些多余线程(超出核心线程数的那些线程)就会被回收。Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,这意味着如果线程池很闲,就会将线程池中的所有线程(包括核心线程)都回收。

workQueue:工作队列

当线程数达到核心线程数之后,此时如果有任务继续提交到线程池的话,就先进入 workQueue 等待。该队列是一个实现了 BlockingQueue 接口的阻塞队列,仅用于存放 Runnable 对象。通常有如下选择:

  • ArrayBlockingQueue:有界任务队列。
  • LinkedBlockingQueue:无界任务队列,最大为 Integer.MAX_VALUE。
  • SynchronousQueue:每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。
  • PriorityBlockingQueue:带有执行优先级的队列,可根据任务自身的优先级顺序先后执行。

threadFactory:线程创建工厂

默认采用的是 DefaultThreadFactory,主要负责创建线程,创建出来的线程都在同一个线程组且优先级也是一样的。可以使用 Guava 提供的 ThreadFactoryBuilder 给线程池里的线程设置有意义的名字。

handler:拒绝策略

当任务队列已满,且线程数已达到最大线程数时,如何拒绝任务。默认采用的是 AbortPolicy,即采取直接拒绝的策略,并抛出 RejectedExecutionException 异常。当然,我们也可以根据应用场景的需要来自定义实现 RejectedExecutionHandler 接口,如记录日志或持久化存储不能处理的任务等。

JAVA 线程池框架提供了 4 种内置的策略:

  • AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。
  • CallerRunsPolicy:提交任务的线程自己去执行该任务。
  • DiscardOldestPolicy:丢弃最老的任务。
  • DiscardPolicy:丢弃任务但不抛出异常。

工作流程

ThreadPoolExecutor 的具体工作流程如下:
image.png
假设线程池参数配置为:核心线程数 5 个,最大线程数 10 个,队列长度为 100。

当线程池启动的时候不会创建任何线程,假设请求进来 6 个,则会创建 5 个核心线程来处理五个请求,另一个没被处理到的进入到队列。这时候又进来 99 个请求,线程池发现核心线程满了,队列还空着 99 个位置,所以会把这 99 个请求放到队列里,现在加上刚才的 1 个正好 100 个,队列已满。这时候再次进来 5 个请求,线程池会再开辟 5 个非核心线程来处理这五个请求。如果此时又进来 1 个请求,则直接走拒绝策略。

既然 Java 线程池是先用工作队列来存放来不及处理的任务,满了之后再扩容线程池。当我们的工作队列设置得很大时,最大线程数这个参数就显得没有意义了。我们有没有办法让线程池更激进一点,优先开启更多的线程,而把队列当成一个后备方案呢?

参考 Tomcat 线程池的实现:https://github.com/apache/tomcat/blob/a801409b37294c3f3dd5590453fb9580d7e33af2/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java

内部结构

线程池 ThreadPoolExecutor 的成员变量如下所示:

  1. // 用于保留任务并移交给工作线程的队列
  2. private final BlockingQueue<Runnable> workQueue;
  3. // 线程池的主要状态锁,当创建线程、回收线程、改变线程池状态等场景时要使用这个锁
  4. private final ReentrantLock mainLock = new ReentrantLock();
  5. // 用来存放线程池中的所有工作线程,仅在获取锁的情况下可以访问
  6. private final HashSet<Worker> workers = new HashSet<Worker>();
  7. // 等待条件以支持awaitTermination()方法
  8. private final Condition termination = mainLock.newCondition();
  9. // 记录线程池中曾经出现过的最大线程数,仅在获取锁的情况下可以访问
  10. private int largestPoolSize;
  11. // 记录完成的任务数,仅在工作线程终止时更新,仅在获取锁的情况下可以访问
  12. private long completedTaskCount;
  13. private volatile ThreadFactory threadFactory;
  14. private volatile RejectedExecutionHandler handler;
  15. private volatile long keepAliveTime;
  16. // 如果为false(默认),核心线程即使处于空闲状态也保持活动状态。如果为true则使用keepAliveTime来超时等待工作。
  17. private volatile boolean allowCoreThreadTimeOut;
  18. private volatile int corePoolSize;
  19. private volatile int maximumPoolSize;

其中,线程池的工作线程被抽象为静态内部类 Worker,基于 AQS 实现。

线程池通过一个 AtomicInteger 即保存了工作线程数,又保存了线程池运行状态,通过高低位的不同,将 32 位的 int 类型分为了两部分,分别保存了 runState 与 workerCount。其中,runState 被保存在高 3 位中,表示线程池的生命周期状态;其余低位用来保存 workerCount,表示有效的线程数。这是一个典型的高效优化。

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2. // 真正决定了工作线程数的理论上限
  3. private static final int COUNT_BITS = Integer.SIZE - 3;
  4. private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
  5. // 线程池状态,存储在数字的高位
  6. private static final int RUNNING = -1 << COUNT_BITS;
  7. private static int runStateOf(int c) { return c & ~COUNT_MASK; }
  8. private static int workerCountOf(int c) { return c & COUNT_MASK; }

这里在创建时设置了 runState 为 RUNNING 状态,这些状态之间的数字顺序很重要,runState 在运行过程中是单调增加的,但不是每个过程都需要经历:

  1. // 运行状态,可以接受新任务并处理排队中的任务
  2. private static final int RUNNING = -1 << COUNT_BITS;
  3. // 停工状态,不接受新任务,只处理排队中的任务
  4. private static final int SHUTDOWN = 0 << COUNT_BITS;
  5. // 停止状态,不接受新任务,不处理排队任务,并尝试中断正在执行的任务
  6. private static final int STOP = 1 << COUNT_BITS;
  7. // 清空状态,所有任务都已终止,workCount为零
  8. private static final int TIDYING = 2 << COUNT_BITS;
  9. // 终止状态,线程池已销毁
  10. private static final int TERMINATED = 3 << COUNT_BITS;

线程池 - 图3

核心方法

1. execute

下面我们选择典型的 execute 方法,来看看其是如何工作的:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. // 通过workerCountOf(c)获取低29位,计算工作线程数
  6. // 如果worker数量比核心线程数小,直接创建worker执行任务
  7. if (workerCountOf(c) < corePoolSize) {
  8. // 创建线程并执行任务,参数里的true表示新建的是一个核心线程
  9. if (addWorker(command, true))
  10. return;
  11. c = ctl.get();
  12. }
  13. // 如果worker数量已经大于核心线程数,则进入等待队列,注意offer是不会阻塞的,和put区分开。
  14. if (isRunning(c) && workQueue.offer(command)) {
  15. // 因为任务入队列前后,线程池的状态可能会发生变化,所以需要再recheck下
  16. int recheck = ctl.get();
  17. // 如果线程池不是RUNNING状态,说明执行过shutdown命令,需要将刚添加的任务从队列移除并执行拒绝策略
  18. if (! isRunning(recheck) && remove(command))
  19. reject(command);
  20. // 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
  21. else if (workerCountOf(recheck) == 0)
  22. addWorker(null, false);
  23. }
  24. // 如果进入等待队列失败,则直接将任务提交给线程池,此时入参为false,表示新建的是非核心线程
  25. else if (!addWorker(command, false))
  26. // 若达到线程池的最大线程数,则任务提交失败,此时执行拒绝策略
  27. reject(command);
  28. }

可以看到,execute 方法本身是没有加锁的,但是在 addWorker 方法内部会加锁,这样可以保证不会创建超过我们预期的线程数,并且做到了在最小的范围内的加锁,尽量减少锁竞争。

addWorker 代码逻辑:

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. ......
  3. boolean workerStarted = false;
  4. boolean workerAdded = false;
  5. Worker w = null;
  6. try {
  7. // 创建一个新的Worker,并传入当前任务,内部会创建一个新的线程
  8. w = new Worker(firstTask);
  9. final Thread t = w.thread;
  10. if (t != null) {
  11. // 在进行线程池敏感操作时,需要获取主锁,避免在添加和启动线程时被干扰
  12. final ReentrantLock mainLock = this.mainLock;
  13. mainLock.lock();
  14. try {
  15. // 重新检查状态,因为在获取锁前后,线程池状态可能发生变化
  16. int rs = runStateOf(ctl.get());
  17. if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
  18. if (t.isAlive())
  19. throw new IllegalThreadStateException();
  20. // 添加到workers集合
  21. workers.add(w);
  22. int s = workers.size();
  23. // 整个线程池在运行期间的最大并发任务个数
  24. if (s > largestPoolSize)
  25. largestPoolSize = s;
  26. workerAdded = true;
  27. }
  28. } finally {
  29. mainLock.unlock();
  30. }
  31. if (workerAdded) {
  32. // 添加到集合成功,则调用start方法启动Worker开始执行任务
  33. t.start();
  34. workerStarted = true;
  35. }
  36. }
  37. } finally {
  38. if (! workerStarted)
  39. // 线程启动失败逻辑
  40. addWorkerFailed(w);
  41. }
  42. return workerStarted;
  43. }

ThreadPoolExecutor 有一个内部类 Worker,查看该内部类的结构及构造方法,可以看到 Worker 本身也是实现了 Runnable 接口,在创建线程时会将当前 Worker 实例作为任务传入到新建线程中去,所以当我们在 addWorker 方法中启动线程后,线程执行的是该 Worker 实例的 run 方法,其内部又调用了 runWorker 方法,来实现每一个工作线程的固有工作。

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  2. final Thread thread;
  3. Runnable firstTask;
  4. Worker(Runnable firstTask) {
  5. // 调用AQS内部的setState方法将锁状态设置为-1,用来禁止中断直到调用runWorker方法
  6. setState(-1);
  7. this.firstTask = firstTask;
  8. this.thread = getThreadFactory().newThread(this);
  9. }
  10. public void run() {
  11. runWorker(this);
  12. }
  13. }

runWorker 方法内部对我们实际传入的 Runnable 对象做了一层封装,对外提供了 beforeExecute() 和 afterExecute() 两个扩展方法。在实际应用中,可以对其进行扩展来实现对线程池运行状态的跟踪, 输出一些有用的调试信息等,帮助系统故障诊断。

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock();
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock();
  10. ......
  11. try {
  12. // 任务执行的前置扩展
  13. beforeExecute(wt, task);
  14. Throwable thrown = null;
  15. try {
  16. // 原始任务的执行,注意如果任务执行异常会被抛出,线程会中断退出
  17. task.run();
  18. } catch (RuntimeException x) {
  19. thrown = x; throw x;
  20. } catch (Error x) {
  21. thrown = x; throw x;
  22. } catch (Throwable x) {
  23. thrown = x; throw new Error(x);
  24. } finally {
  25. // 任务执行的后置扩展
  26. afterExecute(task, thrown);
  27. }
  28. } finally {
  29. task = null;
  30. w.completedTasks++;
  31. w.unlock();
  32. }
  33. }
  34. completedAbruptly = false;
  35. } finally {
  36. processWorkerExit(w, completedAbruptly);
  37. }
  38. }

2. submit

submit() 方法用于提交需要获取返回值的任务。线程池会返回一个 Future 类型的对象,可以通过 Future 提供的 get() 方法可以获取返回值。注意入参是 Callable 接口,其解决了 Runnable 接口没有返回值的问题。

  1. public <T> Future<T> submit(Callable<T> task) {
  2. if (task == null) throw new NullPointerException();
  3. // 将Callable包装成FutureTask,所以submit方法的返回值是FutureTask对象
  4. RunnableFuture<T> ftask = newTaskFor(task);
  5. // 本质还是调用线程池的execute方法
  6. execute(ftask);
  7. return ftask;
  8. }
  9. // 可以通过重写该方法改变返回的实际对象类型
  10. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  11. return new FutureTask<T>(callable);
  12. }

可以看到 submit() 方法默认返回的是一个 FutureTask 对象,它同时实现了 Runnable 和 Future 接口。我们可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以由调用线程直接执行 FutureTask的 run() 方法,并通过 get() 方法获得任务的执行结果。

Future 接口有如下 5 个方法:

  1. // 取消任务,mayInterruptIfRunning表示任务是否能够接收中断
  2. boolean cancel(boolean mayInterruptIfRunning);
  3. // 判断任务是否已取消
  4. boolean isCancelled();
  5. // 判断任务是否已结束
  6. boolean isDone();
  7. // 阻塞获取任务执行结果
  8. V get() throws InterruptedException, ExecutionException;
  9. // 阻塞获取任务执行结果,支持超时机制
  10. V get(long timeout, TimeUnit unit)
  11. throws InterruptedException, ExecutionException, TimeoutException;

线程池 - 图4
注意:当通过 submit 方法提交任务时,如果触发了拒绝策略,并且采用的是 DiscardPolicy 拒绝策略,则返回的 FutureTask 对象实例的 get() 方法永远不会返回,这可能会导致线程一直阻塞。因为 DiscardPolicy 策略会直接丢弃并且不抛出异常,导致 FutureTask 对象内部的状态始终不会变化,所以 get() 方法的返回条件始终不满足,导致方法始终不返回。对于这种情况,我们可以采用带超时的 get() 方法或替换拒绝策略。

3. shutdown

调用 shutdown 方法后,线程池会将状态设置成 SHUTDOWN 状态并且不会再接收新任务了,如果继续提交任务到线程池,则直接采取拒绝策略。如果当前线程池中还有未执行完的任务(包括等待队列中未执行的任务,因为 Worker 线程会不断获取待执行的任务),线程池会等待所有任务都执行完才会 shutdown 掉。

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. // 死循环CAS修改线程池状态为SHUTDOWN
  7. advanceRunState(SHUTDOWN);
  8. // 中断空闲的工作线程
  9. interruptIdleWorkers();
  10. // 钩子函数
  11. onShutdown(); // hook for ScheduledThreadPoolExecutor
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. // 等待任务执行完成后终止
  16. tryTerminate();
  17. }
  18. private void interruptIdleWorkers(boolean onlyOne) {
  19. ......
  20. // 遍历worker列表
  21. for (Worker w : workers) {
  22. Thread t = w.thread;
  23. // 当线程未被中断且线程空闲时(未执行任务的线程)才去尝试设置中断标志,否则不做处理
  24. if (!t.isInterrupted() && w.tryLock()) {
  25. try {
  26. // 因为空闲线程会阻塞性地获取任务,所以这里发送一个中断通知线程退出,不要再等待任务了
  27. t.interrupt();
  28. } catch (SecurityException ignore) {
  29. } finally {
  30. w.unlock();
  31. }
  32. }
  33. }
  34. ......
  35. }

我们知道 shutdown 是无法关闭正在运行的线程的,那怎么才能识别正在运行的线程呢?

答案是:通过对 Worker 线程加锁实现的,其中 w.tryLock() 返回 true 表示能拿到 worker 的独占锁,如果能拿到锁,则表示 worker 线程当前没有在执行任务,因为当 worker 在获取到任务执行时会加锁。
image.png

4. shutdownNow

调用 shutdownNow() 方法后,线程池会将状态设置成 STOP 状态,此时线程池不会接受新的任务,并且会去尝试终止所有正在执行或暂停执行任务的线程,并返回尚未启动、等待执行任务的列表。

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. // 死循环CAS修改线程池状态为STOP
  8. advanceRunState(STOP);
  9. // 中断工作线程
  10. interruptWorkers();
  11. tasks = drainQueue();
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. tryTerminate();
  16. // 返回尚未执行的任务列表
  17. return tasks;
  18. }
  19. private void interruptWorkers() {
  20. final ReentrantLock mainLock = this.mainLock;
  21. mainLock.lock();
  22. try {
  23. for (Worker w : workers)
  24. // 如果是工作线程则终止其运行
  25. w.interruptIfStarted();
  26. } finally {
  27. mainLock.unlock();
  28. }
  29. }
  30. void interruptIfStarted() {
  31. Thread t;
  32. // AQS里面的state为-1时表示线程还未为执行,会忽略中断操作,因此这里先判断下state>=0
  33. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  34. try {
  35. // 中断正在执行任务的Worker线程
  36. t.interrupt();
  37. } catch (SecurityException ignore) {
  38. }
  39. }
  40. }

它的原理是遍历线程池中的工作线程,然后逐个调用工作线程的 interrupt() 方法来中断线程,并且由于 Worker 在执行任务时如果被中断,则会直接抛出异常,所以正在执行任务的 Worker 线程也会立即退出。

线程数优化

一般多线程执行的任务类型可以分为 CPU 密集型和 I/O 密集型,根据不同的任务类型,我们计算线程数的方法也不一样。

CPU 密集型:这种任务消耗的主要是 CPU 资源,这种情况我们不能够通过增加线程数提高计算能力,因为如果线程太多,反倒可能导致大量的上下文切换开销。所以,通常建议按照 CPU 核的数目 N 或者 N+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。可以参考 Brain Goetz 推荐的计算方法:

  1. 线程数 = CPU核数 × 目标CPU利用率 ×(1 + 平均等待时间/平均工作时间)

CompletionService

CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类有两个构造方法:

  1. public ExecutorCompletionService(Executor executor);
  2. public ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue);

如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象会加入到 completionQueue 中。

当需要批量提交异步任务时建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。此外 CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,可以轻松实现后续处理的有序性,避免无谓的等待。