概述

ForkJoinPoolJDK7 引入的线程池类。Fork/Join 技术是分治算法的并行实现,目的是帮助我们更好地利用多核处理器优势,以提升应用性能。

核心思想

三个模块

Fork/Join 框架主要包含三个模块:

  • 任务对象:ForkJoinTask,包括 RecursiveTaskRecursiveActionCountedCompleter
  • 执行 Fork/Join 任务的线程:ForkJoinWorkerThread
  • 线程池:ForkJoinPool

线程池 ForkJoinPool 通过池中的 ForkJoinWorkerThread 来处理 ForkJoinTask

编程模型

  1. 解决(问题):
  2. if 问题足够小:
  3. 直接解决问题 (顺序算法)
  4. else:
  5. for 部份 in 细分(问题)
  6. fork 子任务来解决(部份)
  7. join 在前面的循环中生成的所有子任务
  8. return 合并的结果

上面的是伪代码,它的含义是这样的:

  1. 如果一个问题不允许再细分,那么直接处理。
  2. 否则将问题拆分成多个独立的部分。然后 fork 新的子任务,每个子任务继续递归调用 solve 方法。
  3. 将所有子任务的结果做 join 操作,从子结果中计算当前任务拆分的结果。

ForkJoinPool 只处理 ForkJoinTask 类型的任务,当前也可以传入实现 RunnableCallable 的任务,只不过 ForkJoinPool 会将该任务包装成 ForkJoinTask 类型的对象。
RecursiveTaskForkJoinTask 的子类,它是一个可以递归执行的 Task。RecursiveAction 是一个无返回值的 RecursiveTaskCountedCompleter 在任务完成后会触发执行一个自定义的钩子函数。

分治思想

分治分治,分而治之,就是把一个复杂的问题分成多个独立的子问题,子问题又可以分成更小的子问题,直到问题不可拆分,就进行计算。然后收集所有子问题的解,经过层层合并操作,最后返回计算结果。
比如简单的归并排序也是一种 fork/join。

在计算机科学中,分治法是建基于多项分支递归的一种很重要的算法范型。字面上的解释是“分而治之”,就是把一个复杂的问题分成两个或更多的相同或相似的子问题,直到最后子问题可以简单的直接求解,原问题的解即子问题的解的合并。

工作窃取 work-stealing

线程池内的所有工作线程都尝试找到并执行已提交的任务,或者是被其它活动任务创建的子任务(如果不存在则阻塞等待)。这种特性使得 ForkJoinPool 在运行多个可以生成子任务的任务,或者是提交许多小任务时效率更高。尤其是构建异步模型的 ForkJoinPool 时,对不需要合并(join)的事件类型任务也非常适用。
ForkJoinPool 中,线程池中的每个工作线程 ForkJoinWorkerThread 都有自己的任务队列 WorkQueue,工作线程优先处理自己的任务队列里面的任务,队列进出有两种模式,分别是 LIFO (堆)和 FIFO (队列)顺序。然后以 FIFO 的顺序随机窃取其它队列中的任务。
具体过程如下:

  1. 每个工作线程 ForkJoinWorkerThread 有自己的任务队列 WorkQueue,队列支持 push、pop、poll 操作。push/pop 只能被队列的拥有线程所调用,而 poll 可以被其它线程调用。
  2. 划分的子任务调用 fork 时,会通过 push 方法添加到自己的队列中。
  3. 默认情况下,工作线程从自己的双端队列获得任务并执行任务。
  4. 当自己队列为空时,线程随机选择一个线程,调用该队列的 poll 方法,尝试从尾部窃取任务并执行。

工程实现:

  1. ForkJoinPool 使用数组保存所有的 WorkQueue
  2. 不是每个 WorkQueue 都有对应的 ForkJoinWorkerThread 工作线程。对于奇数的 WorkerQueue 是没有工作线程。
  3. WorkQueue 是一个双端队列,可以支持 LIFO 和 FIFO 操作。

    Fork/Join 执行流程

    Fork Join 执行流程.png

继承关系

ForkJoinPool 类结构.png

线程工厂接口:ForkJoinWorkerThreadFactory

