线程池核心组件图解

看源码之前,先了解一下该组件 最主要的几个 接口、抽象类和实现类的结构关系。

avatar

该组件中,Executor 和 ExecutorService 接口 定义了线程池最核心的几个方法,提交任务 submit ()、关闭线程池 shutdown()。抽象类 AbstractExecutorService 主要对公共行为 submit()系列方法进行了实现,这些 submit()方法 的实现使用了 模板方法模式,其中调用的 execute()方法 是未实现的 来自 Executor 接口 的方法。实现类 ThreadPoolExecutor 则对线程池进行了具体而复杂的实现。

另外还有一个常见的工具类 Executors,里面为开发者封装了一些可以直接拿来用的线程池。

源码赏析

话不多说,直接上源码。(这里只看最主要的代码部分)

Executor 和 ExecutorService 接口

  1. public interface Executor {
  2. /**
  3. * 在将来的某个时间执行给定的 Runnable。该 Runnable 可以在新线程、池线程或调用线程中执行。
  4. */
  5. void execute(Runnable command);
  6. }
  7. public interface ExecutorService extends Executor {
  8. /**
  9. * 优雅关闭,该关闭会继续执行完以前提交的任务,但不再接受新任务。
  10. */
  11. void shutdown();
  12. /**
  13. * 提交一个有返回值的任务,并返回该任务的 未来执行完成后的结果。
  14. * Future的 get()方法 将在成功完成后返回任务的结果。
  15. */
  16. <T> Future<T> submit(Callable<T> task);
  17. <T> Future<T> submit(Runnable task, T result);
  18. Future<?> submit(Runnable task);
  19. }

AbstractExecutorService 抽象类

  1. /**
  2. * 该抽象类最主要的内容就是,实现了 ExecutorService 中的 submit()系列方法
  3. */
  4. public abstract class AbstractExecutorService implements ExecutorService {
  5. /**
  6. * 提交任务 进行执行,返回获取未来结果的 Future对象。
  7. * 这里使用了 “模板方法模式”,execute()方法来自 Executor接口,该抽象类中并未进行实现,
  8. * 而是交由子类具体实现。
  9. */
  10. public Future<?> submit(Runnable task) {
  11. if (task == null) throw new NullPointerException();
  12. RunnableFuture<Void> ftask = newTaskFor(task, null);
  13. execute(ftask);
  14. return ftask;
  15. }
  16. public <T> Future<T> submit(Runnable task, T result) {
  17. if (task == null) throw new NullPointerException();
  18. RunnableFuture<T> ftask = newTaskFor(task, result);
  19. execute(ftask);
  20. return ftask;
  21. }
  22. public <T> Future<T> submit(Callable<T> task) {
  23. if (task == null) throw new NullPointerException();
  24. RunnableFuture<T> ftask = newTaskFor(task);
  25. execute(ftask);
  26. return ftask;
  27. }
  28. }

