概述

ThreadPoolExecutor 是 Java 非常重要的工具类,用好线程池可以成倍缩短业务执行时间。但是对于并发编程,用好也十分困难,它是一把双刃剑,只有勇士才能发挥最大效能。
Executor 体系结构如下图所示:
ThreadPoolExecutor.png
Executor 是最顶层的接口,接口定义的 API 非常简单,仅定义了一个 execute(Runnable) 接口,实现类会在未来的某一刻执行 Runnable
ExecutorService 同样也是接口,它的 API 比较丰富,通常我们也是用这里的 API 实现更高级的功能。里面包含大致有三类 API:

  1. 提交任务。包括单个任务提交(submit())和批量任务提交(invokeAll())。
  2. 状态判断。判断线程池内的任务是否已全部执行完成(isTerminated),判断线程池是否被关闭。
  3. 关闭线程池。比如立即关闭线程池,不会给还未执行的任务执行机会。ExecutorService 还提供了一个 awaitTermination 方法,用来当当前线程阻塞等待直到线程池完全关闭后被唤醒。

AbstractExecutorService 是一个抽象类,实现了 ExecutorService 部分接口,主要是提交任务。
ThreadPoolExecutor 是本篇文章最重要的类,它也是我们经常使用的线程池实现类。

状态定义

Doug Lea 使用一个 32 位 int 类型变量保存线程池的状态池中线程数量

  • 3 位存放线程池状态。
  • 29 位保存学院路线程数量。

线程池状态解析如下:

状态 描述
RUNNING (111) 线程池正常运行:线程池可以接收新的任务,并且线程也正常执行任务
SHUTDOWN (000) 拒绝新任务(提交),继续执行未完成的任务(包括 workQueue 队列中的任务)
STOP (001) 拒绝新任务(提交),中断正在执行的任务,不处理 workQueue 队列中的任务
TIDYING (010) 销毁所有任务,重置 workCount 为 0。当线程池状态变更为 TIDYING 时,会调用 terminated() 钩子函数。
TERMINATED (011) terminated() 方法结束后,意味着线程池已被关闭。
  1. state < 0,意味着线程池正常运行。
  2. state = 0,意味着无法提交任务,但仍能继续执行未完成的任务。
  3. state > 0,意味着正在执行的任务也要立即被中断执行。

线程池状态转换示意图.png

源码分析

内部类:Worker

Doug Lea 把线程池中的线程使用 Worker 内部类进行包装,而且 Worker 继承抽象灶 AQS,因此,它本身也是一把锁。

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable
  2. {
  3. // 每个 Worker 类都会绑定一个 Thread 对象,由 ThreadFactory 线程工厂生成
  4. final Thread thread;
  5. // Worker 首个待执行的任务,在创建 Worker 时,通过构造函数传入,可能为 null
  6. // 这里感觉可以起到一个加速作用,不一定必须得通过阻塞队列才能获得任务
  7. Runnable firstTask;
  8. // 记录此线程完成的任务数量
  9. volatile long completedTasks;
  10. // 构造函数
  11. Worker(Runnable firstTask) {
  12. setState(-1); // inhibit interrupts until runWorker
  13. this.firstTask = firstTask;
  14. this.thread = getThreadFactory().newThread(this);
  15. }
  16. // ...
  17. }

Worker 继承 AQS 以实现一个简单的排它锁,当任务需要执行时,首先需要获取锁,才会调用 Runnable#run 方法,当任务执行完后,必须要释放锁,以便下一个任务可以获得该线程的执行权。

提交单个任务 submit

我们从最简单的提交单个 Runnable 任务开始讲起。submit(Runnable) 是在抽象类 AbstractExecutorService 实现的:

  1. // java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)
  2. /**
  3. * 向线程池提供一个Runnalbe任务,并返回一个Future凭证。
  4. * 可以通过 Future#get() 方法等待获取计算结果
  5. */
  6. public Future<?> submit(Runnable task) {
  7. if (task == null) throw new NullPointerException();
  8. // #1 包装适配类
  9. RunnableFuture<Void> ftask = newTaskFor(task, null);
  10. // #2 交给线程池执行任务
  11. execute(ftask);
  12. // #3 返回 Future 凭证
  13. return ftask;
  14. }
  15. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  16. return new FutureTask<T>(runnable, value);
  17. }

