1. 概况

线程池作用:

  • 减少频繁创建、销毁线程的资源损耗
  • 任务来了,可以立即执行,响应快
  • 避免对资源的过度使用,超过预期的请求能够拒绝

2. 类定义

ThreadPoolExecutor 线程池源码分析 - 图1

  • ExecutorService 扩充执行任务的能力,补充可以为一个或一批异步任务生成 Future的 方法;提供了管控线程池的方法,比如停止线程池的运行。
  • AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. }
  3. public abstract class AbstractExecutorService implements ExecutorService {
  4. }
  5. public interface ExecutorService extends Executor {
  6. }
  7. public interface Executor {
  8. void execute(Runnable command);
  9. }

3. 成员变量

3.1 ctl

用一个变量去表示两种状态,避免为了维护两者的一致使用额外的锁资源。高 3 位表示线程池的状态 runState,低 29 位表示有效工作线程数量 workerCount。

  1. // ctl 高 3 位表示线程状态 RunState,低 29 位表示线程数量
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. private static final int COUNT_BITS = Integer.SIZE - 3;
  4. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  5. // Packing and unpacking ctl
  6. private static int runStateOf(int c) { return c & ~CAPACITY; } // 计算 runState
  7. private static int workerCountOf(int c) { return c & CAPACITY; } // 计算 workerCount
  8. private static int ctlOf(int rs, int wc) { return rs | wc; } // 通过 runState 和 workerCount 计算出 ctl

3.2 RunState

线程池的运行状态,值从小到大排序: RUNNING < SHUTDOW < STOP < TIDYING < TERMINATED

  1. private static final int RUNNING = -1 << COUNT_BITS; // 111
  2. private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
  3. private static final int STOP = 1 << COUNT_BITS; // 001
  4. private static final int TIDYING = 2 << COUNT_BITS; // 010,所有的任务已经终止,workerCount = 0,将会执行 terminated() 的 hook 方法
  5. private static final int TERMINATED = 3 << COUNT_BITS;// 011

ThreadPoolExecutor 线程池源码分析 - 图2
线程池状态转换:

  • RUNNING -> SHUTDOWN,调用 shutdown() 或者 finalize()
  • (RUNNING or SHUTDOWN) -> STOP,调用 shutdownNow()
  • SHUTDOWN -> TIDYING,队列中的任务和线程池中的线程都为空
  • STOP -> TIDYING,线程池线程为空
  • TIDYING -> TERMINATED,调用 terminated() 方法
    ThreadPoolExecutor 线程池源码分析 - 图3

3.3 其它成员变量

  1. // 保存任务的队列,阻塞式
  2. private final BlockingQueue<Runnable> workQueue;
  3. // 主锁,后面防止并发的时候都会用到这个锁
  4. private final ReentrantLock mainLock = new ReentrantLock();
  5. // 线程池中的工作线程放到 worker 中,将任务与工作线程分开
  6. private final HashSet<Worker> workers = new HashSet<Worker>();
  7. // 调用 awaitTermination() 方法时,线程会等待这个条件
  8. private final Condition termination = mainLock.newCondition();
  9. // 线程池中线程数曾经的最大值
  10. private int largestPoolSize;
  11. // 线程池完成的任务数
  12. private long completedTaskCount;

阻塞队列可选:

3.4 构造函数所需变量

  1. // 创建线程的工厂类,在里面可以设置名称
  2. private volatile ThreadFactory threadFactory;
  3. // 拒绝策略
  4. private volatile RejectedExecutionHandler handler;
  5. // 超过 coresize 的线程空闲等待时长,nanoseconds 级别
  6. private volatile long keepAliveTime;
  7. // core 线程空闲标志,默认为 false,若设为 true ,core 线程将等待 keepAliveTime 之后被销魂
  8. private volatile boolean allowCoreThreadTimeOut;
  9. // 线程池的最大数量,一般为 CAPACITY
  10. private volatile int maximumPoolSize;
  11. // 默认的拒绝策略
  12. private static final RejectedExecutionHandler defaultHandler =
  13. new AbortPolicy();

