execute 方法

execute 方法public void execute(Runnable command),的作用是提交任务 command 到线程池进行执行。用户线程提交任务到线程池的模型图如图 8-2 所示。

源码分析 - 图1

从该图可以看出,ThreadPoolExecutor 的实现实际是一个生产消费模型,当用户添加任务到线程池时相当于生产者生产元素,workers 线程工作集中的线程直接执行任务或者从任务队列里面获取任务时则相当于消费者消费元素。

用户线程提交任务的 execute 方法的具体代码如下。

  1. public void executeRunnable command {
  2. //(1) 如果任务为 null,则抛出 NPE 异常
  3. if command == null
  4. throw new NullPointerException();
  5. //(2)获取当前线程池的状态 + 线程个数变量的组合值
  6. int c = ctl.get();
  7. //(3)当前线程池中线程个数是否小于 corePoolSize,小于则开启新线程运行
  8. if workerCountOf(c) < corePoolSize {
  9. if addWorker(command, true))
  10. return
  11. c = ctl.get();
  12. }
  13. //(4)如果线程池处于 RUNNING 状态,则添加任务到阻塞队列
  14. if isRunning(c) && workQueue.offer(command)) {
  15. //(4.1)二次检查
  16. int recheck = ctl.get();
  17. //(4.2)如果当前线程池状态不是 RUNNING 则从队列中删除任务,并执行拒绝策略
  18. if (! isRunning(recheck) && remove(command))
  19. rejectcommand);
  20. //(4.3)否则如果当前线程池为空,则添加一个线程
  21. else if workerCountOf(recheck) == 0
  22. addWorkernull, false);
  23. }
  24. //(5)如果队列满,则新增线程,新增失败则执行拒绝策略
  25. else if (! addWorker(command, false))
  26. rejectcommand);
  27. }

代码(3)判断如果当前线程池中线程个数小于 corePoolSize,会向 workers 里面新增一个核心线程(core 线程)执行该任务。

如果当前线程池中线程个数大于等于 corePoolSize 则执行代码(4)。如果当前线程池处于 RUNNING 状态则添加当前任务到任务队列。这里需要判断线程池状态是因为有可能线程池已经处于非 RUNNING 状态,而在非 RUNNING 状态下是要抛弃新任务的。

如果向任务队列添加任务成功,则代码(4.2)对线程池状态进行二次校验,这是因为添加任务到任务队列后,执行代码(4.2)前有可能线程池的状态已经变化了。这里进行二次校验,如果当前线程池状态不是 RUNNING 了则把任务从任务队列移除,移除后执行拒绝策略;如果二次校验通过,则执行代码(4.3)重新判断当前线程池里面是否还有线程,如果没有则新增一个线程。

如果代码(4)添加任务失败,则说明任务队列已满,那么执行代码(5)尝试新开启线程(如图 8-1 中的 thread3 和 thread4)来执行该任务,如果当前线程池中线程个数 >maximumPoolSize 则执行拒绝策略。

下面分析下新增线程的 addWorkder 方法,代码如下。

  1. private boolean addWorkerRunnable firstTask, boolean core {
  2. retry:
  3. for (; ; {
  4. int c = ctl.get();
  5. int rs = runStateOfc);
  6. //(6)检查队列是否只在必要时为空
  7. if (rs >= SHUTDOWN &&
  8. rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false
  12. //(7)循环 CAS 增加线程个数
  13. for (; {
  14. int wc = workerCountOfc);
  15. //(7.1)如果线程个数超限则返回 false
  16. if (wc >= CAPACITY ||
  17. wc >= core corePoolSize : maximumPoolSize))
  18. return false
  19. //(7.2)CAS 增加线程个数,同时只有一个线程成功
  20. if (compareAndIncrementWorkerCountc))
  21. break retry
  22. //(7.3)CAS 失败了,则看线程池状态是否变化了,变化则跳到外层循环重新尝试获取线程池
  23. 状态,否则内层循环重新 CAS
  24. c = ctl.get(); // Re-read ctl
  25. if (runStateOfc = rs)
  26. continue retry
  27. }
  28. }
  29. //(8)到这里说明 CAS 成功了
  30. boolean workerStarted = false
  31. boolean workerAdded = false
  32. Worker w = null
  33. try {
  34. //(8.1)创建 worker
  35. final ReentrantLock mainLock = this.mainLock
  36. w = new WorkerfirstTask);
  37. final Thread t = w.thread
  38. if t = null {
  39. //(8.2)加独占锁,为了实现 workers 同步,因为可能多个线程调用了线程池的 execute 方法
  40. mainLock.lock();
  41. try {
  42. //(8.3)重新检查线程池状态,以避免在获取锁前调用了 shutdown 接口
  43. int c = ctl.get();
  44. int rs = runStateOfc);
  45. if rs < SHUTDOWN ||
  46. (rs == SHUTDOWN && firstTask == null)) {
  47. if t.isAlive()) // precheck that t is startable
  48. throw new IllegalThreadStateException();
  49. //(8.4)添加任务
  50. workers.addw);
  51. int s = workers.size();
  52. if s > largestPoolSize
  53. largestPoolSize = s
  54. workerAdded = true
  55. }
  56. } finally {
  57. mainLock.unlock();
  58. }
  59. //(8.5)添加成功后则启动任务
  60. if workerAdded {
  61. t.start();
  62. workerStarted = true
  63. }
  64. }
  65. } finally {
  66. if (! workerStarted
  67. addWorkerFailedw);
  68. }
  69. return workerStarted
  70. }

