图解线程池的工作原理
线程池的重要类
Executor接口
代表线程池的接口,有一个execute()
方法,入参是Runnable类型对象。
通过分配一个线程进行处理执行。
ExecutorService接口
Executor
的子接口,相当于是一个线程池的接口,有销毁线程池等方法。
Executors辅助类
线程池的辅助工具类,辅助入口类,可以通过Executors快速创建需要的线程池,比如Executors._newSingleThreadExecutor()_
、Executors.newFixedThreadPool()
、Executors.newCachedThreadPool()
、Executors.newScheduledThreadPool()
ThreadPoolExecutor类
ExecutorService
接口的实现类,真正代表一个线程池的类,一般在Executors里创建一个线程池时,内部都是直接创建一个ThreadPoolExecutor的实例对象进行返回。
corePoolSize
:线程池中的核心线程数。maximumPoolSize
:线程池里允许创建的最大线程数keepAliveTime
:如果线程数量大于corePoolSize时,多出来的线程在空闲时间内会等待指定的keepAliveTime时间后自动释放掉。unit
:keepAliveTime的时间单位。workQueue
:通过ThreadPoolExecutor.execute()
方法扔进来的Runnable工作任务,如果已经到达了corePoolSize个数量都在处理任务时,多余的工作任务就会进入到workQueue进行排队。threadFactory
:如果需要创建新的线程放入线程池时,就通过这个线程工厂来创建线程。handler
:如果workQueue具有固定的大小,往队列里扔的任务数量超过队列大小,且已经有maximumPoolSize个线程负责处理任务,则新的任务就会使用handler的拒绝策略进行拒绝处理。public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池内的任务如何被提交
任务对应的就是Runnable对象,通过执行ThreadPoolExecutor.execute()
方法就可以处理工作任务。 ```java /*
- 分3步执行
- 如果工作线程的数量小于核心线程的数量,那新的工作任务请求过来时,
- 就会去创建一个新的线程去处理任务,直到创建的线程数量已经达到了核心线程数量。
- 具体的创建形式就是 addWorker *
- 在一个任务成功进入工作队列进行排队,需要检查是否能够添加一个线程来处理任务
否则,该任务就需要在工作队列中进行等待有空闲的线程来处理。
- 如果新的任务无法进入工作队列进行排队,此时就会创建新的线程处理新任务,
- 创建的线程数量不会大于最大线程数量maximumPoolSize。
如果创建线程失败,就会通过handler策略拒绝任务。 */
```java
// addWorker: 添加新的线程去处理任务
// workQueue.offer(command): 将任务放入到工作队列当中
// isRunning(): 线程池处理运行状态,新的任务就只能放入到工作队列当中,等待线程空闲后进行处理
public void execute(Runnable command) {
// 如果任务为空,直接抛出异常
if (command == null)
throw new NullPointerException();
// 原子变量ctl共同存储 线程状态+线程数量。int类型存储,高3位表示线程状态,后29位表示线程数量
int c = ctl.get();
// 判断当前的工作线程数量 是否小于核心线程数量,如果小于,则添加新的线程去执行任务
if (workerCountOf(c) < corePoolSize) {
// true: 代表的是可以创建最大数量为corePoolSize内的线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池处于RUNNING状态,就将任务放入到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl的值,因为把任务添加到队列时,线程的状态可能已经改变,所以需要重新获取
int recheck = ctl.get();
// 线程状态不是RUNNING状态,则从队列中删除
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池一个空闲线程都没有,且队列已满,则添加一个非核心线程(对应maximumPoolSize)
else if (workerCountOf(recheck) == 0)
// false: 代表的是可以创建最大数量为maximumPoolSize内的线程
addWorker(null, false);
}
// 如果队列满了,则添加非核心线程,如果添加失败,则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
任务是如何被线程池消费的
基于Worker类,对应的就是工作线程,在这个类中可以知道工作线程是如何消费任务的。
Worker类实现了Runnable接口,当线程启动时,就会去执行run()
方法。通过run()
方法去消费一个任务;runWorker()
方法,内部会task.run()
去消费任务;通过getTask()
方法去任务队列当中获取任务,workQueue.take()
Runnable task = w.firstTask
, 工作线程负责处理的第一个任务task。 ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 获取任务队列中的任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
}
```