当关闭一个线程池时,有的工作线程还正在执行任务,有的调用者正在向线程池提交任务,并且工作队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是一个平滑过渡的过程。

关闭线程池的正确姿势

  1. threadPoolExecutor.shutdown();
  2. //threadPoolExecutor.shutdownNow();
  3. try {
  4. boolean loop = true;
  5. do {
  6. loop = !threadPoolExecutor.awaitTermination(2, TimeUnit.SECONDS);
  7. } while (loop);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  • shutdown()会中断空闲工作线程,不会中断正在执行任务的工作线程,也不会清空工作队列,会等待所有已提交的任务执行完,但是拒绝新提交的任务。
  • shutdownNow(),会中断所有工作线程,并清空工作队列,拒绝新提交的任务。
  • 关闭线程池,只调用shutdown()或者shutdownNow()是不够的,因为线程池并不一定立刻终止,还需要调用awaitTermination(),循环检查runState是否到了最终状态TERMINATED。
  • 当我们使用shuwdown()方法关闭线程池时,一定要确保任务里不会有永久阻塞等待的逻辑,否则线程池就关闭不了。

Tomcat shutdown 线程池无法关闭

场景

Tomcat下面部署application,application创建了线程池。在执行tomcat的shutdown之后(不是kill -9命令),线程池没有关闭,导致资源浪费等问题

原因

有2种情况,一种是用户线程和守护线程;还一种是线程池没有响应容器关闭的信号。

自定义的ThreadPoolExecutor或者通过Executors创建的线程池并不会去响应容器关闭或者Tomcat关闭的信号,所以shutdown Tomcat时,线程池没有关闭。如果是通过Spring的ThreadPoolTaskExecutor创建的线程池,是可以响应销毁容器的信号,因为ThreadPoolTaskExecutor继承了ExecutorConfigurationSupport,ExecutorConfigurationSupport实现了DisposableBean接口。当销毁Bean的时候,会自动调用BeanDisposableBean的destroy方法。所以这种方式创建的线程池可以正常关闭。

源码解读

shutdown()将 runState 流转为 SHUTDOWN

shutdown()主要做了 4 步:

  1. 检查是否有中断线程池的权限。
  2. 自旋CAS设置runStateSHUTDOWN
  3. 中断空闲线程
  4. 尝试终止线程池。

    1. public void shutdown() {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. //1.检查是否有Shutdown权限
    6. checkShutdownAccess();
    7. //2.cas设置runState为SHUTDOWN
    8. advanceRunState(SHUTDOWN);
    9. //3.中断空闲线程
    10. interruptIdleWorkers();
    11. //空的钩子函数
    12. onShutdown(); // hook for ScheduledThreadPoolExecutor
    13. } finally {
    14. mainLock.unlock();
    15. }
    16. //4.尝试终止线程池
    17. tryTerminate();
    18. }

    shutdown是如何只中断空闲线程?为什么要调用tryTerminate()

    shutdown 是如何只中断空闲线程的?

    从源码中可以看出在中断线程前会尝试获取Worker的锁,如果获得锁,说明当前的Worker是空闲的,可以中断,这也验证了 Worker 执行任务代码时加锁,确保除了线程池销毁导致中断外,没有其他中断的设置。

    1. private void interruptIdleWorkers(boolean onlyOne) {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. for (Worker w : workers) {
    6. //循环中断空闲worder
    7. Thread t = w.thread;
    8. //w.tryLock()尝试获取worker的锁,如果获得锁说明当前工作线程是空闲的
    9. if (!t.isInterrupted() && w.tryLock()) {
    10. try {
    11. t.interrupt();
    12. } catch (SecurityException ignore) {
    13. } finally {
    14. w.unlock();
    15. }
    16. }
    17. if (onlyOne)
    18. break;
    19. }
    20. } finally {
    21. mainLock.unlock();
    22. }
    23. }

    为什么要调用 tryTerminate()?

    tryTerminate()不会强行终止线程池,当workerCount0workerQueue为空时:

  5. 状态流转到TIDYING

  6. 然后调用钩子函数terminated()
  7. 状态从TIDYING 流转到TERMINATED
  8. 调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。
    1. final void tryTerminate() {
    2. for (;;) {
    3. int c = ctl.get();
    4. if (isRunning(c) ||
    5. runStateAtLeast(c, TIDYING) ||
    6. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
    7. return;
    8. if (workerCountOf(c) != 0) { // Eligible to terminate
    9. interruptIdleWorkers(ONLY_ONE);
    10. return;
    11. }
    12. //1.当workQueue为空,workerCount为空时,cas流转状态为TIDYING
    13. //2.并调用了一个空钩子函数terminated
    14. //3.最终将状态流转为TERMINATED,并通知awaitTermination
    15. final ReentrantLock mainLock = this.mainLock;
    16. mainLock.lock();
    17. try {
    18. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    19. try {
    20. terminated();
    21. } finally {
    22. ctl.set(ctlOf(TERMINATED, 0));
    23. termination.signalAll();
    24. }
    25. return;
    26. }
    27. } finally {
    28. mainLock.unlock();
    29. }
    30. // else retry on failed CAS 自旋
    31. }
    32. }

    termination.signalAll()唤醒的是什么?

    awaitTermination()逻辑很简单,就是重复判断runState是否到达最终状态 TERMINATED,如果是直接返回 true,如果不是,调用termination.awaitNanos(nanos)阻塞一段时间,苏醒后再判断一次,如果runStateTERMINATED返回 true,否则返回 false。 ```java /**
    • Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition();

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { ////判断当前的runstate是否大于等于TERMINATED if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; //如果不是TERMINATED,将等待nanos nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }

  1. <a name="T1h0H"></a>
  2. ### shutdownNow()将 runState 流转为 STOP
  3. shutdownNow(),主要做了 5 步:
  4. 1. 检查是否有中断线程池权限。
  5. 1. 设置`runState`为STOP。
  6. 1. **中断所有工作线程。**
  7. 1. **清空工作队列**。
  8. 1. 尝试终止线程池。
  9. ```java
  10. public List<Runnable> shutdownNow() {
  11. List<Runnable> tasks;
  12. final ReentrantLock mainLock = this.mainLock;
  13. mainLock.lock();
  14. try {
  15. //1.检查是否有shutdown权限
  16. checkShutdownAccess();
  17. //2.设置runState为stop
  18. advanceRunState(STOP);
  19. //3.给worders发送终止信号
  20. interruptWorkers();
  21. //4.清空阻塞队列
  22. tasks = drainQueue();
  23. } finally {
  24. mainLock.unlock();
  25. }
  26. //5.尝试终止线程池
  27. tryTerminate();
  28. return tasks;
  29. }
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    //删除所有元素并加入到taskList
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        //如果q此时又被加入了任务再将任务删除并加入taskList
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

shutdown()和 shutdownNow()的区别

shutdownNow()和shutdown()不同之处在于:

  • shutdownNow()会终止所有工作线程,不管是空闲还是正在运行。
  • shutdownNow()会清空阻塞队列。