内部线程工厂接口,用于创建工作线程 ForkJoinWorkerThread,实现类有三个,如下图所示:
ForkJoinWorkerThreadFactory.png

  • DefaultForkJoinWorkerThreadFactoryForkJoinPool 线程工厂默认实现类。
  • CommonPoolForkJoinWorkerThreadFactory
  • InnocuousForkJoinWorkerThreadFactory:无许可线程工厂,当系统变量中有系统安全管理相关属性时,默认使用这个工厂创建线程。

    WorkQueue

    ForkjoinPool 核心数据结构,使用双端队列实现 work-stealing 模式。内部存放 ForkJoinTask 对象,使用 @Contented 注解修改以防止伪共享。

    伪共享状态: 缓存系统中是以缓存行 (cache line) 为单位存储的。缓存行是 2 的整数幂个连续字节,一般为 32-256 个字节。最常见的缓存行大小是 64 个字节。当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。

  • 工作线程在运行中产生的新的任务(通常是调用了 fork())时,此时可以把 WorkQueue 数据结构视为一个栈,新的任务会放入栈顶(top 端)。工作线程在处理自己工作队列的任务时,按照 LIFO 的顺序出队。

  • 工作线程在处理自己的工作队列同时,会尝试窃取一个任务(可能来自于刚刚提交到 pool 的任务,或是来自其它工作线程的任务队列),窃取的任务会放入队头(base 端),此时可以把 WorkQueue 的数据结构视为一个 FIFO 队列。

    WorkQueue 成员变量

    ```java // 初始队列容量:8192 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 最大队列容量:67108864 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// 实例字段 volatile int scanState; // Woker状态, <0: inactive; odd:scanning int stackPred; // 记录前一个栈顶的ctl int nsteals; // 偷取任务数 int hint; // 记录偷取者索引,初始为随机索引 int config; // 池索引和模式 volatile int qlock; // 1: locked, < 0: terminate; else 0 volatile int base; // 下一个poll操作的索引(栈底/队列头) int top; // 下一个push操作的索引(栈顶/队列尾) ForkJoinTask[] array; // 任务数组 final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // 当前工作队列的工作线程,共享模式下为null volatile Thread parker; // 调用park阻塞期间为owner,其他情况为null volatile ForkJoinTask<?> currentJoin; // 记录被join过来的任务 volatile ForkJoinTask<?> currentSteal; // 记录从其他工作队列偷取过来的任务

  1. <a name="eFewg"></a>
  2. ## ForkJoinTask
  3. `ForkJoinTask` 是一个实现 `Future` 接口的**抽象类**,意味着其它线程可以阻塞等待获取 `ForkJoinTask` 的计算结果。它有三个实现类,如下图所示:<br />![ForkJoinTask.png](https://cdn.nlark.com/yuque/0/2021/png/105848/1635324834759-eb0423ca-6b32-45b7-9bd7-6c75430223af.png#clientId=ucd6300b3-81f9-4&from=drop&id=uf0bb66b2&margin=%5Bobject%20Object%5D&name=ForkJoinTask.png&originHeight=412&originWidth=1801&originalType=binary&ratio=1&size=32121&status=done&style=none&taskId=udb0109ee-0b0e-4033-a1b2-06d3a0d32d6)<br />我们先看看 `ForkJoinTask` 内部类:<br />![ForkJoinTask_2.png](https://cdn.nlark.com/yuque/0/2021/png/105848/1635325071797-16362540-5052-45b8-bb8b-130d73b01a63.png#clientId=ucd6300b3-81f9-4&from=drop&id=u11279e73&margin=%5Bobject%20Object%5D&name=ForkJoinTask_2.png&originHeight=242&originWidth=1161&originalType=binary&ratio=1&size=14673&status=done&style=none&taskId=u8eb7abb0-e6dd-4160-a9e6-a2561f1986f)
  4. - `ExceptionNode` 用于存储任务执行期间的异常信息的单向链表。
  5. - 其余 4 个是适配器。用于将实现了 Runnable 或 Callable 接口的任务提供适配类,将它们转换为 ForkJoinPool 可以执行的 `FutureTask`。
  6. `ForkJoinTask` 被 3 个抽象类:<br />![ForkJoinTask 实现类.png](https://cdn.nlark.com/yuque/0/2021/png/105848/1635325239317-6a957d3e-5e9b-4f38-bde4-8d45ea1c9ea0.png#clientId=ucd6300b3-81f9-4&from=drop&id=u05b63111&margin=%5Bobject%20Object%5D&name=ForkJoinTask%20%E5%AE%9E%E7%8E%B0%E7%B1%BB.png&originHeight=230&originWidth=660&originalType=binary&ratio=1&size=11255&status=done&style=none&taskId=u1a8dd591-c318-4ab8-a0d7-d3461e6efb1)<br />我们一般使用继承 `RecursiveTask` 抽象类,重写里面的 `compute()` 方法,实现自定义的分-治操作。
  7. <a name="JEeDK"></a>
  8. # 源码解析
  9. 我们最重点关注 3 大流程:
  10. 1. 任务提交流程:任务提交还可细分为外部任务提交和子任务提交。这里面的逻辑也是有区别的。
  11. 1. 任务执行流程。主要是讲解 `ForkJoinWorkerThread.run()` 和 `ForkJoinTask.doExec()` 这两部分。
  12. 1. 任务结果获取流程。主要是讲解 `ForkJoinTask.join()` 和 `ForkJoinTask.invoke()` 这两部分。
  13. <a name="FTTK9"></a>
  14. ## ForkJoinPool
  15. <a name="a86DD"></a>
  16. ### 成员变量
  17. ```java
  18. // 主控制参数,将 long 划分为四个区域,每个区域的含义如下图所示
  19. volatile long ctl;
  20. // 运行状态锁
  21. volatile int runState;
  22. // 队列模式,默认 FILO(高 16 位)|并行度(低 16 位)
  23. final int config;

ForkJoinPool ctl 变量含义.png
WorkQueue 共享的常量:

// Constants shared across ForkJoinPool and WorkQueue

// 限定参数
static final int SMASK = 0xffff;        //  低位掩码,也是最大索引位
static final int MAX_CAP = 0x7fff;      //  工作线程最大容量
static final int EVENMASK = 0xfffe;     //  偶数低位掩码
static final int SQMASK = 0x007e;       //  workQueues 数组最多64个槽位

// ctl 子域和 WorkQueue.scanState 的掩码和标志位
static final int SCANNING = 1;          // 标记是否正在运行任务
static final int INACTIVE = 1 << 31;    // 失活状态  负数
static final int SS_SEQ = 1 << 16;      // 版本戳,防止ABA问题

// ForkJoinPool.config 和 WorkQueue.config 的配置信息标记
static final int MODE_MASK = 0xffff << 16;  // 模式掩码
static final int LIFO_QUEUE = 0;             // LIFO队列
static final int FIFO_QUEUE = 1 << 16;        // FIFO队列
static final int SHARED_QUEUE = 1 << 31;    // 共享模式队列,负数

其它常量:

//  低位和高位掩码
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;

// 活跃线程数
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; // 活跃线程数增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; // 活跃线程数掩码

// 工作线程数
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; // 工作线程数增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; // 掩码
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // 创建工作线程标志

// 池状态
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;

实例字段:

volatile long ctl;                   // 主控制参数
volatile int runState;               // 运行状态锁
final int config;                    // 并行度|模式
int indexSeed;                       // 用于生成工作线程索引
volatile WorkQueue[] workQueues;     // 主对象注册信息,workQueue
final ForkJoinWorkerThreadFactory factory; // 线程工厂
final UncaughtExceptionHandler ueh;  // 每个工作线程的异常信息
final String workerNamePrefix;       // 用于创建工作线程的名称
volatile AtomicLong stealCounter;    // 偷取任务总数,也可作为同步监视器


/** 静态初始化字段 */
// 线程工厂
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

// 启动或杀死线程的方法调用者的权限
private static final RuntimePermission modifyThreadPermission;

// 公共静态pool
static final ForkJoinPool common;

// 并行度,对应内部common池
static final int commonParallelism;

// 备用线程数,在tryCompensate中使用
private static int commonMaxSpares;

// 创建workerNamePrefix(工作线程名称前缀)时的序号
private static int poolNumberSequence;

// 线程阻塞等待新的任务的超时值(以纳秒为单位),默认2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec

// 空闲超时时间,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms

// 默认备用线程数
private static final int DEFAULT_COMMON_MAX_SPARES = 256;

// 阻塞前自旋的次数,用在在awaitRunStateLock和awaitWork中
private static final int SPINS  = 0;

// indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;

构造函数

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}

private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
  • parallelism: 并行度,默认为 CPU 核心数,最小为 1。
  • factory: 工作线程工厂,用来创建 ForkJoinWorkerThread 工作线程。
  • handler: 处理工作线程运行任务过程中出现未捕获异常,默认为 null。
  • asyncMode: 是否为异步模式,默认为 false。如果为true,表示子任务的执行遵循 FIFO 顺序并且任务不能被合并(join),这种模式适用于工作线程只运行事件类型的异步任务。

在大多数使用场景中,如果没有太强的业务要求,我们一般直接使用 ForkJoinPool 中的 Common 池,它的优点是我们可以通过指定系统参数的方式定义并行度线程工厂异常处理类。并且它使用的是同步模式,也就是说可以支持任务合并(join)。

提交任务

任务提交分为外部任务提交和子任务提交。

外部任务提交:external/submisstion

ForkJoinPool 提交任务有三种方式:

  1. invoke()等待任务计算完成并返回计算结果。同步阻塞。
  2. execute():直接向线程池提交一个任务,无返回结果。任务异步执行。
  3. submit():直接向线程池提交一个任务,有返回结果。任务异步执行。通过 FutureTask#get() 阻塞等待计算结果。

以上三种提交方式都是通过调用 externalPush() 方法实现。

externalPush

// java.util.concurrent.ForkJoinPool#externalPush
/**
 * 将任务(ForkJoinTask)提交到 submission 队列中。
 */
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; 
    WorkQueue q; 
    int m;
    // 用于随机计算 WorkQueue[] 槽位索引值
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && // 获取随机槽位对应的「WorkQueue」
        U.compareAndSwapInt(q, QLOCK, 0, 1)) { // 获取「WorkQueue」队列的锁

        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            // 计算任务索引位置
            int j = ((am & s) << ASHIFT) + ABASE;

            // 将任务加入队列中
            U.putOrderedObject(a, j, task);

            // 更新 push slot
            U.putOrderedInt(q, QTOP, s + 1);

            // 释放锁
            U.putIntVolatile(q, QLOCK, 0);

            if (n <= 1)
                // 如果任务数小于 1,尝试创建或激活一个工作线程
                signalWork(ws, q);
            return;
        }

        // 释放锁
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }

    // 
    externalSubmit(task);
}
// java.util.concurrent.ForkJoinTask#fork
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}