引言

上一篇文章,我们讲解了FutureTask和AbstractExecutorService。AbstractExecutorService给出了ExecutorService接口的几个方法的实现,包括submit、invokeAll、invokeAny等。FutureTask实现了Future和Runnable,并且内部有Callable作为要执行的任务,它给出了Future接口几个方法的实现,例如get、cancel等。这篇文章,我们来看最重要的线程池ThreadPoolExecutor。

execute方法

在讲AbstractExecutorService的submit逻辑时,我们知道它构造了FutureTask,然后调用execute方法。ThreadPoolExecutor的重要逻辑也是execute方法:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. /*
  5. * Proceed in 3 steps:
  6. *
  7. * 1. If fewer than corePoolSize threads are running, try to
  8. * start a new thread with the given command as its first
  9. * task. The call to addWorker atomically checks runState and
  10. * workerCount, and so prevents false alarms that would add
  11. * threads when it shouldn't, by returning false.
  12. *
  13. * 2. If a task can be successfully queued, then we still need
  14. * to double-check whether we should have added a thread
  15. * (because existing ones died since last checking) or that
  16. * the pool shut down since entry into this method. So we
  17. * recheck state and if necessary roll back the enqueuing if
  18. * stopped, or start a new thread if there are none.
  19. *
  20. * 3. If we cannot queue task, then we try to add a new
  21. * thread. If it fails, we know we are shut down or saturated
  22. * and so reject the task.
  23. */
  24. int c = ctl.get();
  25. if (workerCountOf(c) < corePoolSize) {
  26. if (addWorker(command, true))
  27. return;
  28. c = ctl.get();
  29. }
  30. if (isRunning(c) && workQueue.offer(command)) {
  31. int recheck = ctl.get();
  32. if (! isRunning(recheck) && remove(command))
  33. reject(command);
  34. else if (workerCountOf(recheck) == 0)
  35. addWorker(null, false);
  36. }
  37. else if (!addWorker(command, false))
  38. reject(command);
  39. }

首先,它会检查当前的线程数量是否小于设置的核心线程数,如果小于,就会调用addWorker方法添加线程,并将参数Runnable作为该线程运行的第一个task(线程池中的线程会执行多个Runnable)。我们来看一下它是怎么判断当前的线程数量的。

线程池状态和线程数量表示

ThreadPoolExecutor使用一个AtomicInteger来记录线程池的运行状态和线程池的活跃线程数。

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

一个int类型要记录两个值,就需要进行位拆分。它的高三位用来表示线程池的运行状态,低29位表示线程的数量。所以线程数量最多是2的29次方-1。线程池的运行状态有以下几种:

  1. private static final int RUNNING = -1 << COUNT_BITS;
  2. private static final int SHUTDOWN = 0 << COUNT_BITS;
  3. private static final int STOP = 1 << COUNT_BITS;
  4. private static final int TIDYING = 2 << COUNT_BITS;
  5. private static final int TERMINATED = 3 << COUNT_BITS;

COUNT_BITS:

  1. private static final int COUNT_BITS = Integer.SIZE - 3;

是29。
所以线程的状态用三位二进制表示分别是:

  1. 111 000 001 010 011

execute方法中的workerCountOf方法就是取AtomicInteger的后29位的值:

  1. private static int workerCountOf(int c) { return c & CAPACITY; }

CAPACITY:

  1. private static final int CAPACITY = (1 << COUNT_BITS) - 1;

然后我们看addWaiter方法。

addWorker增加新的线程并在新的线程中执行任务

  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN &&
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false;
  12. for (;;) {
  13. int wc = workerCountOf(c);
  14. if (wc >= CAPACITY ||
  15. wc >= (core ? corePoolSize : maximumPoolSize))
  16. return false;
  17. if (compareAndIncrementWorkerCount(c))
  18. break retry;
  19. c = ctl.get(); // Re-read ctl
  20. if (runStateOf(c) != rs)
  21. continue retry;
  22. // else CAS failed due to workerCount change; retry inner loop
  23. }
  24. }
  25. boolean workerStarted = false;
  26. boolean workerAdded = false;
  27. Worker w = null;
  28. try {
  29. w = new Worker(firstTask);
  30. final Thread t = w.thread;
  31. if (t != null) {
  32. final ReentrantLock mainLock = this.mainLock;
  33. mainLock.lock();
  34. try {
  35. // Recheck while holding lock.
  36. // Back out on ThreadFactory failure or if
  37. // shut down before lock acquired.
  38. int rs = runStateOf(ctl.get());
  39. if (rs < SHUTDOWN ||
  40. (rs == SHUTDOWN && firstTask == null)) {
  41. if (t.isAlive()) // precheck that t is startable
  42. throw new IllegalThreadStateException();
  43. workers.add(w);
  44. int s = workers.size();
  45. if (s > largestPoolSize)
  46. largestPoolSize = s;
  47. workerAdded = true;
  48. }
  49. } finally {
  50. mainLock.unlock();
  51. }
  52. if (workerAdded) {
  53. t.start();
  54. workerStarted = true;
  55. }
  56. }
  57. } finally {
  58. if (! workerStarted)
  59. addWorkerFailed(w);
  60. }
  61. return workerStarted;
  62. }

首先,它会在一个不断的循环中增加线程数量,直到增加成功为止。然后,它创建了一个新的Worker对象,Worker是ThreadPoolExecutor的一个内部类,它的定义如下:

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable{}

