Executor执行Runnable任务

1. execute(Runnable command)

execute()定义在ThreadPoolExecutor.java中,源码如下:

  1. public void execute(Runnable command) {
  2. // 如果任务为null,则抛出异常。
  3. if (command == null) throw new NullPointerException();
  4. // 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
  5. int c = ctl.get();
  6. // 当线程池中的任务数量 < "核心池大小"时,即线程池中少于corePoolSize个任务。
  7. // 则通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
  8. if (workerCountOf(c) < corePoolSize) {
  9. if (addWorker(command, true))
  10. return;
  11. c = ctl.get();
  12. }
  13. // 当线程池中的任务数量 >= "核心池大小"时,
  14. // 而且,"线程池处于运行状态"时,则尝试将任务添加到阻塞队列中。
  15. if (isRunning(c) && workQueue.offer(command)) {
  16. // 再次确认“线程池状态”,若线程池异常终止了,则删除任务;然后通过reject()执行相应的拒绝策略的内容。
  17. int recheck = ctl.get();
  18. if (! isRunning(recheck) && remove(command))
  19. reject(command);
  20. // 否则,如果"线程池中任务数量"为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
  21. else if (workerCountOf(recheck) == 0)
  22. addWorker(null, false);
  23. }
  24. // 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
  25. // 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
  26. else if (!addWorker(command, false))
  27. reject(command);
  28. }

说明:execute()的作用是将任务添加到线程池中执行。它会分为3种情况进行处理:
情况1 — 如果”线程池中任务数量” < “核心池大小”时,即线程池中少于corePoolSize个任务;此时就新建一个线程,并将该任务添加到线程中进行执行。
情况2 — 如果”线程池中任务数量” >= “核心池大小”,并且”线程池是允许状态”;此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认”线程池的状态”,如果”第2次读到的线程池状态”和”第1次读到的线程池状态”不同,则从阻塞队列中删除该任务。
情况3 — 非以上两种情况。在这种情况下,尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过reject()拒绝该任务。
在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workque是线程池之前的状态。倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。

2. addWorker()

addWorker()的源码如下:

  1. private final ReentrantLock mainLock = new ReentrantLock();
  2. private boolean addWorker(Runnable firstTask, boolean core) {
  3. retry:
  4. // 更新"线程池状态和计数"标记,即更新ctl,core为true的话,则以corePoolSize为界限
  5. for (;;) {
  6. // 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息
  7. int c = ctl.get();
  8. // 获取线程池状态。
  9. int rs = runStateOf(c);
  10. // 有效性检查
  11. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
  12. return false;
  13. for (;;) {
  14. // 获取线程池中任务的数量。
  15. int wc = workerCountOf(c);
  16. // 如果"线程池中任务的数量"超过限制,则返回false。
  17. if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
  18. return false;
  19. // 通过CAS函数将c的值+1。操作失败的话,则退出循环。
  20. if (compareAndIncrementWorkerCount(c))
  21. break retry;
  22. c = ctl.get(); // Re-read ctl
  23. // 检查"线程池状态",如果与之前的状态不同,则从retry重新开始。
  24. if (runStateOf(c) != rs)
  25. continue retry;
  26. // else CAS failed due to workerCount change; retry inner loop
  27. }
  28. }
  29. boolean workerStarted = false;
  30. boolean workerAdded = false;
  31. Worker w = null;
  32. // 添加任务到线程池,并启动任务所在的线程。
  33. try {
  34. final ReentrantLock mainLock = this.mainLock;
  35. // 新建Worker,并且指定firstTask为Worker的第一个任务。
  36. w = new Worker(firstTask);
  37. // 获取Worker对应的线程。
  38. final Thread t = w.thread;
  39. if (t != null) {
  40. // 获取锁
  41. mainLock.lock();
  42. try {
  43. int c = ctl.get();
  44. int rs = runStateOf(c);
  45. // 再次确认"线程池状态"
  46. if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
  47. // precheck that t is startable
  48. if (t.isAlive()) throw new IllegalThreadStateException();
  49. // 将Worker对象(w)添加到"线程池的Worker集合(workers)"中
  50. workers.add(w);
  51. // 更新largestPoolSize
  52. int s = workers.size();
  53. if (s > largestPoolSize)
  54. largestPoolSize = s;
  55. workerAdded = true;
  56. }
  57. } finally {
  58. // 释放锁
  59. mainLock.unlock();
  60. }
  61. // 如果"成功将任务添加到线程池"中,则启动任务所在的线程。
  62. if (workerAdded) {
  63. t.start();
  64. workerStarted = true;
  65. }
  66. }
  67. } finally {
  68. if (! workerStarted)
  69. addWorkerFailed(w);
  70. }
  71. // 返回任务是否启动。
  72. return workerStarted;
  73. }

说明
addWorker(Runnable firstTask, boolean core) 的作用是将任务(firstTask)添加到线程池中,并启动该任务。
core为true的话,则以corePoolSize为界限,若”线程池中已有任务数量>=corePoolSize”,则返回false;core为false的话,则以maximumPoolSize为界限,若”线程池中已有任务数量>=maximumPoolSize”,则返回false。
addWorker()会先通过for循环不断尝试更新ctl状态,ctl记录了”线程池中任务数量和线程池状态”。
更新成功之后,再通过try模块来将任务添加到线程池中,并启动任务所在的线程。
从addWorker()中,我们能清晰的发现:线程池在添加任务时,会创建任务对应的Worker对象;而一个Workder对象包含一个Thread对象。(01) 通过将Worker对象添加到”线程的workers集合”中,从而实现将任务添加到线程池中。 (02) 通过启动Worker对应的Thread线程,则执行该任务。

3. Worker类