5. 成员方法

线程池创建之后,一般就会调用线程池的 execute 方法执行任务

5.1 execute

通过构造函数创建线程池之后,使用 execute 方法执行放到线程池中的任务,他的执行过程参考下图:
ThreadPoolExecutor 线程池源码分析 - 图4

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get(); // 获取当前 ctl 的值,高 3 位是线程状态,低 29 位是线程数
  5. if (workerCountOf(c) < corePoolSize) { // 工作线程数小于 corePoolSize,通过 addWorker 添加核心线程,把当前任务 command 作为该线程的 firstTask
  6. if (addWorker(command, true)) // 添加核心线程,addWorker 里面有执行任务的逻辑
  7. return;
  8. c = ctl.get();
  9. }
  10. // 走到这里,表示线程数大于 corePoolSize 或者 添加工作线程失败了
  11. // 线程池是 running 状态,并且工作队列没满(能够添加任务)
  12. if (isRunning(c) && workQueue.offer(command)) {
  13. int recheck = ctl.get();
  14. if (! isRunning(recheck) && remove(command)) // 线程池不是 RUNNING,移除刚刚添加的任务,执行拒绝策略
  15. reject(command);
  16. else if (workerCountOf(recheck) == 0) // 此时没有 worker ,添加 worker
  17. addWorker(null, false); // 添加非核心线程
  18. }
  19. // 添加工作线程失败了
  20. else if (!addWorker(command, false)) // 队列满了,并且添加非核心线程返回 false(线程数达到 maximumPoolSize),需要执行拒绝策略
  21. reject(command); // 执行拒绝策略
  22. }
  23. }

5.2 shutdown

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess(); // 检查权限
  6. advanceRunState(SHUTDOWN); // CAS 方式将线程池状态改为 SHUTDOWN
  7. interruptIdleWorkers(); // 中断空闲的线程
  8. onShutdown(); // 调用 shuntdown 的 hook 方法
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate(); // 进行后续的终止操作
  13. }
  14. private void interruptIdleWorkers() {
  15. interruptIdleWorkers(false); // 中断所有的闲置线程
  16. }
  17. private void interruptIdleWorkers(boolean onlyOne) {
  18. final ReentrantLock mainLock = this.mainLock;
  19. mainLock.lock();
  20. try {
  21. for (Worker w : workers) {
  22. Thread t = w.thread;
  23. if (!t.isInterrupted() && w.tryLock()) { // 获取锁成功,表明线程没有在执行任务,此时可以进行中断
  24. try {
  25. t.interrupt();
  26. } catch (SecurityException ignore) {
  27. } finally {
  28. w.unlock();
  29. }
  30. }
  31. if (onlyOne)
  32. break;
  33. }
  34. } finally {
  35. mainLock.unlock();
  36. }
  37. }

5.3 tryTerminate

将线程池设置为 TERMINATED 状态,调用 shutdown 方法或者减少 worker count 的时候该方法会被调用。

  • runWorker 当线程池获取任务失败时,跳出 while 循环,执行 processWorkerExit 方法,里面调用 tryTerminate() 方法,此时阻塞的任务会被中断。
  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. // 若线程池是 RUNNING 状态,那么不能 terminate
  5. // 或者线程池状态 >= TIDYING,也就是 TIDYING TERMINATED 其中之一,也不能 terminate
  6. // 或者线程池状态 == SHUTDOWN,并且工作队列不为空,也不能 terminate
  7. if (isRunning(c) ||
  8. runStateAtLeast(c, TIDYING) ||
  9. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  10. return;
  11. // 如果此时线程池中有工作线程
  12. if (workerCountOf(c) != 0) { // Eligible to terminate
  13. // 中断闲置的 worker,只中断一个 worker
  14. // 若此 worker 阻塞在 take 方法中,那么就可以响应中断,并把中断信息传递到外面,runWorker 里面有调用
  15. interruptIdleWorkers(ONLY_ONE); // ONLY_ONE = true
  16. return;
  17. }
  18. final ReentrantLock mainLock = this.mainLock;
  19. mainLock.lock();
  20. try {
  21. // 尝试将 runstate 设为 TIDYING
  22. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  23. try {
  24. // 设置成功,执行 terminated()
  25. terminated();
  26. } finally {
  27. // 将 runState 设为 TERMINATED
  28. ctl.set(ctlOf(TERMINATED, 0));
  29. termination.signalAll(); // 通知 awaitTermination 的线程
  30. }
  31. return;
  32. }
  33. } finally {
  34. mainLock.unlock();
  35. }
  36. // else retry on failed CAS
  37. }
  38. }