它是一个同步器,同时还实现了Runnable接口,那它的作用是什么呢?
首先,我们来看它的字段:

  1. /** Thread this worker is running in. Null if factory fails. */
  2. final Thread thread;
  3. /** Initial task to run. Possibly null. */
  4. Runnable firstTask;
  5. /** Per-thread task counter */
  6. volatile long completedTasks;

有thread,说明Worker能够创建新的线程。有runnable说明它需要执行任务,并且每个worker中的线程会执行多个任务,所以这里的命名为firstTask,它是在创建Worker时给定的。completedTasks用来记录这个worker完成的任务数量。
构造方法:

  1. Worker(Runnable firstTask) {
  2. setState(-1); // inhibit interrupts until runWorker
  3. this.firstTask = firstTask;
  4. this.thread = getThreadFactory().newThread(this);
  5. }

首先,它调用的是AQS的setState方法,将state设为-1。然后为firstTask和thread赋值,注意,这里创建的新的thread的target(内部的runnable)是当前worker自己(Worker实现了runnable),所以,如果调用thread.start方法,它会执行worker的run方法。而worker的run方法就有了执行任务的逻辑:

  1. public void run() {
  2. runWorker(this);
  3. }

run方法调用的是ThreadPoolExecutor的runWorker方法,参数是当前worker自己:

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock();
  10. // If pool is stopping, ensure thread is interrupted;
  11. // if not, ensure thread is not interrupted. This
  12. // requires a recheck in second case to deal with
  13. // shutdownNow race while clearing interrupt
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);
  21. Throwable thrown = null;
  22. try {
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null;
  35. w.completedTasks++;
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false;
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

重要的逻辑就是会在一个循环中不断地取下一个任务,然后执行。注意,因为runWorker是Worker的run方法调用的,所以执行的线程是Worker的thread,这样就实现了在新创建的线程中执行任务的逻辑。
我们再回去看addWorker方法,它创建worker之后,最后会调用:

  1. t.start();

t:

  1. w = new Worker(firstTask);
  2. final Thread t = w.thread;

就是worker内部的线程。
所以worker这个类一方面新建线程,另一方面包含了在新建的线程中执行任务的逻辑。
我们继续看execute的逻辑:

核心线程达到限制之后

  1. if (isRunning(c) && workQueue.offer(command)) {
  2. int recheck = ctl.get();
  3. if (! isRunning(recheck) && remove(command))
  4. reject(command);
  5. else if (workerCountOf(recheck) == 0)
  6. addWorker(null, false);
  7. }
  8. else if (!addWorker(command, false))
  9. reject(command);

如果核心线程数已满,就会将任务加入队列。如果队列也满了,就会调用addWorker(command, false)继续增加核心线程以外的线程,如果增加失败,说明达到最大线程数,就会执行拒绝策略。

四种拒绝策略

当任务队列已满并且最大线程数量已经达到限制之后,继续提交的任务就需要使用拒绝策略来处理。RejectedExecutionHandler接口表示拒绝策略,它的rejectedExecution方法用来执行拒绝策略 。要想提供自己的拒绝策略,只需要实现这个接口然后在rejectedExecution方法中实现拒绝逻辑即可。
ThreadPooleExecutor类默认提供了四种拒绝策略:分别是CallerRunsPolicy、AbortPolicy、DiscardPolicy和DiscardOldestPolicy。我们来看一下这几个策略的拒绝逻辑。

CallerRunsPolicy

这个拒绝策略是让执行execute方法的线程来执行任务。上面我们讲execute方法的时候可以看到,ThreadPoolExecutor会让线程池中的线程来执行任务,线程池中的线程与执行execute方法的线程并不是同一个线程。它的实现很简单:

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. /**
  3. * Executes task r in the caller's thread, unless the executor
  4. * has been shut down, in which case the task is discarded.
  5. *
  6. * @param r the runnable task requested to be executed
  7. * @param e the executor attempting to execute this task
  8. */
  9. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  10. if (!e.isShutdown()) {
  11. r.run();
  12. }
  13. }
  14. }

在rejectedExecution方法中,只是简单的调用了runnable.run方法,因为调用rejectedExecution方法的线程就是调用execute方法的线程。

AbortPolicy

AbortPolicy是线程池默认的拒绝策略,它会直接抛出一个RejectedExecutionException异常:

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. throw new RejectedExecutionException("Task " + r.toString() +
  3. " rejected from " +
  4. e.toString());
  5. }

执行execute时我们可以捕获这个异常。

DiscardPolicy

DiscardPolicy会直接丢弃任务,不进行处理,也不抛出异常,所以它的rejectedExecution方法是空的:

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. }

DiscardOldestPolicy

DiscardOldestPolicy会丢弃下一个要执行的任务,然后尝试执行新的任务。

  1. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  2. if (!e.isShutdown()) {
  3. e.getQueue().poll();
  4. e.execute(r);
  5. }
  6. }

可以看到它会将队列中的下一个任务弹出。所以如果我们使用的是优先级队列,那么这个策略会丢弃优先级最高的任务,所以一般不要将这个策略与优先级队列一起使用。

小结

这篇文章,我们讲解了线程池的执行过程,重点分析了它的execute方法的逻辑。其中涉及到了Worker的使用、线程池运行状态的判断、线程数量的表示等,然后罗列了四种拒绝策略的拒绝逻辑。下一篇文章,我们来看Executors默认给出的几种线程池以及它们的优缺点。