方法会将 Runnable 任务包装为 RunnableFuture 实现类 FutureTask,然后调用 Executor#execute(Runnable) 执行任务。包装这一步是为了可以从 FutureTask 获得计算结果。

任务执行:execute

  1. // java.util.concurrent.ThreadPoolExecutor#execute
  2. public void execute(Runnable command) {
  3. if (command == null)
  4. throw new NullPointerException();
  5. // #1 c 表示线程池状态和线程数
  6. int c = ctl.get();
  7. // #2 当前线程数小于核心线程数,同步创建一个新的Worker线程,异步执行任务
  8. if (workerCountOf(c) < corePoolSize) {
  9. if (addWorker(command, true))
  10. // #3 创建Worker线程并成功添加任务,直接返回
  11. return;
  12. // #4 返回false意味着出现错误,可能其它线程并发修改了
  13. c = ctl.get();
  14. }
  15. // #5 走到这里,说明当前线程数大于等于核心线程数,或者 addWorker() 方法调用失败
  16. // ① 判断当前线程池状态,没有异常,继续执行; ② 将任务添加到workQueue队列中
  17. if (isRunning(c) && workQueue.offer(command)) {
  18. // #6 任务入队成功,重新检查线程池状态
  19. int recheck = ctl.get();
  20. // #7 double-check 如果线程池没有处于运行状态,则从队列中移除任务
  21. if (! isRunning(recheck) && remove(command))
  22. // #8 执行拒绝策略
  23. reject(command);
  24. // #8 线程池状态正常,并且线程数为 0,那么创建新的 Worker 线程
  25. // 这里是一个兜底操作,担心任务提交到队列,但是没有活的线程可以来执行任务
  26. else if (workerCountOf(recheck) == 0)
  27. addWorker(null, false);
  28. }
  29. // #9 任务队列已满,offer 失败,那么创建新的 Worker 线程
  30. else if (!addWorker(command, false))
  31. // #10 Worker 数量不能超过 maxPoolSize,如果超过,则创建失败,执行拒绝策略
  32. reject(command);
  33. }

Java 线程池执行模型.png

创建 Worker 任务执行线程:addWorker

