线程池的实现原理

调用方不断地向线程池中提交任务;线程池中有一组线程,不断地 从队列中取任务,这是一个典型的生产者—消费者模型

线程池与Future - 图1

需要有阻塞队列

线程池的继承体系

image.png

在这里,有两个核心的类: ThreadPoolExector 和 ScheduledThreadPoolExecutor ,后者不仅 可以执行某个任务,还可以周期性地执行任务。 向线程池中提交的每个任务,都必须实现 Runnable 接口,通过最上面的 Executor 接口中的 execute(Runnable command) 向线程池提交任务。 然后,在 ExecutorService 中,定义了线程池的关闭接口 shutdown() ,还定义了可以有返回值 的任务,也就是 Callable

ThreadPoolExecutor

核心数据结构

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. // 存放任务的阻塞队列
  4. private final BlockingQueue<Runnable> workQueue;
  5. // 对线程池内部各种变量进行互斥访问控制
  6. private final ReentrantLock mainLock = new ReentrantLock();
  7. // 线程集合
  8. private final HashSet<Worker> workers = new HashSet<Worker>();
  9. // 每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下
  10. private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
  11. // Worker封装的线程
  12. final Thread thread;
  13. // Worker接收到的第1个任务
  14. Runnable firstTask;
  15. // Worker执行完毕的任务个数
  16. volatile long completedTasks;
  17. Worker(Runnable firstTask) {
  18. setState(-1); // inhibit interrupts until runWorker
  19. this.firstTask = firstTask;
  20. this.thread = getThreadFactory().newThread(this);
  21. }
  22. }
  23. }

线程池的生命周期

  • running -1
  • shutdown 0
  • stop 1
  • tidying 2
  • terminated 3
  • 顺序不可逆

线程池与Future - 图3

线程池的核心配置参数

  • ThreadPoolExecutor
    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue,
    6. ThreadFactory threadFactory,
    7. RejectedExecutionHandler handler) {
    8. if (corePoolSize < 0 ||
    9. maximumPoolSize <= 0 ||
    10. maximumPoolSize < corePoolSize ||
    11. keepAliveTime < 0)
    12. throw new IllegalArgumentException();
    13. if (workQueue == null || threadFactory == null || handler == null)
    14. throw new NullPointerException();
    15. this.acc = System.getSecurityManager() == null ?
    16. null :
    17. AccessController.getContext();
    18. this.corePoolSize = corePoolSize;
    19. this.maximumPoolSize = maximumPoolSize;
    20. this.workQueue = workQueue;
    21. this.keepAliveTime = unit.toNanos(keepAliveTime);
    22. this.threadFactory = threadFactory;
    23. this.handler = handler;
    24. }
    | 参数名 | 含义 | | —- | —- | | corePoolSize | 核心线程数 | | maximumPoolSize | 最大线程数 | | keepAliveTime + unit | 空闲线程的存活时间 | | workQueue | 用于存放任务的队列 | | threadFactory | 用来创建线程 支持自定义 默认: Executors.defaultThreadFactory() | | handler | corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒
    绝策略。 |

任务的提交过程分析

线程池与Future - 图4

线程优雅的关闭

线程池的关闭,较之线程的关闭更加复杂。当关闭一个线程池的时候,有的线程还正在执行某个任 务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是 瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。

  • 线程池有两个关闭方法,shutdown()和shutdownNow(),这两个方法会让线程池切换到不同的状

态。在队列为空,线程池也为空之后,进入TIDYING 状态;最后执行一个钩子方法terminated(),进入
TERMINATED状态,线程池才真正关闭。

  • 这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大

的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到
TERMINATED=3,不可能迁移回STOP=1或者其他状态。

  • 除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现

自己的线程池,可以重写这几个方法:

  1. protected void beforeExecute(Thread t, Runnable r) { }
  2. protected void afterExecute(Runnable r, Throwable t) { }
  3. protected void terminated() { }

如何正确关闭线程池

  1. // executor.shutdownNow();
  2. executor.shutdown();
  3. try {
  4. boolean flag = true;
  5. do {
  6. flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS);
  7. } while (flag);
  8. } catch (InterruptedException e) {
  9. // ...
  10. }
  • awaitTermination(…)方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态 TERMINATED,如果是,就返回;如果不是,则通过termination条件变量阻塞一段时间,之后继续判 断。
  • ThreadPoolExecutor
    1. public boolean awaitTermination(long timeout, TimeUnit unit)
    2. throws InterruptedException {
    3. long nanos = unit.toNanos(timeout);
    4. final ReentrantLock mainLock = this.mainLock;
    5. mainLock.lock();
    6. try {
    7. for (;;) {
    8. // 判断线程是否是 terminated 状态
    9. if (runStateAtLeast(ctl.get(), TERMINATED))
    10. return true;
    11. if (nanos <= 0)
    12. return false;
    13. nanos = termination.awaitNanos(nanos);
    14. }
    15. } finally {
    16. mainLock.unlock();
    17. }
    18. }

    shutdown()与shutdownNow()的区别

  1. shutdown()不会清空任务队列,会等所有任务执行完成,shutdownNow()清空任务队列。
  2. shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程

    shutdown()与任务执行过程综合分析

    把任务的执行过程和上面的线程池的关闭过程结合起来进行分析,当调用 shutdown()的时候,可能
    出现以下几种场景:

  3. 当调用shutdown()的时候,所有线程都处于空闲状态。
    这意味着任务队列一定是空的。此时,所有线程都会阻塞在 getTask()方法的地方。然后,所
    有线程都会收到interruptIdleWorkers()发来的中断信号,getTask()返回null,所有Worker都
    会退出while循环,之后执行processWorkerExit。

  4. 当调用shutdown()的时候,所有线程都处于忙碌状态。
    此时,队列可能是空的,也可能是非空的。interruptIdleWorkers()内部的tryLock调用失败,
    什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,
    直到队列为空,getTask()才会返回null。之后,就和场景1一样了,退出while循环。
  5. 当调用shutdown()的时候,部分线程忙碌,部分线程空闲。
    有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在 getTask()方法的地方。空闲的
    这些线程会和场景1一样处理,不空闲的线程会和场景2一样处理。

    shutdownNow() 与任务执行过程综合分析

  • 和上面的 shutdown()类似,只是多了一个环节,即清空任务队列。如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。因此,中断空闲线程和中断所有 线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。