ThreadPoolExecutor

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. /**
  3. * **************
  4. * ** 主要属性 **
  5. * **************
  6. */
  7. /** 阻塞队列 */
  8. private final BlockingQueue<Runnable> workQueue;
  9. /** 用于创建线程的 线程工厂 */
  10. private volatile ThreadFactory threadFactory;
  11. /** 核心线程数 */
  12. private volatile int corePoolSize;
  13. /** 最大线程数 */
  14. private volatile int maximumPoolSize;
  15. /**
  16. * **************
  17. * ** 构造方法 **
  18. * **************
  19. */
  20. /** 最后都使用了最后一个构造方法的实现 */
  21. public ThreadPoolExecutor(int corePoolSize,
  22. int maximumPoolSize,
  23. long keepAliveTime,
  24. TimeUnit unit,
  25. BlockingQueue<Runnable> workQueue) {
  26. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  27. Executors.defaultThreadFactory(), defaultHandler);
  28. }
  29. public ThreadPoolExecutor(int corePoolSize,
  30. int maximumPoolSize,
  31. long keepAliveTime,
  32. TimeUnit unit,
  33. BlockingQueue<Runnable> workQueue,
  34. ThreadFactory threadFactory) {
  35. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  36. threadFactory, defaultHandler);
  37. }
  38. public ThreadPoolExecutor(int corePoolSize,
  39. int maximumPoolSize,
  40. long keepAliveTime,
  41. TimeUnit unit,
  42. BlockingQueue<Runnable> workQueue,
  43. RejectedExecutionHandler handler) {
  44. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  45. Executors.defaultThreadFactory(), handler);
  46. }
  47. public ThreadPoolExecutor(int corePoolSize,
  48. int maximumPoolSize,
  49. long keepAliveTime,
  50. TimeUnit unit,
  51. BlockingQueue<Runnable> workQueue,
  52. ThreadFactory threadFactory,
  53. RejectedExecutionHandler handler) {
  54. if (corePoolSize < 0 ||
  55. maximumPoolSize <= 0 ||
  56. maximumPoolSize < corePoolSize ||
  57. keepAliveTime < 0)
  58. throw new IllegalArgumentException();
  59. if (workQueue == null || threadFactory == null || handler == null)
  60. throw new NullPointerException();
  61. this.corePoolSize = corePoolSize;
  62. this.maximumPoolSize = maximumPoolSize;
  63. this.workQueue = workQueue;
  64. this.keepAliveTime = unit.toNanos(keepAliveTime);
  65. this.threadFactory = threadFactory;
  66. this.handler = handler;
  67. }
  68. /**
  69. * **************
  70. * ** 主要实现 **
  71. * **************
  72. */
  73. /** 执行 Runnable任务 */
  74. public void execute(Runnable command) {
  75. if (command == null)
  76. throw new NullPointerException();
  77. /*
  78. * 分三步进行:
  79. *
  80. * 1、如果运行的线程少于 corePoolSize,尝试开启一个新的线程;否则尝试进入工作队列
  81. *
  82. * 2. 如果工作队列没满,则进入工作队列;否则 判断是否超出最大线程数
  83. *
  84. * 3. 如果未超出最大线程数,则尝试开启一个新的线程;否则 按饱和策略处理无法执行的任务
  85. */
  86. int c = ctl.get();
  87. if (workerCountOf(c) < corePoolSize) {
  88. if (addWorker(command, true))
  89. return;
  90. c = ctl.get();
  91. }
  92. if (isRunning(c) && workQueue.offer(command)) {
  93. int recheck = ctl.get();
  94. if (! isRunning(recheck) && remove(command))
  95. reject(command);
  96. else if (workerCountOf(recheck) == 0)
  97. addWorker(null, false);
  98. }
  99. else if (!addWorker(command, false))
  100. reject(command);
  101. }
  102. /**
  103. * 优雅关闭,在其中执行以前提交的任务,但不接受新任务。如果已关闭,则调用没有其他效果。
  104. */
  105. public void shutdown() {
  106. final ReentrantLock mainLock = this.mainLock;
  107. mainLock.lock();
  108. try {
  109. checkShutdownAccess();
  110. advanceRunState(SHUTDOWN);
  111. interruptIdleWorkers();
  112. onShutdown(); // hook for ScheduledThreadPoolExecutor
  113. } finally {
  114. mainLock.unlock();
  115. }
  116. tryTerminate();
  117. }
  118. }

ThreadPoolExecutor 中的 execute()方法 执行 Runnable 任务 的流程逻辑可以用下图表示。

avatar

工具类 Executors

看类名也知道,它最主要的作用就是提供 static 的工具方法,为开发者提供各种封装好的 具有各自特性的线程池。

  1. public class Executors {
  2. /**
  3. * 创建一个固定线程数量的线程池
  4. */
  5. public static ExecutorService newFixedThreadPool(int nThreads) {
  6. return new ThreadPoolExecutor(nThreads, nThreads,
  7. 0L, TimeUnit.MILLISECONDS,
  8. new LinkedBlockingQueue<Runnable>());
  9. }
  10. /**
  11. * 创建一个单线程的线程池
  12. */
  13. public static ExecutorService newSingleThreadExecutor() {
  14. return new FinalizableDelegatedExecutorService
  15. (new ThreadPoolExecutor(1, 1,
  16. 0L, TimeUnit.MILLISECONDS,
  17. new LinkedBlockingQueue<Runnable>()));
  18. }
  19. /**
  20. * 创建一个缓存的,可动态伸缩的线程池。
  21. * 可以看出来:核心线程数为0,最大线程数为Integer.MAX_VALUE,如果任务数在某一瞬间暴涨,
  22. * 这个线程池很可能会把 服务器撑爆。
  23. * 另外需要注意的是,它们底层都是使用了 ThreadPoolExecutor,只不过帮我们配好了参数
  24. */
  25. public static ExecutorService newCachedThreadPool() {
  26. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  27. 60L, TimeUnit.SECONDS,
  28. new SynchronousQueue<Runnable>());
  29. }
  30. }