这是任务提交线程调用的方法,当我们阅读源码的时候,需要明确这个方法到底是被哪个线程所调用,存在哪些共享变量。
方法执行流程:

  1. 检查线程池状态。如果 state >= 0,就不允许提交新的任务。
  2. 状态没问题,获取当前线程池存活的线程数量。并判断是否出现溢出,即超出所设置的阈值。如果一切正常,那么通过 CAS 将线程数 +1。这里可能会发生失败,如果失败,重头再来。
  3. 这里我们会走到 #6,意味着我们 CAS 设置成功,接下来就需要创建 Worker 对象。首先,我们需要获取全局锁,因为向非安全类 HashSet 插入 Worker,而且,在获取全局锁后,我们会再次检查线程状态是否正常,如果不正常,抛出相关异常。如果一切满足要求,将 Worker 放入 HashSet 后,释放全局锁。
  4. 启动 Worker 工作线程,至此,该线程可以从阻塞队列中获取任务并执行任务。
  5. 如果线程没有启动成功,则会重置相关变量,并调用 terminate() 方法。

    1. // java.util.concurrent.ThreadPoolExecutor#addWorker
    2. /**
    3. * 检查是否可以根据当前线程池状态和给定的界限(corePoolSize 或 maxPoolSize)创建新的 Worker 任务线程。
    4. * 如果允许,则将 firstTask 作为新的 Wroker 线程的首个任务。
    5. * 如果线程池
    6. */
    7. private boolean addWorker(Runnable firstTask, boolean core) {
    8. retry:
    9. for (;;) {
    10. int c = ctl.get();
    11. int rs = runStateOf(c);
    12. // #1 这里只需要记住,虽然线程状态为 SHUTDOWN,但是当任务队列workQueue不为空时
    13. // 是允许创建 Worker 工作线程。但是不允许提交新的任务(firstTask 不为null,创建失败)
    14. // 因此,这里是对 >= SHUTDOWN 状态的处理,其中存在一种特殊情况是可以允许创建新的Worker线程
    15. // 否则,直接返回 false
    16. if (rs >= SHUTDOWN &&
    17. ! (rs == SHUTDOWN &&
    18. firstTask == null &&
    19. ! workQueue.isEmpty()))
    20. return false;
    21. // 死循环
    22. for (;;) {
    23. // #2 获取线程池 worker 数量
    24. int wc = workerCountOf(c);
    25. // #3 判断 wc 是否"溢出",如果超过预设值,返回 false,不允许创建 worker 线程
    26. if (wc >= CAPACITY ||
    27. wc >= (core ? corePoolSize : maximumPoolSize))
    28. return false;
    29. // #4 条件都满足了,CAS 将 woker 数量 + 1
    30. if (compareAndIncrementWorkerCount(c))
    31. // CAS 成功,跳出第一层 for(;;)
    32. break retry;
    33. // #5 CAS 失败,说明有其它线程也在尝试创建新的 Worker 线程
    34. // 重新来过
    35. c = ctl.get();
    36. if (runStateOf(c) != rs)
    37. continue retry;
    38. // else CAS failed due to workerCount change; retry inner loop
    39. }
    40. }
    41. // #6 走到这里,说明 CAS 设置成功(相当于占位),现在可以慢慢创建线程,并执行任务了
    42. // 注意,这里仍是提交任务的线程在执行代码
    43. boolean workerStarted = false; // worker 线程是否已启动
    44. boolean workerAdded = false; // worker 线程是否已被添加到 HashSet 中
    45. Worker w = null; // 新创建的 Worker 线程
    46. try {
    47. // #7 二话不说,直接创建 Worker 实例对象
    48. // Wroker 对象有三个重要的变量,其中包括首个任务、由线程工作创建的线程、还有一个已完成的任务数
    49. // 别忘记,Worker 可是继承 AQS 的
    50. w = new Worker(firstTask);
    51. // #8 获取 Worker 内部的线程
    52. final Thread t = w.thread;
    53. if (t != null) { // 安全判断是否为 null
    54. // #9 这是 ThreadPoolExecutor 对象的锁,可理解为线程池全局锁
    55. // 因为关闭线程也需要持有这把锁,所以在获得锁这段时间内,可以保证线程池不会被关闭
    56. final ReentrantLock mainLock = this.mainLock;
    57. // #10 加锁
    58. mainLock.lock();
    59. try {
    60. // #11 当持有全局锁时,重新检查一下线程状态
    61. int rs = runStateOf(ctl.get());
    62. if (rs < SHUTDOWN ||
    63. (rs == SHUTDOWN && firstTask == null)) {
    64. // #12 进到这里,说明线程池状态正常,满足启动Worker线程执行任务
    65. if (t.isAlive())
    66. // #13 Worker 所创建的线程出现问题,抛出异常
    67. throw new IllegalThreadStateException();
    68. // #13 将 Worker 添加到线程池全局变量中(HashSet 集合)
    69. workers.add(w);
    70. // #14 largestPoolSize 记录 workers 工作线程数量的最大值
    71. // 通过这个值可以得到当前线程池的峰值
    72. int s = workers.size();
    73. if (s > largestPoolSize)
    74. largestPoolSize = s;
    75. workerAdded = true;
    76. }
    77. } finally {
    78. // #15 释放全局锁
    79. mainLock.unlock();
    80. }
    81. // #16 启动线程
    82. if (workerAdded) {
    83. t.start();
    84. workerStarted = true;
    85. }
    86. }
    87. } finally {
    88. // #17 如果线程没有启动,做一些收尾工作,比如复原 workCount 数量等等
    89. if (! workerStarted)
    90. addWorkerFailed(w);
    91. }
    92. // #18 返回线程是否启动成功
    93. return workerStarted;
    94. }

    addWorkerFailed

    // java.util.concurrent.ThreadPoolExecutor#addWorkerFailed
    private void addWorkerFailed(Worker w) {
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
         if (w != null)
             workers.remove(w);
         decrementWorkerCount();
         tryTerminate();
     } finally {
         mainLock.unlock();
     }
    }
    

    tryTerminate

    final void tryTerminate() {
     for (;;) {
         int c = ctl.get();
         if (isRunning(c) ||
             runStateAtLeast(c, TIDYING) ||
             (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
             return;
         if (workerCountOf(c) != 0) { // Eligible to terminate
             interruptIdleWorkers(ONLY_ONE);
             return;
         }
    
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                 try {
                     terminated();
                 } finally {
                     ctl.set(ctlOf(TERMINATED, 0));
                     termination.signalAll();
                 }
                 return;
             }
         } finally {
             mainLock.unlock();
         }
         // else retry on failed CAS
     }
    }
    

    Worker 执行任务:Worker#run

    Worker 线程启动是由提交任务线程来触发的,调用 Worker#start() 方法后,Worker 线程就会执行 run() 方法逻辑。

    // java.util.concurrent.ThreadPoolExecutor.Worker#run
    public void run() {
     runWorker(this);
    }
    

    runWorker

    逻辑相对简单,执行流程:

  6. 首先打开中断响应。

  7. 如果 firstTask != null,则先执行。否则调用 getTask() 从阻塞队列中获取任务。
  8. 获取独占锁 w.lock(),表示当前线程正在执行任务。任务执行完后,需要释放锁。
  9. 进入 finally ,线程执行完后,为了节约系统资源,执行完任务的线程需要被回收。 ```java // java.util.concurrent.ThreadPoolExecutor#runWorker final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // #1 获取当前 Worker 首个任务 Runnable task = w.firstTask;

    // #2 设置 firstTask 为 null w.firstTask = null;

    // #3 允许响应中断 w.unlock(); boolean completedAbruptly = true; try {

     // #4 循环调用 getTask 获取任务
     while (task != null || (task = getTask()) != null) {
         //#5 上锁,锁的范围仅限于当前 Worker 工作线程
         w.lock();
         // If pool is stopping, ensure thread is interrupted;
         // if not, ensure thread is not interrupted.  This
         // requires a recheck in second case to deal with
         // shutdownNow race while clearing interrupt
         // #6 如果线程池状态 >= STOP,确保该线程也要被中断
         if ((runStateAtLeast(ctl.get(), STOP) ||
              (Thread.interrupted() &&
               runStateAtLeast(ctl.get(), STOP))) &&
             !wt.isInterrupted())
             wt.interrupt();
         try {
             // #7 这是一个钩子函数,留给需要的子类去实现
             beforeExecute(wt, task);
    
             // #8 任务执行的过程中抛出的异常
             Throwable thrown = null;
             try {
    
                 // #9 执行任务
                 task.run();
             // #10 捕获并记录异常                    
             } catch (RuntimeException x) {
                 thrown = x; throw x;
             } catch (Error x) {
                 thrown = x; throw x;
             } catch (Throwable x) {
                 thrown = x; throw new Error(x);
             } finally {
                 afterExecute(task, thrown);
             }
         } finally {
             // #10 任务执行完成,置空 task ,准备通过 getTask() 获取下一个任务
             task = null;
    
             // #11 更新任务已完成计数器
             w.completedTasks++;
    
             // #12 释放锁
             w.unlock();
         }
     }
     completedAbruptly = false;
    

    } finally {

     // #13 如果执行到这里,需要执行线程关闭,一般遇到以下两种情况才会关闭Worker线程
     // ① getTask() 返回 null,② 任务执行过程中出现异常
     // processWorkExit 第二种情况会在这个方法中做一些清理工作
     processWorkerExit(w, completedAbruptly);
    

    } }

<a name="knI4h"></a>
### getTask
从队列队列中获取任务
```java
// java.util.concurrent.ThreadPoolExecutor#getTask
/**
 * 获取待执行的任务,总共有三种可能:
 * ① 阻塞直到成功获得任务才返回。我们知道,存在 corePoolSize 大小的线程数不会被回收。
 * ② 超时退出。keepAliveTime 起作用。如果一个线程空闲时间超过 keepAliveTime,那么它会被回收。
 * ③ 如果发生以下情况,一定会返回 null
 *   1. 池中存在大于 maximumPoolSize 个 worker 
 *   2. 线程池被关闭了。
 *   3. 线程池处于 SHUTDOWN 状态,而且 workQueue = null,这种情况线程池不会接收新的任务
 *   3. woker 等待超时。
 *
 */