6. 内部类 Worker

  • firstTask,保存传入的第一个任务,若 firstTask != null,那么线程启动时就会执行这个任务,否则只是增加一个 worker(创建非核心线程)
  • Worker 继承 AQS,通过 lock 方法获取独占锁,表示线程正在运行;若线程没有获取独占锁(空闲状态),那么他就可以被回收。
  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable // 继承自 AQS,只用实现 tryAquire 和 tryRelease 就可以实现线程间的同步
  4. {
  5. /**
  6. * This class will never be serialized, but we provide a
  7. * serialVersionUID to suppress a javac warning.
  8. */
  9. private static final long serialVersionUID = 6138294804551838833L;
  10. /** Thread this worker is running in. Null if factory fails. */
  11. final Thread thread;
  12. // 若 firstTask 不为空,那么 thread 启动的时候就会执行这个任务;否则 thread 会去调用 getTask() 方法去获取执行的任务
  13. Runnable firstTask;
  14. /** Per-thread task counter */
  15. volatile long completedTasks;
  16. /**
  17. * Creates with given first task and thread from ThreadFactory.
  18. * @param firstTask the first task (null if none)
  19. */
  20. Worker(Runnable firstTask) {
  21. setState(-1); // 初始化的时候禁止中断响应
  22. this.firstTask = firstTask;
  23. this.thread = getThreadFactory().newThread(this); // 通过工厂方法新建线程
  24. }
  25. /** Delegates main run loop to outer runWorker */
  26. public void run() {
  27. runWorker(this); // 执行具体的任务逻辑
  28. }
  29. // Lock methods 加锁办法,熟悉 AQS 下面的办法应该会很熟悉
  30. //
  31. // The value 0 represents the unlocked state.
  32. // The value 1 represents the locked state.
  33. protected boolean isHeldExclusively() {
  34. return getState() != 0;
  35. }
  36. protected boolean tryAcquire(int unused) {
  37. if (compareAndSetState(0, 1)) { // 抢占式,直接设定 state
  38. setExclusiveOwnerThread(Thread.currentThread());
  39. return true;
  40. }
  41. return false;
  42. }
  43. protected boolean tryRelease(int unused) {
  44. setExclusiveOwnerThread(null);
  45. setState(0);
  46. return true;
  47. }
  48. public void lock() { acquire(1); }
  49. public boolean tryLock() { return tryAcquire(1); }
  50. public void unlock() { release(1); }
  51. public boolean isLocked() { return isHeldExclusively(); }
  52. void interruptIfStarted() {
  53. Thread t;
  54. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  55. try {
  56. t.interrupt();
  57. } catch (SecurityException ignore) {
  58. }
  59. }
  60. }
  61. }

addWorker

