概述
ForkJoinPool
是 JDK7 引入的线程池类。Fork/Join 技术是分治算法的并行实现,目的是帮助我们更好地利用多核处理器优势,以提升应用性能。
核心思想
三个模块
Fork/Join 框架主要包含三个模块:
- 任务对象:
ForkJoinTask
,包括RecursiveTask
,RecursiveAction
和CountedCompleter
。 - 执行 Fork/Join 任务的线程:
ForkJoinWorkerThread
。 - 线程池:
ForkJoinPool
。
线程池 ForkJoinPool
通过池中的 ForkJoinWorkerThread
来处理 ForkJoinTask
。
编程模型
解决(问题):
if 问题足够小:
直接解决问题 (顺序算法)
else:
for 部份 in 细分(问题)
fork 子任务来解决(部份)
join 在前面的循环中生成的所有子任务
return 合并的结果
上面的是伪代码,它的含义是这样的:
- 如果一个问题不允许再细分,那么直接处理。
- 否则将问题拆分成多个独立的部分。然后 fork 新的子任务,每个子任务继续递归调用
solve
方法。 - 将所有子任务的结果做 join 操作,从子结果中计算当前任务拆分的结果。
ForkJoinPool
只处理 ForkJoinTask
类型的任务,当前也可以传入实现 Runnable
和 Callable
的任务,只不过 ForkJoinPool
会将该任务包装成 ForkJoinTask
类型的对象。RecursiveTask
是 ForkJoinTask
的子类,它是一个可以递归执行的 Task。RecursiveAction
是一个无返回值的 RecursiveTask
,CountedCompleter
在任务完成后会触发执行一个自定义的钩子函数。
分治思想
分治分治,分而治之,就是把一个复杂的问题分成多个独立的子问题,子问题又可以分成更小的子问题,直到问题不可拆分,就进行计算。然后收集所有子问题的解,经过层层合并操作,最后返回计算结果。
比如简单的归并排序也是一种 fork/join。
在计算机科学中,分治法是建基于多项分支递归的一种很重要的算法范型。字面上的解释是“分而治之”,就是把一个复杂的问题分成两个或更多的相同或相似的子问题,直到最后子问题可以简单的直接求解,原问题的解即子问题的解的合并。
工作窃取 work-stealing
线程池内的所有工作线程都尝试找到并执行已提交的任务,或者是被其它活动任务创建的子任务(如果不存在则阻塞等待)。这种特性使得 ForkJoinPool
在运行多个可以生成子任务的任务,或者是提交许多小任务时效率更高。尤其是构建异步模型的 ForkJoinPool
时,对不需要合并(join)的事件类型任务也非常适用。
在 ForkJoinPool
中,线程池中的每个工作线程 ForkJoinWorkerThread
都有自己的任务队列 WorkQueue,工作线程优先处理自己的任务队列里面的任务,队列进出有两种模式,分别是 LIFO
(堆)和 FIFO
(队列)顺序。然后以 FIFO
的顺序随机窃取其它队列中的任务。
具体过程如下:
- 每个工作线程
ForkJoinWorkerThread
有自己的任务队列WorkQueue
,队列支持 push、pop、poll 操作。push/pop 只能被队列的拥有线程所调用,而 poll 可以被其它线程调用。 - 划分的子任务调用
fork
时,会通过 push 方法添加到自己的队列中。 - 默认情况下,工作线程从自己的双端队列获得任务并执行任务。
- 当自己队列为空时,线程随机选择一个线程,调用该队列的 poll 方法,尝试从尾部窃取任务并执行。
工程实现:
ForkJoinPool
使用数组保存所有的WorkQueue
。- 不是每个
WorkQueue
都有对应的ForkJoinWorkerThread
工作线程。对于奇数的WorkerQueue
是没有工作线程。 WorkQueue
是一个双端队列,可以支持 LIFO 和 FIFO 操作。Fork/Join 执行流程
继承关系
线程工厂接口:ForkJoinWorkerThreadFactory
内部线程工厂接口,用于创建工作线程 ForkJoinWorkerThread
,实现类有三个,如下图所示:
DefaultForkJoinWorkerThreadFactory
:ForkJoinPool
线程工厂默认实现类。CommonPoolForkJoinWorkerThreadFactory
:InnocuousForkJoinWorkerThreadFactory
:无许可线程工厂,当系统变量中有系统安全管理相关属性时,默认使用这个工厂创建线程。WorkQueue
ForkjoinPool
核心数据结构,使用双端队列实现 work-stealing 模式。内部存放ForkJoinTask
对象,使用@Contented
注解修改以防止伪共享。伪共享状态: 缓存系统中是以缓存行 (cache line) 为单位存储的。缓存行是 2 的整数幂个连续字节,一般为 32-256 个字节。最常见的缓存行大小是 64 个字节。当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。
工作线程在运行中产生的新的任务(通常是调用了
fork()
)时,此时可以把WorkQueue
数据结构视为一个栈,新的任务会放入栈顶(top 端)。工作线程在处理自己工作队列的任务时,按照LIFO
的顺序出队。- 工作线程在处理自己的工作队列同时,会尝试窃取一个任务(可能来自于刚刚提交到 pool 的任务,或是来自其它工作线程的任务队列),窃取的任务会放入队头(base 端),此时可以把
WorkQueue
的数据结构视为一个FIFO
队列。WorkQueue 成员变量
```java // 初始队列容量:8192 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 最大队列容量:67108864 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// 实例字段 volatile int scanState; // Woker状态, <0: inactive; odd:scanning int stackPred; // 记录前一个栈顶的ctl int nsteals; // 偷取任务数 int hint; // 记录偷取者索引,初始为随机索引 int config; // 池索引和模式 volatile int qlock; // 1: locked, < 0: terminate; else 0 volatile int base; // 下一个poll操作的索引(栈底/队列头) int top; // 下一个push操作的索引(栈顶/队列尾) ForkJoinTask[] array; // 任务数组 final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // 当前工作队列的工作线程,共享模式下为null volatile Thread parker; // 调用park阻塞期间为owner,其他情况为null volatile ForkJoinTask<?> currentJoin; // 记录被join过来的任务 volatile ForkJoinTask<?> currentSteal; // 记录从其他工作队列偷取过来的任务
<a name="eFewg"></a>
## ForkJoinTask
`ForkJoinTask` 是一个实现 `Future` 接口的**抽象类**,意味着其它线程可以阻塞等待获取 `ForkJoinTask` 的计算结果。它有三个实现类,如下图所示:<br /><br />我们先看看 `ForkJoinTask` 内部类:<br />
- `ExceptionNode` 用于存储任务执行期间的异常信息的单向链表。
- 其余 4 个是适配器。用于将实现了 Runnable 或 Callable 接口的任务提供适配类,将它们转换为 ForkJoinPool 可以执行的 `FutureTask`。
`ForkJoinTask` 被 3 个抽象类:<br /><br />我们一般使用继承 `RecursiveTask` 抽象类,重写里面的 `compute()` 方法,实现自定义的分-治操作。
<a name="JEeDK"></a>
# 源码解析
我们最重点关注 3 大流程:
1. 任务提交流程:任务提交还可细分为外部任务提交和子任务提交。这里面的逻辑也是有区别的。
1. 任务执行流程。主要是讲解 `ForkJoinWorkerThread.run()` 和 `ForkJoinTask.doExec()` 这两部分。
1. 任务结果获取流程。主要是讲解 `ForkJoinTask.join()` 和 `ForkJoinTask.invoke()` 这两部分。
<a name="FTTK9"></a>
## ForkJoinPool
<a name="a86DD"></a>
### 成员变量
```java
// 主控制参数,将 long 划分为四个区域,每个区域的含义如下图所示
volatile long ctl;
// 运行状态锁
volatile int runState;
// 队列模式,默认 FILO(高 16 位)|并行度(低 16 位)
final int config;
与 WorkQueue
共享的常量:
// Constants shared across ForkJoinPool and WorkQueue
// 限定参数
static final int SMASK = 0xffff; // 低位掩码,也是最大索引位
static final int MAX_CAP = 0x7fff; // 工作线程最大容量
static final int EVENMASK = 0xfffe; // 偶数低位掩码
static final int SQMASK = 0x007e; // workQueues 数组最多64个槽位
// ctl 子域和 WorkQueue.scanState 的掩码和标志位
static final int SCANNING = 1; // 标记是否正在运行任务
static final int INACTIVE = 1 << 31; // 失活状态 负数
static final int SS_SEQ = 1 << 16; // 版本戳,防止ABA问题
// ForkJoinPool.config 和 WorkQueue.config 的配置信息标记
static final int MODE_MASK = 0xffff << 16; // 模式掩码
static final int LIFO_QUEUE = 0; // LIFO队列
static final int FIFO_QUEUE = 1 << 16; // FIFO队列
static final int SHARED_QUEUE = 1 << 31; // 共享模式队列,负数
其它常量:
// 低位和高位掩码
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;
// 活跃线程数
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; // 活跃线程数增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; // 活跃线程数掩码
// 工作线程数
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; // 工作线程数增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; // 掩码
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // 创建工作线程标志
// 池状态
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;
实例字段:
volatile long ctl; // 主控制参数
volatile int runState; // 运行状态锁
final int config; // 并行度|模式
int indexSeed; // 用于生成工作线程索引
volatile WorkQueue[] workQueues; // 主对象注册信息,workQueue
final ForkJoinWorkerThreadFactory factory; // 线程工厂
final UncaughtExceptionHandler ueh; // 每个工作线程的异常信息
final String workerNamePrefix; // 用于创建工作线程的名称
volatile AtomicLong stealCounter; // 偷取任务总数,也可作为同步监视器
/** 静态初始化字段 */
// 线程工厂
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
// 启动或杀死线程的方法调用者的权限
private static final RuntimePermission modifyThreadPermission;
// 公共静态pool
static final ForkJoinPool common;
// 并行度,对应内部common池
static final int commonParallelism;
// 备用线程数,在tryCompensate中使用
private static int commonMaxSpares;
// 创建workerNamePrefix(工作线程名称前缀)时的序号
private static int poolNumberSequence;
// 线程阻塞等待新的任务的超时值(以纳秒为单位),默认2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
// 空闲超时时间,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
// 默认备用线程数
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
// 阻塞前自旋的次数,用在在awaitRunStateLock和awaitWork中
private static final int SPINS = 0;
// indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;
构造函数
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
- parallelism: 并行度,默认为 CPU 核心数,最小为 1。
- factory: 工作线程工厂,用来创建
ForkJoinWorkerThread
工作线程。 - handler: 处理工作线程运行任务过程中出现未捕获异常,默认为 null。
- asyncMode: 是否为异步模式,默认为 false。如果为true,表示子任务的执行遵循 FIFO 顺序并且任务不能被合并(join),这种模式适用于工作线程只运行事件类型的异步任务。
在大多数使用场景中,如果没有太强的业务要求,我们一般直接使用 ForkJoinPool
中的 Common
池,它的优点是我们可以通过指定系统参数的方式定义并行度、线程工厂和异常处理类。并且它使用的是同步模式,也就是说可以支持任务合并(join)。
提交任务
外部任务提交:external/submisstion
向 ForkJoinPool
提交任务有三种方式:
invoke()
:等待任务计算完成并返回计算结果。同步阻塞。execute()
:直接向线程池提交一个任务,无返回结果。任务异步执行。submit()
:直接向线程池提交一个任务,有返回结果。任务异步执行。通过FutureTask#get()
阻塞等待计算结果。
以上三种提交方式都是通过调用 externalPush()
方法实现。
externalPush
// java.util.concurrent.ForkJoinPool#externalPush
/**
* 将任务(ForkJoinTask)提交到 submission 队列中。
*/
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws;
WorkQueue q;
int m;
// 用于随机计算 WorkQueue[] 槽位索引值
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && // 获取随机槽位对应的「WorkQueue」
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // 获取「WorkQueue」队列的锁
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
// 计算任务索引位置
int j = ((am & s) << ASHIFT) + ABASE;
// 将任务加入队列中
U.putOrderedObject(a, j, task);
// 更新 push slot
U.putOrderedInt(q, QTOP, s + 1);
// 释放锁
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
// 如果任务数小于 1,尝试创建或激活一个工作线程
signalWork(ws, q);
return;
}
// 释放锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
//
externalSubmit(task);
}
// java.util.concurrent.ForkJoinTask#fork
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}