private Runnable getTask() {
    // #1 poll() 操作是否超时
    boolean timedOut = false; // Did the last poll() time out?

    // 死循环
    for (;;) {
        // #2 获取线程池数量
        int c = ctl.get();
        int rs = runStateOf(c);

        // #3 检查线程池状态,存在两种情况会返回 null
        // ① rs == SHUTDOWN && workQueue.isEmpty
        // ② rs >= STOP
        // 这两种情况都没有任务可执行,所以直接返回
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // #4 获取池内存活的线程数量
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // #5 allowCoreThreadTimeOut(false) ,线程数量超出 corePoolSize,
        // 则会根据 keepAlive 设置来关闭线程,这是线程回收操作
        // 如果允许核心线程数也加入超时等待或者池内线程池超过 corePoolSize
        // 则会调用带超时时间的 poll(time) 
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 当前线程数超过 maximumPoolSize 或者超时
        // 且 wc > 1 或任务队列为空
        // 这里的意思是如果线程超时或超过 maximumPoolSize大小,并且池中有 > 1 的线程,或者北关区为空
        // 那么当前的这个 Worker 线程就可以被回收,避免资源浪费呀
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // #6 减少 Worker 数量
            if (compareAndDecrementWorkerCount(c))
                // #7 返回 null ,回收当前 Worker
                return null;
            continue;
        }

        try {
            // #8 等待 keepAliveTime,如果还没有得到任务,当前线程也可以被关闭
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时可退出
            workQueue.take(); // 一直等待直到有任务到达

            // #9 如果能得到任务,则立即返回
            if (r != null)
                return r;

            // #10 否则,timeOut 置为 true,表示这是一次超时获取,当前线程会被回收
            timedOut = true;
        } catch (InterruptedException retry) {
            // #11 Worker 发生中断,Worker 会进行重试操作。
            timedOut = false;
        }
    }
}

