原文链接:https://www.jianshu.com/p/4de5dca3b187

涉及的主要方法

  • void shutdown();
  • List<Runnable> shutdownNow();
  • boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    线程池状态

    1. /*
    2. * The runState provides the main lifecycle control, taking on values:
    3. *
    4. * RUNNING: Accept new tasks and process queued tasks
    5. * SHUTDOWN: Don't accept new tasks, but process queued tasks
    6. * STOP: Don't accept new tasks, don't process queued tasks,
    7. * and interrupt in-progress tasks
    8. * TIDYING: All tasks have terminated, workerCount is zero,
    9. * the thread transitioning to state TIDYING
    10. * will run the terminated() hook method
    11. * TERMINATED: terminated() has completed
    12. *
    13. * The numerical order among these values matters, to allow
    14. * ordered comparisons. The runState monotonically increases over
    15. * time, but need not hit each state. The transitions are:
    16. *
    17. * RUNNING -> SHUTDOWN
    18. * On invocation of shutdown(), perhaps implicitly in finalize()
    19. * (RUNNING or SHUTDOWN) -> STOP
    20. * On invocation of shutdownNow()
    21. * SHUTDOWN -> TIDYING
    22. * When both queue and pool are empty
    23. * STOP -> TIDYING
    24. * When pool is empty
    25. * TIDYING -> TERMINATED
    26. * When the terminated() hook method has completed
    27. *
    28. * Threads waiting in awaitTermination() will return when the
    29. * state reaches TERMINATED.
    30. *
    31. * Detecting the transition from SHUTDOWN to TIDYING is less
    32. * straightforward than you'd like because the queue may become
    33. * empty after non-empty and vice versa during SHUTDOWN state, but
    34. * we can only terminate if, after seeing that it is empty, we see
    35. * that workerCount is 0 (which sometimes entails a recheck -- see
    36. * below).
    37. */

    void shutdown()

注释:

  1. /**
  2. * Initiates an orderly shutdown in which previously submitted
  3. * tasks are executed, but no new tasks will be accepted.
  4. * Invocation has no additional effect if already shut down.
  5. *
  6. * <p>This method does not wait for previously submitted tasks to
  7. * complete execution. Use {@link #awaitTermination awaitTermination}
  8. * to do that.
  9. *
  10. * @throws SecurityException if a security manager exists and
  11. * shutting down this ExecutorService may manipulate
  12. * threads that the caller is not permitted to modify
  13. * because it does not hold {@link
  14. * java.lang.RuntimePermission}{@code ("modifyThread")},
  15. * or the security manager's {@code checkAccess} method
  16. * denies access.
  17. */
  18. void shutdown();

该方法会停止ExecutorService添加新的任务, 但是老任务还是会继续执行.

This method does not wait for previously submitted tasks to * complete execution.

这句话指的是该方法会立即返回, 但不一定代表之前提交的任务已经全部完成了. 如果需要一个阻塞的方法, 可以调用awaitTermination方法.

该方法内部实现是设置了状态, 并interrupt了所有的空闲线程, 使其不再接受新的任务.

List<Runnable> shutdownNow()

注释:

  1. /**
  2. * Attempts to stop all actively executing tasks, halts the
  3. * processing of waiting tasks, and returns a list of the tasks
  4. * that were awaiting execution.
  5. *
  6. * <p>This method does not wait for actively executing tasks to
  7. * terminate. Use {@link #awaitTermination awaitTermination} to
  8. * do that.
  9. *
  10. * <p>There are no guarantees beyond best-effort attempts to stop
  11. * processing actively executing tasks. For example, typical
  12. * implementations will cancel via {@link Thread#interrupt}, so any
  13. * task that fails to respond to interrupts may never terminate.
  14. *
  15. * @return list of tasks that never commenced execution
  16. * @throws SecurityException if a security manager exists and
  17. * shutting down this ExecutorService may manipulate
  18. * threads that the caller is not permitted to modify
  19. * because it does not hold {@link
  20. * java.lang.RuntimePermission}{@code ("modifyThread")},
  21. * or the security manager's {@code checkAccess} method
  22. * denies access.
  23. */
  24. List<Runnable> shutdownNow();

该方法尝试停止所有正在执行的任务, 停止对正在等待执行的任务的处理, 并且返回正在等待执行的任务.

shutdown(), 该方法也是立刻返回的, 不会等到所有任务终止以后才返回.

因为终止是通过interrupt实现的, 所以如果那个任务没有对interrupt做出正确响应, 那么该方法将无法终止该任务. 所以传进去的任务需要对interrup做出合适的响应.

boolean awaitTermination(long timeout, TimeUnit unit)

注释:

  1. /**
  2. * Blocks until all tasks have completed execution after a shutdown
  3. * request, or the timeout occurs, or the current thread is
  4. * interrupted, whichever happens first.
  5. *
  6. * @param timeout the maximum time to wait
  7. * @param unit the time unit of the timeout argument
  8. * @return {@code true} if this executor terminated and
  9. * {@code false} if the timeout elapsed before termination
  10. * @throws InterruptedException if interrupted while waiting
  11. */
  12. boolean awaitTermination(long timeout, TimeUnit unit)
  13. throws InterruptedException;

