引言
上一篇文章,我们讲解了FutureTask和AbstractExecutorService。AbstractExecutorService给出了ExecutorService接口的几个方法的实现,包括submit、invokeAll、invokeAny等。FutureTask实现了Future和Runnable,并且内部有Callable作为要执行的任务,它给出了Future接口几个方法的实现,例如get、cancel等。这篇文章,我们来看最重要的线程池ThreadPoolExecutor。
execute方法
在讲AbstractExecutorService的submit逻辑时,我们知道它构造了FutureTask,然后调用execute方法。ThreadPoolExecutor的重要逻辑也是execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
首先,它会检查当前的线程数量是否小于设置的核心线程数,如果小于,就会调用addWorker方法添加线程,并将参数Runnable作为该线程运行的第一个task(线程池中的线程会执行多个Runnable)。我们来看一下它是怎么判断当前的线程数量的。
线程池状态和线程数量表示
ThreadPoolExecutor使用一个AtomicInteger来记录线程池的运行状态和线程池的活跃线程数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
一个int类型要记录两个值,就需要进行位拆分。它的高三位用来表示线程池的运行状态,低29位表示线程的数量。所以线程数量最多是2的29次方-1。线程池的运行状态有以下几种:
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
COUNT_BITS:
private static final int COUNT_BITS = Integer.SIZE - 3;
是29。
所以线程的状态用三位二进制表示分别是:
111 000 001 010 011
execute方法中的workerCountOf方法就是取AtomicInteger的后29位的值:
private static int workerCountOf(int c) { return c & CAPACITY; }
CAPACITY:
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
addWorker增加新的线程并在新的线程中执行任务
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
首先,它会在一个不断的循环中增加线程数量,直到增加成功为止。然后,它创建了一个新的Worker对象,Worker是ThreadPoolExecutor的一个内部类,它的定义如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{}
它是一个同步器,同时还实现了Runnable接口,那它的作用是什么呢?
首先,我们来看它的字段:
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
有thread,说明Worker能够创建新的线程。有runnable说明它需要执行任务,并且每个worker中的线程会执行多个任务,所以这里的命名为firstTask,它是在创建Worker时给定的。completedTasks用来记录这个worker完成的任务数量。
构造方法:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
首先,它调用的是AQS的setState方法,将state设为-1。然后为firstTask和thread赋值,注意,这里创建的新的thread的target(内部的runnable)是当前worker自己(Worker实现了runnable),所以,如果调用thread.start方法,它会执行worker的run方法。而worker的run方法就有了执行任务的逻辑:
public void run() {
runWorker(this);
}
run方法调用的是ThreadPoolExecutor的runWorker方法,参数是当前worker自己:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
重要的逻辑就是会在一个循环中不断地取下一个任务,然后执行。注意,因为runWorker是Worker的run方法调用的,所以执行的线程是Worker的thread,这样就实现了在新创建的线程中执行任务的逻辑。
我们再回去看addWorker方法,它创建worker之后,最后会调用:
t.start();
t:
w = new Worker(firstTask);
final Thread t = w.thread;
就是worker内部的线程。
所以worker这个类一方面新建线程,另一方面包含了在新建的线程中执行任务的逻辑。
我们继续看execute的逻辑:
核心线程达到限制之后
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
如果核心线程数已满,就会将任务加入队列。如果队列也满了,就会调用addWorker(command, false)继续增加核心线程以外的线程,如果增加失败,说明达到最大线程数,就会执行拒绝策略。
四种拒绝策略
当任务队列已满并且最大线程数量已经达到限制之后,继续提交的任务就需要使用拒绝策略来处理。RejectedExecutionHandler接口表示拒绝策略,它的rejectedExecution方法用来执行拒绝策略 。要想提供自己的拒绝策略,只需要实现这个接口然后在rejectedExecution方法中实现拒绝逻辑即可。
ThreadPooleExecutor类默认提供了四种拒绝策略:分别是CallerRunsPolicy、AbortPolicy、DiscardPolicy和DiscardOldestPolicy。我们来看一下这几个策略的拒绝逻辑。
CallerRunsPolicy
这个拒绝策略是让执行execute方法的线程来执行任务。上面我们讲execute方法的时候可以看到,ThreadPoolExecutor会让线程池中的线程来执行任务,线程池中的线程与执行execute方法的线程并不是同一个线程。它的实现很简单:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
在rejectedExecution方法中,只是简单的调用了runnable.run方法,因为调用rejectedExecution方法的线程就是调用execute方法的线程。
AbortPolicy
AbortPolicy是线程池默认的拒绝策略,它会直接抛出一个RejectedExecutionException异常:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
DiscardPolicy
DiscardPolicy会直接丢弃任务,不进行处理,也不抛出异常,所以它的rejectedExecution方法是空的:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
DiscardOldestPolicy
DiscardOldestPolicy会丢弃下一个要执行的任务,然后尝试执行新的任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
可以看到它会将队列中的下一个任务弹出。所以如果我们使用的是优先级队列,那么这个策略会丢弃优先级最高的任务,所以一般不要将这个策略与优先级队列一起使用。
小结
这篇文章,我们讲解了线程池的执行过程,重点分析了它的execute方法的逻辑。其中涉及到了Worker的使用、线程池运行状态的判断、线程数量的表示等,然后罗列了四种拒绝策略的拒绝逻辑。下一篇文章,我们来看Executors默认给出的几种线程池以及它们的优缺点。