拒绝策略

RejectedExecutionHandler 是线程池拒绝策略的接口类,所有的拒绝策略都需要实现这个接口,它只有一个方法:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

入参有两个,一个是任务 r,另一个是线程池 executor 对象。这两个正是线程池的核心对象,我们可以通过这两个对象可以获得很大的权限。
当然,ThreadPoolExecutor 中已经有四个定义好的拒绝策略,当然,我们也可以根据实际需求实现个性化拒绝策略。我们先看看自带的拒绝策略:

拒绝策略 描述
CallerRunsPolicy 任务被线程池给拒绝了,只要线程池没有被关闭,那么谁提交谁执行。
AbortPolicy 无论什么情况,直接抛出 RejectedException 异常
DiscardPolicy 忽略任务,不做任何处理。
DiscardOldestPolicy 丢弃阻塞队列队头的任务(即等待时间最长),然后将新任务添加到队列队列中

一般默认是 CallerRunsPolicy,其它拒绝策略都不可靠。

Executors

Executors 是一个工具类,可以快速创建线程池。

// 拥有恒定数量的线程池,且队列长度无限制
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, 
                                  nThreads,
                                  0L, 
                                  TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

newCachedThreadPool 会在很短的时间内缓存线程,以将其重用于其它任务。

// 线程池从0开始增长,数量可能会到达 MAX_VALUE,
// 线程池将会删除闲置一分钟的线程
// 而 SynchronousQueue 队列为 0,因此,如果有空闲线程,交给空闲线程处理,否则,创建一个新的线程
// 来处理该任务
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 
                                  Integer.MAX_VALUE,
                                  60L, 
                                  TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

总结

ThreadPoolExecutor 总共有 5 个特别关键的参数:

  • corePoolSize
  • maximumPoolSize
  • workQueue
  • keepAliveTime
  • rejectedExecutionHandler

我们需要清楚,线程,即 Worker 对象创建的时机,而且如何与阻塞队列配合使用。并且,还有线程回收机制。当任务执行过程中出现异常,执行线程会被关闭。