该方法是阻塞的, 阻塞到所有任务都完成(必须在shutdown调用之后)或者超时. 如果executor在超时之前终止了, 那么返回true, 否则返回false.

注意, 如果不在awaitTermination前调用shutdown, 则即使在超时之前所有任务都已经完成, awaitTermination仍然会等待着, 而且最后一定返回false, 因为没有shutdown的调用不会使executor的状态变为terminated.
例子:

  1. public static void testAwaitTerminationWithoutShutdown(){
  2. Runnable runnable = () -> {
  3. System.out.println("I'm a very quick task");
  4. };
  5. executorService.submit(runnable);
  6. try {
  7. executorService.awaitTermination(3000, TimeUnit.MILLISECONDS);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }

上面这段代码将会阻塞3000毫秒, 并且最终返回true, 即使仅有的任务一瞬间就完成了, 因为没有对shutdown的调用, 所以executorService的状态不可能会变成terminated.

实例

shutdown后再尝试添加任务:

  1. public static void testShutdown(){
  2. Runnable runnable = () -> {
  3. try {
  4. System.out.println("going to sleep for 1s");
  5. Thread.sleep(1000);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. };
  10. scheduledExecutorService.submit(runnable);
  11. scheduledExecutorService.shutdown();
  12. try {
  13. scheduledExecutorService.submit(runnable);
  14. } catch (RejectedExecutionException e){
  15. System.out.println("cannot add task after shutdown");
  16. }
  17. }

输出(输出顺序不一定一致):

  1. cannot add task after shutdown
  2. going to sleep for 1s

从输出可以到确实有RejectedExecutionException被抛出了, 另外从这次输出也可以看出shutdown确实立马就返回了.

shutdownNow()关闭成功的例子:

  1. public static void shutdownNowNormally() throws InterruptedException {
  2. Runnable task = () -> {
  3. try {
  4. System.out.println(String.format("now is %s, I'm going to sleep for 10s", getCurrentTime()));
  5. Thread.sleep(10000);
  6. } catch (InterruptedException e) {
  7. System.out.println(String.format("someone asked me to terminate at: %s", getCurrentTime()));
  8. }
  9. };
  10. scheduledExecutorService.submit(task);
  11. Thread.sleep(1000);
  12. scheduledExecutorService.shutdownNow();
  13. }

输出:

  1. Now is 13:47:30, I'm going to sleep for 10s
  2. someone asked me to terminate at: 13:47:31

shutdownNow()不成功的例子:
因为shutdownNow()最终是通过interrupt来打断工作线程, 如果任务没有对interrupt做出反应, 那么shutdownNow()将无法正常terminate.

  1. public static void shutdownNowNotWorking(){
  2. Runnable task = () ->{
  3. while (true){
  4. try {
  5. System.out.println("I'm gonna sleep for 1s");
  6. Thread.sleep(1000);
  7. } catch (InterruptedException e) {
  8. System.out.println(String.format("I'll ignore this InterruptedException. Now is : %s", getCurrentTime()));
  9. }
  10. }
  11. };
  12. scheduledExecutorService.submit(task);
  13. scheduledExecutorService.shutdownNow();
  14. }

输出:

  1. I'm gonna sleep for 1s
  2. I'll ignore this InterruptedException. Now is : 13:53:12
  3. I'm gonna sleep for 1s
  4. I'm gonna sleep for 1s
  5. I'm gonna sleep for 1s
  6. I'm gonna sleep for 1s

void shutdown()源码

ThreadPoolExecutor中的实现:
设置状态并interrupt全部空闲的工作线程(即不让其再继续从任务队列中获取任务). 但是之前提交的任务还会被执行.

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. advanceRunState(SHUTDOWN);
  7. interruptIdleWorkers();
  8. onShutdown();
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate();
  13. }

interruptIdleWorkers方法:

  1. private void interruptIdleWorkers(boolean onlyOne) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers) {
  6. Thread t = w.thread;
  7. if (!t.isInterrupted() && w.tryLock()) {
  8. try {
  9. t.interrupt();
  10. } catch (SecurityException ignore) {
  11. } finally {
  12. w.unlock();
  13. }
  14. }
  15. if (onlyOne)
  16. break;
  17. }
  18. } finally {
  19. mainLock.unlock();
  20. }
  21. }

List<Runnable> shutdownNow()源码

ThreadPoolExecutor中的源码:

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. advanceRunState(STOP);
  8. interruptWorkers();
  9. tasks = drainQueue();
  10. } finally {
  11. mainLock.unlock();
  12. }
  13. tryTerminate();
  14. return tasks;
  15. }

其中interruptWorkers方法:

  1. private void interruptWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers)
  6. w.interruptIfStarted();
  7. } finally {
  8. mainLock.unlock();
  9. }
  10. }

可以看到即使这个工作线程已经拿到任务在执行中, 也会被interrupt, 这种情况需要我们的任务对
interrupt做出响应, 否则就会导致shutdownNow也无法终止executorService.

runWoker()源码