Worker为ThreadPoolExecutor的内部类,
(1)继承了AQS类,可以方便的实现工作线程的中止操作;
(2)实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
(3)当前提交的任务firstTask作为参数传入Worker的构造方法;
Worker类属性和构造方法源码如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

     final Thread thread;
     Runnable firstTask;
     volatile long completedTasks;
     Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}

说明 :thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。

3. Worker类的runworker方法

源码如下

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    // 获取当前线程
    Thread wt = Thread.currentThread();
    // 获取w的firstTask
    Runnable task = w.firstTask;
    // 设置w的firstTask为null
    w.firstTask = null;
    // 释放锁(设置AQS的state为0,表示允许中断)
    w.unlock(); 
   // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // 任务不为null或者阻塞队列还存在任务
            // 获取锁,保证线程不被其他线程中断(除非线程池被中断)
             w.lock();
             // If pool is stopping, ensure thread is interrupted;
             // if not, ensure thread is not interrupted.  This
             // requires a recheck in second case to deal with
             // shutdownNow race while clearing interrupt
        //检查线程池状态,如果线程池处于中断状态,则中断线程if ((runStateAtLeast(ctl.get(), STOP) ||    // 线程池的运行状态至少应该高于STOP
                 (Thread.interrupted() &&                // 线程被中断
                  runStateAtLeast(ctl.get(), STOP))) &&    // 再次检查,线程池的运行状态至少应该高于STOP
                  !wt.isInterrupted())                    // wt线程(当前线程)没有被中断
                  wt.interrupt();                            // 中断wt线程(当前线程)
             try {
                  // 在执行之前调用钩子函数
                  beforeExecute(wt, task);
                  Throwable thrown = null;
                  try {
                      // 运行给定的任务
                      task.run();
                  } catch (RuntimeException x) {
                        thrown = x; throw x;
                  } catch (Error x) {
                        thrown = x; throw x;
                  } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                  } finally {
                        // 执行完后调用钩子函数
                        afterExecute(task, thrown);
                  }
              } finally {
                   task = null;
                    // 增加给worker完成的任务数量
                    w.completedTasks++;
                    // 释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 处理完成后,调用钩子函数
            processWorkerExit(w, completedAbruptly);
    }
 }

说明 此函数中会实际执行给定任务(即调用用户重写的run方法),并且当给定任务完成后,会继续从阻塞队列中取任务,直到阻塞队列为空(即任务全部完成)。在执行给定任务时,会调用钩子函数,利用钩子函数可以完成用户自定义的一些逻辑。在runWorker中会调用到getTask函数从阻塞队列中获取等待的任务(如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源)和processWorkerExit钩子函数。
runWorker方法是线程池的核心:
1. 线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行中断;
2. Worker执行firstTask或从workQueue中获取任务:
2.1. 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
2.2. 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
2.3. 执行beforeExecute
2.4 执行任务的run方法
2.5 执行afterExecute方法
2.6 解锁操作

4. getTask方法

getTask方法从阻塞队列中获取等待的任务
源码如下:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) { // 无限循环,确保操作成功
            // 获取线程池控制状态
            int c = ctl.get();
            // 运行的状态
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 大于等于SHUTDOWN(表示调用了shutDown)并且(大于等于STOP(调用了shutDownNow)或者worker阻塞队列为空)
                // 减少worker的数量
                decrementWorkerCount();
                // 返回null,不执行任务
                return null;
            }
            // 获取worker数量
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否允许coreThread超时或者workerCount大于核心大小
            if ((wc > maximumPoolSize || (timed && timedOut))     // worker数量大于maximumPoolSize
                && (wc > 1 || workQueue.isEmpty())) {            // workerCount大于1或者worker阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个wc)
                if (compareAndDecrementWorkerCount(c))            // 比较并减少workerCount
                    // 返回null,不执行任务,该worker会退出
                    return null;
                // 跳过剩余部分,继续循环
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :    // 等待指定时间
                    workQueue.take();                                        // 一直等待,直到有元素
                if (r != null)
                    return r;
                // 等待指定时间后,没有获取元素,则超时
                timedOut = true;
            } catch (InterruptedException retry) {
                // 抛出了被中断异常,重试,没有超时
                timedOut = false;
            }
        }
    }

说明:此函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。在该函数中还会响应shutDown和、shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnalbe对象。
注意这里一段代码是keepAliveTime起作用的关键:
// Are workers subject to culling?
// 是否允许coreThread超时或者workerCount大于核心大小
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
…..
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
allowCoreThreadTimeOut为false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。
如果线程允许空闲等待而不被销毁,workQueue.take()任务:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
如果线程不允许无休止空闲, 则 workQueue.poll()任务:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;

5. processWorkerExit

processWorkerExit函数是在worker退出时调用到的钩子函数,而引起worker退出的主要因素如下
(1) 阻塞队列已经为空,即没有任务可以运行了。
 (2) 调用了shutDown或shutDownNow函数
processWorkerExit源码如下

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 如果被中断,则需要减少workCount    // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        // 获取可重入锁
        final ReentrantLock mainLock = this.mainLock;
        // 获取锁
        mainLock.lock();
        try {
            // 将worker完成的任务添加到总的完成任务中
            completedTaskCount += w.completedTasks;
            // 从workers集合中移除该worker
            workers.remove(w);
        } finally {
            // 释放锁
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate();
        // 获取线程池控制状态
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) { // 小于STOP的运行状态
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty()) // 允许核心超时并且workQueue阻塞队列不为空
                    min = 1;
                if (workerCountOf(c) >= min) // workerCount大于等于min
                    // 直接返回
                    return; // replacement not needed
            }
            // 添加worker
            addWorker(null, false);
        }
    }

说明:此函数会根据是否中断了空闲线程来确定是否减少workerCount的值,并且将worker从workers集合中移除并且会尝试终止线程池。