代码比较长,主要分两个部分:第一部分双重循环的目的是通过 CAS 操作增加线程数;第二部分主要是把并发安全的任务添加到 workers 里面,并且启动任务执行。

首先来分析第一部分的代码(6)。

  1. rs >= SHUTDOWN &&
  2. ! (rs == SHUTDOWN &&
  3. firstTask == null &&
  4. ! workQueue.isEmpty())

展开!运算后等价于

  1. s >= SHUTDOWN &&
  2. (rs ! = SHUTDOWN ||//(I)
  3. firstTask ! = null ||//(II)
  4. workQueue.isEmpty())//(III)

也就是说代码(6)在下面几种情况下会返回 false:

·(I)当前线程池状态为 STOP、TIDYING 或 TERMINATED。

·(II)当前线程池状态为 SHUTDOWN 并且已经有了第一个任务。

·(III)当前线程池状态为 SHUTDOWN 并且任务队列为空。

内层循环的作用是使用 CAS 操作增加线程数,代码(7.1)判断如果线程个数超限则返回 false,否则执行代码(7.2)CAS 操作设置线程个数,CAS 成功则退出双循环,CAS 失败则执行代码(7.3)看当前线程池的状态是否变化了,如果变了,则再次进入外层循环重新获取线程池状态,否则进入内层循环继续进行 CAS 尝试。

执行到第二部分的代码(8)时说明使用 CAS 成功地增加了线程个数,但是现在任务还没开始执行。这里使用全局的独占锁来控制把新增的 Worker 添加到工作集 workers 中。代码(8.1)创建了一个工作线程 Worker。

代码(8.2)获取了独占锁,代码(8.3)重新检查线程池状态,这是为了避免在获取锁前其他线程调用了 shutdown 关闭了线程池。如果线程池已经被关闭,则释放锁,新增线程失败,否则执行代码(8.4)添加工作线程到线程工作集,然后释放锁。代码(8.5)判断如果新增工作线程成功,则启动工作线程。

工作线程 Worker 的执行

用户线程提交任务到线程池后,由 Worker 来执行。先看下 Worker 的构造函数。

  1. WorkerRunnable firstTask {
  2. setState(-1); // 在调用 runWorker 前禁止中断
  3. this.firstTask = firstTask
  4. this.thread = getThreadFactory().newThreadthis); //创建一个线程
  5. }

在构造函数内首先设置 Worker 的状态为-1,这是为了避免当前 Worker 在调用 runWorker 方法前被中断(当其他线程调用了线程池的 shutdownNow 时,如果 Worker 状态 >=0 则会中断该线程)。这里设置了线程的状态为-1,所以该线程就不会被中断了。在如下 runWorker 代码中,运行代码(9)时会调用 unlock 方法,该方法把 status 设置为了 0,所以这时候调用 shutdownNow 会中断 Worker 线程。

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); //(9)将 state 设置为 0,允许中断
  6. boolean completedAbruptly = true
  7. try {
  8. //(10)
  9. while task ! = null || (task = getTask()) ! = null {
  10. //(10.1)
  11. w.lock();
  12. ...
  13. try {
  14. //(10.2)执行任务前干一些事情
  15. beforeExecutewt, task);
  16. Throwable thrown = null
  17. try {
  18. task.run(); //(10.3)执行任务
  19. } catch RuntimeException x {
  20. thrown = x throw x
  21. } catch Error x {
  22. thrown = x throw x
  23. } catch Throwable x {
  24. thrown = x throw new Errorx);
  25. } finally {
  26. //(10.4)执行任务完毕后干一些事情
  27. afterExecutetask, thrown);
  28. }
  29. } finally {
  30. task = null
  31. //(10.5)统计当前 Worker 完成了多少个任务
  32. w.completedTasks++;
  33. w.unlock();
  34. }
  35. }
  36. completedAbruptly = false
  37. } finally {
  38. //(11)执行清理工作
  39. processWorkerExitw, completedAbruptly);
  40. }
  41. }