idle worker指的就是正在执行while (task != null || (task = getTask()) != null)这个while条件的worker, 即还未成功取到task的任务.

interruptIdleWorkers()方法就是针对这个状态的woker, 如果getTask()返回值是null, 那么该woker线程就会结束了. 从getTask()源码中可以看到, 如果shutdown的时候, wokerQueue(BlockingQueue)的poll()或者take()方法能够响应interrupt(), 从而导致getTask()会继续下一次循环, 从而能够检查到shutdown状态, 从而直接返回null, 进而使woker退出. 所以shutdown不会对已经进入while body的woker线程起作用.

shutdown仅仅调用了一次interruptIdleWorkers(), 所以那些idlewokers被直接结束了, 但是剩下的仍然在工作的workers不会受到影响, 如果任务队列中仍然有剩余的任务, 那么这些woker仍然能够取出并且完成 (因为shutdown()方法仅仅将状态改成了SHUTDOWN).

shutdownNow()中设置状态为STOP, 并调用了interruptWorkers()方法. 所以即使worker已经执行到task.run(), 如果我们传进去的任务的run方法有对interrupt做出合适响应, 那么依然可以被停止, 否则shutdownNow()也无法终止. 另外结合getTask(), 可以知道即使已经缓存在任务队列中的任务也不会被执行了 (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())).

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock();
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock();
  10. if ((runStateAtLeast(ctl.get(), STOP) ||
  11. (Thread.interrupted() &&
  12. runStateAtLeast(ctl.get(), STOP))) &&
  13. !wt.isInterrupted())
  14. wt.interrupt();
  15. try {
  16. beforeExecute(wt, task);
  17. Throwable thrown = null;
  18. try {
  19. task.run();
  20. } catch (RuntimeException x) {
  21. thrown = x; throw x;
  22. } catch (Error x) {
  23. thrown = x; throw x;
  24. } catch (Throwable x) {
  25. thrown = x; throw new Error(x);
  26. } finally {
  27. afterExecute(task, thrown);
  28. }
  29. } finally {
  30. task = null;
  31. w.completedTasks++;
  32. w.unlock();
  33. }
  34. }
  35. completedAbruptly = false;
  36. } finally {
  37. processWorkerExit(w, completedAbruptly);
  38. }
  39. }

其中getTask()方法:

  1. private Runnable getTask() {
  2. boolean timedOut = false;
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  7. decrementWorkerCount();
  8. return null;
  9. }
  10. int wc = workerCountOf(c);
  11. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  12. if ((wc > maximumPoolSize || (timed && timedOut))
  13. && (wc > 1 || workQueue.isEmpty())) {
  14. if (compareAndDecrementWorkerCount(c))
  15. return null;
  16. continue;
  17. }
  18. try {
  19. Runnable r = timed ?
  20. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  21. workQueue.take();
  22. if (r != null)
  23. return r;
  24. timedOut = true;
  25. } catch (InterruptedException retry) {
  26. timedOut = false;
  27. }
  28. }
  29. }

其中processWorkerExit方法:
注意processWorkerExit方法会调用tryTerminate()方法. 所以每次有一个woker结束的时候, 都会尝试termiante, 所以仅仅调用shutdown也可以使得在全部任务完成以后terminate.

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. if (completedAbruptly)
  3. decrementWorkerCount();
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. completedTaskCount += w.completedTasks;
  8. workers.remove(w);
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate();
  13. int c = ctl.get();
  14. if (runStateLessThan(c, STOP)) {
  15. if (!completedAbruptly) {
  16. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  17. if (min == 0 && ! workQueue.isEmpty())
  18. min = 1;
  19. if (workerCountOf(c) >= min)
  20. return;
  21. }
  22. addWorker(null, false);
  23. }
  24. }

tryTerminate()源码:

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. if (isRunning(c) ||
  5. runStateAtLeast(c, TIDYING) ||
  6. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  7. return;
  8. if (workerCountOf(c) != 0) {
  9. interruptIdleWorkers(ONLY_ONE);
  10. return;
  11. }
  12. final ReentrantLock mainLock = this.mainLock;
  13. mainLock.lock();
  14. try {
  15. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  16. try {
  17. terminated();
  18. } finally {
  19. ctl.set(ctlOf(TERMINATED, 0));
  20. termination.signalAll();
  21. }
  22. return;
  23. }
  24. } finally {
  25. mainLock.unlock();
  26. }
  27. }
  28. }

一个关键的地方在于interruptIdleWorkers(ONLY_ONE);, 下面是关于这个参数的解释:

如果这个参数是true的话, 那么一次最多interrupt一个空闲的worker. 因为每一个worker在退出的时候都会调用processWorkerExit方法, 而且processWorkerExit方法中也会继续调用tryTerminate()方法, 所以注释里面的propagate就能解释得通了. in case all threads are currently waiting, 这里还不是很理解, 这里是说避免所有线程都在那时刻等待的情况, 但是这样做的目的还是不很清楚.

总结

要让ExecutorService能够被正常关闭, 需要任务本身对interrupted这个状态做出反应, 否则可能无法正常关闭ExecutorService.