线程池的执行,主要是通过 addWorker 方法去增加工作线程,线程池根据 addWorker 方法的返回判断是否需要执行拒绝策略。

  1. // firstTask 可能为 null
  2. // core == ture 表示以 corePoolSize 为界限,工作线程不能超过这个值;core == false 表示以 maximumPoolSize 为界限,不能超过该值
  3. private boolean addWorker(Runnable firstTask, boolean core) {
  4. retry:
  5. for (;;) {
  6. int c = ctl.get();
  7. int rs = runStateOf(c);
  8. // 线程池处于 SHUTDOW and firstTask == null and ! workQueue.isEmpty() 情况下可以创建线程。若线程池处于 STOP,TIDYING,TERMINATED 情况下,不允许添加线程
  9. if (rs >= SHUTDOWN &&
  10. !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
  11. )
  12. return false;
  13. for (;;) {
  14. int wc = workerCountOf(c); // 线程池中正在运行的线程数
  15. if (wc >= CAPACITY || // 正在运行的线程超过 5 亿,或者如果是添加核心线程超过 corePoolSize ,添加非核心线程超过 maximumPoolSize,直接返回不能添加
  16. wc >= (core ? corePoolSize : maximumPoolSize))
  17. return false;
  18. if (compareAndIncrementWorkerCount(c)) // 以 CAS 方式将当前的工作线程数增加 1,跳出循环
  19. break retry;
  20. c = ctl.get(); // 存在并发重新读取
  21. if (runStateOf(c) != rs) // CAS 失败,线程池状态发生了变更(其它线程关闭线程池),重新循环
  22. continue retry;
  23. // else CAS failed due to workerCount change; retry inner loop
  24. }
  25. }
  26. // 经过上面循环 workerCount 已经增加了 1 个
  27. boolean workerStarted = false;
  28. boolean workerAdded = false;
  29. Worker w = null;
  30. try {
  31. // 将 Runnable 对象封装到 Worker 对象中,届时能够进行加锁执行某些方法
  32. w = new Worker(firstTask);
  33. final Thread t = w.thread;
  34. if (t != null) {
  35. final ReentrantLock mainLock = this.mainLock;
  36. mainLock.lock(); // 全局锁,持有这个锁期间,线程池不会关闭,关闭线程池需要该锁
  37. try {
  38. // 加锁之后再检查 state
  39. int rs = runStateOf(ctl.get());
  40. if (rs < SHUTDOWN ||
  41. (rs == SHUTDOWN && firstTask == null)) { // 如果是 running 状态,或者是 shutdown ,此时不接受新任务,但是可以处理队列中已有的任务
  42. if (t.isAlive()) // worker 里面的 thread,不应该是启动状态
  43. throw new IllegalThreadStateException();
  44. workers.add(w); // 添加到 Work<Set> 中
  45. int s = workers.size();
  46. if (s > largestPoolSize) // 维护一个全局达到过的最大线程数计数器
  47. largestPoolSize = s;
  48. workerAdded = true;
  49. }
  50. } finally {
  51. mainLock.unlock();
  52. }
  53. // worker 成功加入到 HashSet
  54. if (workerAdded) {
  55. t.start(); // 执行任务的 run 方法
  56. workerStarted = true; // 标记添加到队列里面的 worker 启动成功
  57. }
  58. }
  59. } finally {
  60. if (! workerStarted)
  61. addWorkerFailed(w); // 刚添加到队列的 worker 没有启动成功,需要移除刚添加到 HashSet 里面的 worker
  62. }
  63. // 返回线程是否启动成功
  64. return workerStarted;
  65. }

ThreadPoolExecutor 线程池源码分析 - 图5
addWorker 四种调用方式:

  • addWorker(command, true),线程数 < coreSize时,将 task 放入 workers,如果线程数 >= coreSize,返回 false;
  • addWorker(command, false),当阻塞对列已满,尝试将新的 task 放入 workers,如果线程数 >= maximumPoolSize,返回 false;
  • addWorker(null, false),放入一个空的 task 到 workers,此时线程数的限制是 maximumPoolSize,相当于创建一个新的线程,没立马分配任务;
  • addWorker(null, true)放入一个空的 task 到 workers,线程数限制是需要小于 coreSize,否则返回 false。实际的使用是在prestartCoreThread()等方法,有兴趣的读者可以自行阅读,在此不做详细赘述。

addWorkerFailed

  • 删除 workers 里面的 worker
  • workCount 减 1
  1. private void addWorkerFailed(Worker w) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. if (w != null)
  6. workers.remove(w); // 移除 HashSet<Worker> 中的这个 worker
  7. decrementWorkerCount(); // worderCount 减 1
  8. tryTerminate(); // 尝试终止线程池
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. }