在如上代码(10)中,如果当前 task==null 或者调用 getTask 从任务队列获取的任务返回 null,则跳转到代码(11)执行。如果 task 不为 null 则执行代码(10.1)获取工作线程内部持有的独占锁,然后执行扩展接口代码(10.2)在具体任务执行前做一些事情。代码(10.3)具体执行任务,代码(10.4)在任务执行完毕后做一些事情,代码(10.5)统计当前 Worker 完成了多少个任务,并释放锁。

这里在执行具体任务期间加锁,是为了避免在任务运行期间,其他线程调用了 shutdown 后正在执行的任务被中断(shutdown 只会中断当前被阻塞挂起的线程

代码(11)执行清理任务,其代码如下。

  1. private void processWorkerExitWorker w, boolean completedAbruptly {
  2. ...
  3. //(11.1)统计整个线程池完成的任务个数,并从工作集里面删除当前 Woker
  4. final ReentrantLock mainLock = this.mainLock
  5. mainLock.lock();
  6. try {
  7. completedTaskCount += w.completedTasks
  8. workers.removew);
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. //(11.2)尝试设置线程池状态为 TERMINATED,如果当前是 SHUTDONW 状态并且工作队列为空
  13. //或者当前是 STOP 状态,当前线程池里面没有活动线程
  14. tryTerminate();
  15. //(11.3)如果当前线程个数小于核心个数,则增加
  16. int c = ctl.get();
  17. if runStateLessThan(c, STOP)) {
  18. if (! completedAbruptly {
  19. int min = allowCoreThreadTimeOut 0 : corePoolSize
  20. if min == 0 && ! workQueue.isEmpty())
  21. min = 1
  22. if workerCountOf(c) >= min
  23. return // replacement not needed
  24. }
  25. addWorkernull, false);
  26. }
  27. }

在如上代码中,代码(11.1)统计线程池完成任务个数,并且在统计前加了全局锁。把在当前工作线程中完成的任务累加到全局计数器,然后从工作集中删除当前 Worker。

代码(11.2)判断如果当前线程池状态是 SHUTDOWN 并且工作队列为空,或者当前线程池状态是 STOP 并且当前线程池里面没有活动线程,则设置线程池状态为 TERMINATED。如果设置为了 TERMINATED 状态,则还需要调用条件变量 termination 的 signalAll()方法激活所有因为调用线程池的 awaitTermination 方法而被阻塞的线程。

代码(11.3)则判断当前线程池里面线程个数是否小于核心线程个数,如果是则新增一个线程。

shutdown 操作

调用 shutdown 方法后,线程池就不会再接受新的任务了,但是工作队列里面的任务还是要执行的。该方法会立刻返回,并不等待队列任务完成再返回。

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock
  3. mainLock.lock();
  4. try {
  5. //(12)权限检查
  6. checkShutdownAccess();
  7. //(13)设置当前线程池状态为 SHUTDOWN,如果已经是 SHUTDOWN 则直接返回
  8. advanceRunStateSHUTDOWN);
  9. //(14)设置中断标志
  10. interruptIdleWorkers();
  11. onShutdown();
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. //(15)尝试将状态变为 TERMINATED
  16. tryTerminate();
  17. }

在如上代码中,代码(12)检查看是否设置了安全管理器,是则看当前调用 shutdown 命令的线程是否有关闭线程的权限,如果有权限则还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出 SecurityException 或者 NullPointerException 异常。

