Ref: https://pdai.tech/md/java/thread/java-thread-x-juc-executor-ThreadPoolExecutor.html
带着问题去理解
- 为什么要有线程池?
- Java 是实现和管理线程池有哪些方式?请简单举例如何使用。
- 为什么很多公司不允许使用 Executors 去创建线程池?那么推荐怎么使用呢?
- ThreadPoolExecutor 有哪些核心的配置参数?请简要说明
- 由 Executors 基于 ThreadPoolExecutor 可以创建哪是哪三种线程池?
- 当队列满了并且 worker 的数量达到 maxSize 的时候,会怎么样?
- 说说 ThreadPoolExecutor 有哪些 RejectedExecutionHandler 策略?默认是什么策略?
- 简要说下线程池的任务执行机制?execute –> addWorker –>runworker (getTask)
- 线程池中任务是如何提交的?
- 线程池中任务是如何关闭的?
- 在配置线程池的时候需要考虑哪些配置因素?
-
为什么要有线程池
线程池能够对线程进行统一分配,调优和监控:
降低资源消耗 (线程无限制地创建,然后使用完毕后销毁)
- 提高响应速度 (无须创建线程)
- 提高线程的可管理性
ThreadPoolExecutor 例子
Java 是如何实现和管理线程池的?
从 JDK 5 开始,把工作单元与执行机制分离开来,工作单元包括 Runnable 和 Callable,而执行机制由 Executor 框架提供。 ```java import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class SimpleThreadPool {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
Runnable worker = new WorkerThread("" + i);
executor.execute(worker);
}
executor.shutdown(); // This will make the executor accept no new threads and finish all existing threads in the queue
while (!executor.isTerminated()) { // Wait until all threads are finish,and also you can use "executor.awaitTermination();" to wait
}
System.out.println("Finished all threads");
// ThreadPoolExecutor 提供了一些方法,我们可以使用这些方法来查询 executor 的当前状态,线程池大小,活动线程数量以及任务数量。
System.out.println(
String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
this.executor.getPoolSize(),
this.executor.getCorePoolSize(),
this.executor.getActiveCount(),
this.executor.getCompletedTaskCount(),
this.executor.getTaskCount(),
this.executor.isShutdown(),
this.executor.isTerminated()));
}
}
<a name="ZwpNC"></a>
## ThreadPoolExecutor 使用详解
其实 java 线程池的实现原理很简单,说白了就是一个线程集合 workerSet 和一个阻塞队列 workQueue。当用户向线程池提交一个任务 (也就是线程) 时,线程池会先将任务放入 workQueue 中。workerSet 中的线程会不断的从 workQueue 中获取线程然后执行。当 workQueue 中没有任务的时候,worker 就会阻塞,直到队列中有任务了就取出来继续执行。<br />
<a name="mQl7Z"></a>
### Execute 原理
当一个任务提交至线程池之后:
1. 线程池首先当前运行的线程数量是否少于 corePoolSize。如果是,则创建一个新的工作线程来执行任务。如果都在执行任务,则进入 2.
1. 判断 BlockingQueue 是否已经满了,倘若还没有满,则将线程放入 BlockingQueue。否则进入 3.
1. 如果创建一个新的工作线程将使当前运行的线程数量超过 maximumPoolSize,则交给 RejectedExecutionHandler 来处理任务。
当 ThreadPoolExecutor 创建新线程时,通过 CAS 来更新线程池的状态 ctl.<br /><br />图片来自:[玩转线程池](https://www.maishuren.top/archives/%E7%8E%A9%E8%BD%ACjava%E7%BA%BF%E7%A8%8B%E6%B1%A0%E4%B8%80threadpoolexecutor%E7%9A%84%E6%89%A7%E8%A1%8C%E6%B5%81%E7%A8%8B%E5%92%8C%E5%8E%9F%E7%90%86)
<a name="GqWG5"></a>
### 参数
```java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
- corePoolSize 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize, 即使有其他空闲线程能够执行新来的任务,也会继续创建线程;如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的 prestartAllCoreThreads () 方法,线程池会提前创建并启动所有核心线程。
workQueue 用来保存等待被执行的任务的阻塞队列。在 JDK 中提供了如下阻塞队列(BlockingQueue 详解):
- ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按 FIFO 排序任务;
- LinkedBlockingQueue: 基于链表结构的阻塞队列,按 FIFO 排序任务,吞吐量通常要高于 ArrayBlockingQueue;
- SynchronousQueue: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene;
- PriorityBlockingQueue: 具有优先级的无界阻塞队列;
LinkedBlockingQueue 比 ArrayBlockingQueue 在插入删除节点性能方面更优,但是二者在 put(), take() 任务的时均需要加锁,SynchronousQueue 使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是 Transfer.transfer().
maximumPoolSize 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize;当阻塞队列是无界队列,则 maximumPoolSize 则不起作用,因为无法提交至核心线程池的线程会一直持续地放入 workQueue.
- keepAliveTime 线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于 corePoolSize 时才有用,超过这个时间的空闲线程将被终止;
- unit keepAliveTime 的单位
- threadFactory 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为 DefaultThreadFactory.
- handler 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:
- AbortPolicy: 直接抛出异常,默认策略;
- CallerRunsPolicy: 用调用者所在的线程来执行任务;
- DiscardOldestPolicy: 丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy: 直接丢弃任务;
当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
三种类型
- newFixedThreadPool
线程池的线程数量达 corePoolSize 后,即使线程池没有可执行任务时,也不会释放线程。
FixedThreadPool 的工作队列为无界队列 LinkedBlockingQueue (队列容量为 Integer.MAX_VALUE), 这会导致以下问题:
- 线程池里的线程数量不超过 corePoolSize, 这导致了 maximumPoolSize 和 keepAliveTime 将会是个无用参数
- 由于使用了无界队列,所以 FixedThreadPool 永远不会拒绝,即饱和策略失效
- newSingleThreadExecutor
初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行。
由于使用了无界队列,所以 SingleThreadPool 永远不会拒绝,即饱和策略失效。
- newCachedThreadPool
线程池的线程数可达到 Integer.MAX_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列;
和 newFixedThreadPool 创建的线程池不同,newCachedThreadPool 在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;
执行过程与前两种稍微不同:
- 主线程调用 SynchronousQueue 的 offer () 方法放入 task, 倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue 的 task, 即调用了 SynchronousQueue 的 poll (), 那么主线程将该 task 交给空闲线程。否则执行。
当线程池为空或者没有空闲的线程,则创建新的线程执行任务. 执行完任务的线程倘若在 60s 内仍空闲,则会被终止。因此长时间空闲的 CachedThreadPool 不会持有任何线程资源。
关闭线程池
遍历线程池中的所有线程,然后逐个调用线程的 interrupt 方法来中断线程.
shutdown:将线程池里的线程状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程.
- shutdownNow:将线程池里的线程状态设置成 STOP 状态,然后停止所有正在执行或暂停任务的线程.
只要调用这两个关闭方法中的任意一个,isShutDown() 返回 true. 当所有任务都成功关闭了,isTerminated() 返回 true.
ThreadPoolExecutor 源码详解
几个关键属性
//这个属性是用来存放 当前运行的worker数量以及线程池状态的
//int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;
内部状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中 AtomicInteger 变量 ctl 的功能非常强大:利用低 29 位表示线程池中线程数,通过高 3 位表示线程池的运行状态:
- RUNNING: -1 << COUNT_BITS,即高 3 位为 111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
- SHUTDOWN: 0 << COUNT_BITS,即高 3 位为 000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
- STOP : 1 << COUNT_BITS,即高 3 位为 001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
- TIDYING : 2 << COUNT_BITS,即高 3 位为 010, 所有的任务都已经终止;
- TERMINATED: 3 <<COUNT_BITS,即高 3 位为 011, terminated () 方法已经执行完成
任务的执行
execute –> addWorker –>runworker (getTask)
线程池的工作线程通过 Woker 类实现,在 ReentrantLock 锁的保证下,把 Woker 实例插入到 HashSet 后,并启动 Woker 中的线程。 从 Woker 类的构造方法实现可以发现:线程工厂在创建线程 thread 时,将 Woker 实例本身 this 作为参数传入,当执行 start 方法启动线程 thread 时,本质是执行了 Worker 的 runWorker 方法。 firstTask 执行完成之后,通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源;
execute()
图片来自:玩转线程池
ThreadPoolExecutor.execute(task) 实现了 Executor.execute(task)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// double check: c, recheck
// 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
//如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
if (!isRunning(recheck) && remove(command))
reject(command);
//线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 往线程池中创建新的线程失败,则reject任务
else if (!addWorker(command, false))
reject(command);
}
- 为什么需要 double check 线程池的状态?
在多线程环境下,线程池的状态时刻在变化,而 ctl.get () 是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将 command 加入 workqueue 是线程池之前的状态。倘若没有 double check,万一线程池处于非 running 状态 (在多线程环境下很有可能发生),那么 command 永远不会执行。
addWorker()
从方法 execute 的实现可以看出: addWorker 主要负责创建新的线程并执行任务。线程池创建新线程执行任务时,需要获取全局锁:
private final ReentrantLock mainLock = new ReentrantLock();
private boolean addWorker(Runnable firstTask, boolean core) {
// CAS更新线程池数量
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 线程池重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 线程启动,执行任务(Worker.thread(firstTask).start());
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker()
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 创建线程
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...
}
- 继承了 AQS 类,可以方便的实现工作线程的中止操作;
- 实现了 Runnable 接口,可以将自身作为一个任务在工作线程中执行;
- 当前提交的任务 firstTask 作为参数传入 Worker 的构造方法;
一些属性还有构造方法:
//运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
final Thread thread;
//当一个worker刚创建的时候,就先尝试执行这个任务
Runnable firstTask;
//记录完成任务的数量
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法
this.thread = getThreadFactory().newThread(this);
}
runWorker 方法是线程池的核心:
- 线程启动之后,通过 unlock 方法释放锁,设置 AQS 的 state 为 0,表示运行可中断;
- Worker 执行 firstTask 或从 workQueue 中获取任务:
- 进行加锁操作,保证 thread 不被其他线程中断 (除非线程池被中断)
- 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
- 执行 beforeExecute
- 执行任务的 run 方法
- 执行 afterExecute 方法
- 解锁操作
通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源;
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 先执行firstTask,再从workerQueue中取task(getTask())
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask()
下面来看一下 getTask () 方法,这里面涉及到 keepAliveTime 的使用,从这个方法我们可以看出先吃池是怎么让超过 corePoolSize 的那部分 worker 销毁的。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling? keepAliveTime 起作用的关键
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// keepAliveTime 起作用的关键
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
allowCoreThreadTimeOut 为 false,线程即使空闲也不会被销毁;倘若为 ture,在 keepAliveTime 内仍空闲则会被销毁。
如果线程允许空闲等待而不被销毁 timed == false,workQueue.take 任务:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take 方法返回任务,并执行;
如果线程不允许无休止空闲 timed == true, workQueue.poll 任务:如果在 keepAliveTime 时间内,阻塞队列还是没有任务,则返回 null;
任务的提交
- submit 任务,等待线程池 execute
- 执行 FutureTask 类的 get 方法时,会把主线程封装成 WaitNode 节点并保存在 waiters 链表中, 并阻塞等待运行结果;
FutureTask 任务执行完成后,通过 UNSAFE 设置 waiters 相应的 waitNode 为 null,并通过 LockSupport 类 unpark 方法唤醒主线程;
public class Test{ public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); Future<String> future = es.submit(new Callable<String>() { @Override public String call() throws Exception { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "future result"; } }); try { String result = future.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } } }
在实际业务场景中,Future 和 Callable 基本是成对出现的,Callable 负责产生结果,Future 负责获取结果。
Callable 接口类似于 Runnable,只是 Runnable 没有返回值。
- Callable 任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即 Future 可以拿到异步执行任务各种结果;
- Future.get 方法会导致主线程阻塞,直到 Callable 任务执行完成;
submit 方法
AbstractExecutorService.submit () 实现了 ExecutorService.submit () 可以获取执行完的返回值,而 ThreadPoolExecutor 是 AbstractExecutorService.submit () 的子类,所以 submit 方法也是 ThreadPoolExecutor 的方法。 ```java // submit()在ExecutorService中的定义Future submit(Callable task);
Future<?> submit(Runnable task);
// submit方法在AbstractExecutorService中的实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
RunnableFuture
通过 submit 方法提交的 Callable 任务会被封装成了一个 FutureTask 对象。通过 Executor.execute 方法提交 FutureTask 到线程池中等待被执行,最终执行的是 FutureTask 的 run 方法;
<a name="A0JzI"></a>
#### FutureTask 对象
public class **FutureTask**<V> implements **RunnableFuture**<V> 可以将 FutureTask 提交至线程池中等待被执行 (通过 FutureTask 的 run 方法来执行)
<a name="PoDDB"></a>
##### 内部状态
内部状态通过 sun.misc.Unsafe 修改
```java
/* The run state of this task, initially NEW.
* ...
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
get()
通过 awaitDone 方法对主线程进行阻塞
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
- 如果主线程被中断,则抛出中断异常;
- 判断 FutureTask 当前的 state,如果大于 COMPLETING,说明任务已经执行完成,则直接返回;
- 如果当前 state 等于 COMPLETING,说明任务已经执行完,这时主线程只需通过 yield 方法让出 cpu 资源,等待 state 变成 NORMAL;
- 通过 WaitNode 类封装当前线程,并通过 UNSAFE 添加到 waiters 链表;
最终通过 LockSupport 的 park 或 parkNanos 挂起线程;
run()
在线程池中被执行的,而非主线程:
通过执行 Callable 任务的 call 方法;
- 如果 call 执行成功,则通过 set 方法保存结果;
- 如果 call 执行有异常,则通过 setException 保存异常;
run() -> set() -> finishComplete()
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 设置异常完成结果
setException(ex);
}
if (ran)
// 设置正常完成结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING; 挨个唤醒阻塞线程
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
任务的关闭
shutdown 方法会将线程池的状态设置为 SHUTDOWN, 线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否可以关闭线程
checkShutdownAccess();
//设置线程池状态
advanceRunState(SHUTDOWN);
//尝试中断worker
interruptIdleWorkers();
//预留方法,留给子类实现
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有的worker
for (Worker w : workers) {
Thread t = w.thread;
//先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
//注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
//它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow 做的比较绝,它先将线程池状态设置为 STOP,然后拒绝所有提交的任务。最后中断所有正在运行中的 worker, 然后清空任务队列。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//检测权限
advanceRunState(STOP);
//中断所有的worker
interruptWorkers();
//清空任务队列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有worker,然后调用中断方法
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
更深入理解
为什么线程池不允许使用 Executors 去创建?推荐方式是什么?
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 各个方法的弊端:
- newFixedThreadPool 和 newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
- newCachedThreadPool 和 newScheduledThreadPool: 主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
commons-lang3 包
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor( 1, new BasicThreadFactory.Builder() .namingPattern("example-schedule-pool-%d") .daemon(true) .build() );
com.google.guava 包 ```java ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat(“demo-pool-%d”) .build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(
5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
// excute pool.execute(()-> System.out.println(Thread.currentThread().getName()));
//gracefully shutdown pool.shutdown();
3. spring 配置线程池
自定义线程工厂 bean 需要实现 ThreadFactory,可参考该接口的其它默认实现类,使用方式直接注入 bean 调用 execute (Runnable task) 方法即可。
```xml
<bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
//in code
userThreadPool.execute(thread);
自定义线程池
public class ExecutorServiceHelper {
/**
* 一般IO类型任务 CPU 加速比为10, 这里保守一点设置为5
*/
private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors() * 5;
/**
* 最大等待时间(单位:毫秒)
*/
private static final int MAX_WAIT_TIME = 150;
/**
* 接口平均RT(单位:毫秒)
*/
private static final int AVG_RT = 15;
/**
* 任务队列大小
*/
private static final int QUEUE_SIZE = POOL_SIZE * MAX_WAIT_TIME / AVG_RT * 2;
public static final ExecutorService EXECUTORS = new ThreadPoolExecutor(
POOL_SIZE, POOL_SIZE, 0,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE),
new NamedThreadFactory("concurrent-executor", true),
new ThreadPoolExecutor.CallerRunsPolicy());
/**
* 异步处理线程池
*/
public final static ExecutorService ASYNC_EXECUTOR = new ThreadPoolExecutor(10, 20,
10000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
/**
* 异步处理线程池
*/
public final static ExecutorService CATEGORY_EXECUTOR = new ThreadPoolExecutor(5, 10,
10000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
}
配置线程池需要考虑因素
考虑因素:
- 任务的优先级:PriorityBlockingQueue
- 任务的执行时间长短:执行时间不同的任务,可以拆给不同规模的线程池。
- 任务的性质:CPU 密集型尽可能少的线程 Ncpu+1;IO 密集型尽可能多的线程 Ncpu*2,比如数据库连接池;混合型 CPU 密集型的任务与 IO 密集型任务的执行时间差别较小,拆分为两个线程池;否则没有必要拆分。
- 任务的依赖关系:线程提交 SQL 后需要等待数据库返回结果,CPU 处于空闲等待,线程越多 CPU 利用率越高。
监控线程池的状态
可以使用 ThreadPoolExecutor 以下方法:
- getTaskCount(): Returns the approximate total number of tasks that have ever been scheduled for execution.
- getCompletedTaskCount(): Returns the approximate total number of tasks that have completed execution. 返回结果少于 getTaskCount ()。
- getLargestPoolSize(): Returns the largest number of threads that have ever simultaneously been in the pool. 返回结果小于等于 maximumPoolSize
- getPoolSize(): Returns the current number of threads in the pool.
- getActiveCount(): Returns the approximate number of threads that are actively executing tasks.