引言
上一篇文章,我们讲解了FutureTask和AbstractExecutorService。AbstractExecutorService给出了ExecutorService接口的几个方法的实现,包括submit、invokeAll、invokeAny等。FutureTask实现了Future和Runnable,并且内部有Callable作为要执行的任务,它给出了Future接口几个方法的实现,例如get、cancel等。这篇文章,我们来看最重要的线程池ThreadPoolExecutor。
execute方法
在讲AbstractExecutorService的submit逻辑时,我们知道它构造了FutureTask,然后调用execute方法。ThreadPoolExecutor的重要逻辑也是execute方法:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
首先,它会检查当前的线程数量是否小于设置的核心线程数,如果小于,就会调用addWorker方法添加线程,并将参数Runnable作为该线程运行的第一个task(线程池中的线程会执行多个Runnable)。我们来看一下它是怎么判断当前的线程数量的。
线程池状态和线程数量表示
ThreadPoolExecutor使用一个AtomicInteger来记录线程池的运行状态和线程池的活跃线程数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
一个int类型要记录两个值,就需要进行位拆分。它的高三位用来表示线程池的运行状态,低29位表示线程的数量。所以线程数量最多是2的29次方-1。线程池的运行状态有以下几种:
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
COUNT_BITS:
private static final int COUNT_BITS = Integer.SIZE - 3;
是29。
所以线程的状态用三位二进制表示分别是:
111 000 001 010 011
execute方法中的workerCountOf方法就是取AtomicInteger的后29位的值:
private static int workerCountOf(int c) { return c & CAPACITY; }
CAPACITY:
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
addWorker增加新的线程并在新的线程中执行任务
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
首先,它会在一个不断的循环中增加线程数量,直到增加成功为止。然后,它创建了一个新的Worker对象,Worker是ThreadPoolExecutor的一个内部类,它的定义如下:
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{}
它是一个同步器,同时还实现了Runnable接口,那它的作用是什么呢?
首先,我们来看它的字段:
/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;
有thread,说明Worker能够创建新的线程。有runnable说明它需要执行任务,并且每个worker中的线程会执行多个任务,所以这里的命名为firstTask,它是在创建Worker时给定的。completedTasks用来记录这个worker完成的任务数量。
构造方法:
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}
首先,它调用的是AQS的setState方法,将state设为-1。然后为firstTask和thread赋值,注意,这里创建的新的thread的target(内部的runnable)是当前worker自己(Worker实现了runnable),所以,如果调用thread.start方法,它会执行worker的run方法。而worker的run方法就有了执行任务的逻辑:
public void run() {runWorker(this);}
run方法调用的是ThreadPoolExecutor的runWorker方法,参数是当前worker自己:
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
重要的逻辑就是会在一个循环中不断地取下一个任务,然后执行。注意,因为runWorker是Worker的run方法调用的,所以执行的线程是Worker的thread,这样就实现了在新创建的线程中执行任务的逻辑。
我们再回去看addWorker方法,它创建worker之后,最后会调用:
t.start();
t:
w = new Worker(firstTask);final Thread t = w.thread;
就是worker内部的线程。
所以worker这个类一方面新建线程,另一方面包含了在新建的线程中执行任务的逻辑。
我们继续看execute的逻辑:
核心线程达到限制之后
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
如果核心线程数已满,就会将任务加入队列。如果队列也满了,就会调用addWorker(command, false)继续增加核心线程以外的线程,如果增加失败,说明达到最大线程数,就会执行拒绝策略。
四种拒绝策略
当任务队列已满并且最大线程数量已经达到限制之后,继续提交的任务就需要使用拒绝策略来处理。RejectedExecutionHandler接口表示拒绝策略,它的rejectedExecution方法用来执行拒绝策略 。要想提供自己的拒绝策略,只需要实现这个接口然后在rejectedExecution方法中实现拒绝逻辑即可。
ThreadPooleExecutor类默认提供了四种拒绝策略:分别是CallerRunsPolicy、AbortPolicy、DiscardPolicy和DiscardOldestPolicy。我们来看一下这几个策略的拒绝逻辑。
CallerRunsPolicy
这个拒绝策略是让执行execute方法的线程来执行任务。上面我们讲execute方法的时候可以看到,ThreadPoolExecutor会让线程池中的线程来执行任务,线程池中的线程与执行execute方法的线程并不是同一个线程。它的实现很简单:
public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.** @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}
在rejectedExecution方法中,只是简单的调用了runnable.run方法,因为调用rejectedExecution方法的线程就是调用execute方法的线程。
AbortPolicy
AbortPolicy是线程池默认的拒绝策略,它会直接抛出一个RejectedExecutionException异常:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}
DiscardPolicy
DiscardPolicy会直接丢弃任务,不进行处理,也不抛出异常,所以它的rejectedExecution方法是空的:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
DiscardOldestPolicy
DiscardOldestPolicy会丢弃下一个要执行的任务,然后尝试执行新的任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
可以看到它会将队列中的下一个任务弹出。所以如果我们使用的是优先级队列,那么这个策略会丢弃优先级最高的任务,所以一般不要将这个策略与优先级队列一起使用。
小结
这篇文章,我们讲解了线程池的执行过程,重点分析了它的execute方法的逻辑。其中涉及到了Worker的使用、线程池运行状态的判断、线程数量的表示等,然后罗列了四种拒绝策略的拒绝逻辑。下一篇文章,我们来看Executors默认给出的几种线程池以及它们的优缺点。