其中代码(13)的内容如下,如果当前线程池状态 >=SHUTDOWN 则直接返回,否则设置为 SHUTDOWN 状态。

  1. private void advanceRunState(int targetState) {
  2. for (; ; ) {
  3. int c = ctl.get();
  4. if (runStateAtLeast(c, targetState) ||
  5. ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
  6. break;
  7. }
  8. }

代码(14)的内容如下,其设置所有空闲线程的中断标志。这里首先加了全局锁,同时只有一个线程可以调用 shutdown 方法设置中断标志。然后尝试获取 Worker 自己的锁,获取成功则设置中断标志。由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。这里中断的是阻塞到 getTask()方法并企图从队列里面获取任务的线程,也就是空闲线程。

  1. private void interruptIdleWorkersboolean onlyOne {
  2. final ReentrantLock mainLock = this.mainLock
  3. mainLock.lock();
  4. try {
  5. for Worker w : workers {
  6. Thread t = w.thread
  7. //如果工作线程没有被中断,并且没有正在运行则设置中断标志
  8. if (! t.isInterrupted() && w.tryLock()) {
  9. try {
  10. t.interrupt();
  11. } catch SecurityException ignore {
  12. } finally {
  13. w.unlock();
  14. }
  15. }
  16. if onlyOne
  17. break
  18. }
  19. } finally {
  20. mainLock.unlock();
  21. }
  22. }
  23. final void tryTerminate() {
  24. for (; ; {
  25. ...
  26. int c = ctl.get();
  27. ...
  28. final ReentrantLock mainLock = this.mainLock
  29. mainLock.lock();
  30. try {//设置当前线程池状态为 TIDYING
  31. if ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  32. try {
  33. terminated();
  34. } finally {
  35. //设置当前线程池状态为 TERMINATED
  36. ctl.setctlOf(TERMINATED, 0));
  37. //激活因调用条件变量 termination 的 await 系列方法而被阻塞的所有线程
  38. termination.signalAll();
  39. }
  40. return
  41. }
  42. } finally {
  43. mainLock.unlock();
  44. }
  45. }
  46. }

在如上代码中,首先使用 CAS 设置当前线程池状态为 TIDYING,如果设置成功则执行扩展接口 terminated 在线程池状态变为 TERMINATED 前做一些事情,然后设置当前线程池状态为 TERMINATED。最后调用 termination.signalAll()激活因调用条件变量 termination 的 await 系列方法而被阻塞的所有线程,关于这一点随后讲到 awaitTermination 方法时具体讲解。

shutdownNow 操作

调用 shutdownNow 方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断,该方法会立刻返回,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks
  3. final ReentrantLock mainLock = this.mainLock
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess(); //(16)权限检查
  7. advanceRunStateSTOP); //(17) 设置线程池状态为 STOP
  8. interruptWorkers(); //(18)中断所有线程
  9. tasks = drainQueue(); //(19)将队列任务移动到 tasks 中
  10. } finally {
  11. mainLock.unlock();
  12. }
  13. tryTerminate();
  14. return tasks
  15. }

在如上代码中,首先调用代码(16)检查权限,然后调用代码(17)设置当前线程池状态为 STOP,随后执行代码(18)中断所有的工作线程。这里需要注意的是,中断的所有线程包含空闲线程和正在执行任务的线程。

  1. private void interruptWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers)
  6. w.interruptIfStarted();
  7. } finally {
  8. mainLock.unlock();
  9. }
  10. }

然后代码(19)将当前任务队列里面的任务移动到 tasks 列表。

awaitTermination 操作

当线程调用 awaitTermination 方法后,当前线程会被阻塞,直到线程池状态变为 TERMINATED 才返回,或者等待时间超时才返回。整个过程中独占锁的代码如下。

  1. public boolean awaitTermination(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. long nanos = unit.toNanos(timeout);
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. for (; ; ) {
  8. if (runStateAtLeast(ctl.get(), TERMINATED))
  9. return true;
  10. if (nanos <= 0)
  11. return false;
  12. nanos = termination.awaitNanos(nanos);
  13. }
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. }

如上代码首先获取独占锁,然后在无限循环内部判断当前线程池状态是否至少是 TERMINATED 状态,如果是则直接返回,否则说明当前线程池里面还有线程在执行,则看设置的超时时间 nanos 是否小于 0,小于 0 则说明不需要等待,那就直接返回,如果大于 0 则调用条件变量 termination 的 awaitNanos 方法等待 nanos 时间,期望在这段时间内线程池状态变为 TERMINATED。

在讲解 shutdown 方法时提到过,当线程池状态变为 TERMINATED 时,会调用 termination.signalAll()用来激活调用条件变量 termination 的 await 系列方法被阻塞的所有线程,所以如果在调用 awaitTermination 之后又调用了 shutdown 方法,并且在 shutdown 内部将线程池状态设置为 TERMINATED,则 termination.awaitNanos 方法会返回。

另外在工作线程 Worker 的 runWorker 方法内,当工作线程运行结束后,会调用 processWorkerExit 方法,在 processWorkerExit 方法内部也会调用 tryTerminate 方法测试当前是否应该把线程池状态设置为 TERMINATED,如果是,则也会调用 termination. signalAll()用来激活调用线程池的 awaitTermination 方法而被阻塞的线程。

而且当等待时间超时后,termination.awaitNanos 也会返回,这时候会重新检查当前线程池状态是否为 TERMINATED,如果是则直接返回,否则继续阻塞挂起自己。

总结

线程池巧妙地使用一个 Integer 类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个 Worker 线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。