runWorker

addWorker 里面执行 start() 方法之后,线程开始执行 run() 逻辑,而 worker 里面的 run() 调用的是 runWorker(),下面来看具体的任务执行过程
ThreadPoolExecutor 线程池源码分析 - 图6

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. // 线程的第一个任务,可以不从队列中获取
  4. Runnable task = w.firstTask;
  5. w.firstTask = null;
  6. w.unlock(); // 释放锁,允许响应中断
  7. boolean completedAbruptly = true;
  8. try {
  9. // worker 的 firsttask 不为空或者从 workeQueue 中获取到任务
  10. while (task != null || (task = getTask()) != null) {
  11. w.lock();
  12. // 若线程池是非 RUNNING 状态,要保证该线程是中断状态
  13. if ((runStateAtLeast(ctl.get(), STOP) ||
  14. (Thread.interrupted() &&
  15. runStateAtLeast(ctl.get(), STOP))) &&
  16. !wt.isInterrupted())
  17. wt.interrupt();
  18. try {
  19. beforeExecute(wt, task); // wt 线程执行 task,做一些执行前的操作,WorkerPoolExecutor 继承 ThreadPoolExecutor,有对这个方法具体实现
  20. Throwable thrown = null;
  21. try {
  22. task.run(); // 执行具体的任务处理逻辑
  23. } catch (RuntimeException x) {
  24. thrown = x; throw x;
  25. } catch (Error x) {
  26. thrown = x; throw x;
  27. } catch (Throwable x) {
  28. thrown = x; throw new Error(x);
  29. } finally {
  30. afterExecute(task, thrown);
  31. }
  32. } finally {
  33. // 任务正常执行完成,此已完成任务置为null,进行下一次循环执行 getTask()
  34. task = null;
  35. w.completedTasks++;
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false; // 完成任务,没有被中断
  40. } finally {
  41. // 走到这里,说明上面循环结束
  42. // 1. getTask 返回 null
  43. // 2. 发生了异常
  44. processWorkerExit(w, completedAbruptly); // 回收线程
  45. }
  46. }

getTask

从任务队列中阻塞获取任务,并且会回收多余的 worker
ThreadPoolExecutor 线程池源码分析 - 图7

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // 1. rs == SHUTDOWN && workQueue.isEmpty
  7. // 2. rs >= STOP
  8. // 以上两种情况需要减少 工作线程数
  9. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  10. decrementWorkerCount();
  11. return null;
  12. }
  13. int wc = workerCountOf(c);
  14. // 若设置了核心线程数可超时 或者 此时工作线程数大于核心线程数
  15. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  16. if ((wc > maximumPoolSize || (timed && timedOut))
  17. && (wc > 1 || workQueue.isEmpty())) {
  18. // 减少 workCount
  19. if (compareAndDecrementWorkerCount(c))
  20. return null;
  21. continue;
  22. }
  23. try {
  24. // 超时获取或者阻塞获取任务
  25. Runnable r = timed ?
  26. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  27. workQueue.take();
  28. if (r != null)
  29. return r;
  30. // 没获取到任务,表示超时,继续 for 循环获取
  31. timedOut = true;
  32. } catch (InterruptedException retry) {
  33. // 获取任务过程中被中断
  34. timedOut = false;
  35. }
  36. }
  37. }

processWorkerExit

回收 worker

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
  3. decrementWorkerCount();
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. completedTaskCount += w.completedTasks; // 更新完成的任务数量
  8. workers.remove(w);
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate();
  13. int c = ctl.get();
  14. if (runStateLessThan(c, STOP)) { // 线程池处于 RUNNING 或者 SHUTDOWN 状态
  15. if (!completedAbruptly) {
  16. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  17. if (min == 0 && ! workQueue.isEmpty())
  18. min = 1;
  19. if (workerCountOf(c) >= min)
  20. return; // replacement not needed
  21. }
  22. addWorker(null, false);
  23. }
  24. }

8. 总结

Java线程池实现原理及其在美团业务中的实践