概述
Oracle官方对Fork/Join框架的定义原文如下
The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application. As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy. The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.
大致意思是,Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以将一个大任务通过递归分解成多个小任务并发执行,以此提高任务执行的效率。Fork/Join框架和ExecutorService一样需要将任务交给线程池来执行,但不同的是Fork/Join框架使用了工作窃取算法,即工作用尽的工作线程可以从其他仍很忙的线程中窃取任务。
fork / join框架的中心是ForkJoinPool类,它是AbstractExecutorService类的扩展。 ForkJoinPool实现了核心的工作窃取算法,并且可以执行ForkJoinTask进程。
思想
分治编程
分治算法 :把一个复杂的问题分成两个或更多的相同或相似的子问题,再把子问题分成更小的子问题……直到最后子问题可以简单的直接求解,原问题的解即子问题的解的合并。
利用分治法思想的编程方式称为分治编程,而Fork/Join框架就是使用了分治编程,通过设定阈值,对大任务分解(Fork)成小任务,再对每个小任务并发执行,将结果合并(Join)获得最终结果,提高任务执行的效率和CPU的使用率。

工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
那么为什么需要使用工作窃取算法呢?
当我们需要执行一个大任务时,我们使用分治将其分解成若干个小任务并发执行,为了减少线程间的竞争,使用不同的队列对应不同的线程,每个线程执行自己队列里的任务。
当有的线程已经完成自己队列里的任务时,该线程便空闲了,此时可以让它去帮其他线程,让它窃取其他线程队列里的任务来执行。
而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
基本类和接口
Fork/Join框架主要有以下重要的类
- ForkJoinPool
- ForkJoinTask
- RecursiveAction
- RecursiveTask
- ForkJoinWorkerThread
ForkJoinPool
ForkJoinPool类继承了AbstractExecutorService,该类是一个线程池,是Fork/Join框架的核心,保证了工作窃取算法和ForkJoinTask的正常工作。

public class ForkJoinPool extends AbstractExecutorService {//执行ForkJoinTask任务,无返回值,不阻塞public void execute(ForkJoinTask<?> task);//执行ForkJoinTask任务,有任务返回值,阻塞public <T> T invoke(ForkJoinTask<T> task);//执行ForkJoinTask任务,有任务返回值,不阻塞public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);}
ForkJoinTask
ForkJoinTask是一个抽象任务类,其提供了两个主要方法fork()和join()。Fork(分解),Join(合并),依赖这两操作实现分治编程。
其有两个主要子类,一个是RecursiveTask,一个是RecursiveAction。
在使用时我们通常继承其子类,并实现其compute()方法来实现自己的分治逻辑。
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {//在当前任务正在运行的池中异步执行此任务public final ForkJoinTask<V> fork();//返回计算结果public final V join();}
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {//完成任务,有返回值protected abstract V compute();}
public abstract class RecursiveAction extends ForkJoinTask<Void> {//完成任务,无返回值protected abstract void compute();}
对于两个子类,它们有以下区别
- RecursiveTask是有返回值的任务,RecursiveAction是无返回值的任务
- RecursiveTask通常用于只fork不join的情形,RecursiveAction通常用于fork+join的情形
/*** @author Chavy* @date 2020/4/19 19:26*/public class ForkJoinDemo {/*** 10w大小的数组*/static int[] nums = new int[100000];static final int MAX_NUM = 5000;static Random random = new Random();static {for(int i = 0; i < nums.length; i++){nums[i] = random.nextInt(100);}System.out.println("sum == " + Arrays.stream(nums).sum());}static class AddTask extends RecursiveAction{int start, end;AddTask(int start, int end){this.start = start;this.end = end;}@Overrideprotected void compute() {if(end - start <= MAX_NUM){long sum = 0;for(int i = start; i < end; i++){sum += nums[i];}System.out.println("From " + start + " to " + end + " = " + sum);}else {int middle = start + (end - start) / 2;AddTask addTask1 = new AddTask(start, middle);AddTask addTask2 = new AddTask(middle, end);addTask1.fork();addTask2.fork();}}}static class AddTaskRec extends RecursiveTask<Long> {int start, end;AddTaskRec(int start, int end){this.start = start;this.end = end;}@Overrideprotected Long compute() {if(end - start <= MAX_NUM){long sum = 0;for(int i = start; i < end; i++){sum += nums[i];}return sum;}int middle = start + (end - start) / 2;AddTaskRec addTaskRec1 = new AddTaskRec(start, middle);AddTaskRec addTaskRec2 = new AddTaskRec(middle, end);addTaskRec1.fork();addTaskRec2.fork();try{return addTaskRec1.fork().get() + addTaskRec2.fork().get();}catch (ExecutionException | InterruptedException e){System.out.println("Error");}return null;}}public static void main(String[] args) {ForkJoinDemo forkJoinDemo = new ForkJoinDemo();ForkJoinPool forkJoinPool = new ForkJoinPool();AddTask addTask = new AddTask(0, nums.length);// forkJoinPool.execute(addTask);// addTask.join();AddTaskRec addTaskRec = new AddTaskRec(0, nums.length);forkJoinPool.execute(addTaskRec);System.out.println(addTaskRec.join());}}
上面就是一个例子,要计算一个数组内的数的总和,使用两个不同的任务子类来实现该Fork/Join的逻辑。
ForkJoinWorkerThread
ForkJoinWorkerThread线程是一种在Fork/Join框架中运行的特性线程,它除了具有普通线程的特性外,最主要的特点是每一个ForkJoinWorkerThread线程都具有一个独立的任务等待队列(work queue),这个任务队列用于存储在本线程中被拆分的若干子任务。
public class ForkJoinWorkerThread extends Thread {final ForkJoinPool pool; // the pool this thread works infinal ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics}
使用
这是一个使用Fork/Join框架求数组最大值的例子:
import java.util.concurrent.*;public class ForkJoinTest extends RecursiveTask<Integer> {//阈值设定为2private int threshold = 2;private int[] array;private int index0 = 0;private int index1 = 1;public ForkJoinTest(int[] array, int index0, int index1){this.array = array;this.index0 = index0;this.index1 = index1;}@Overrideprotected Integer compute() {//最大值的结果int max = Integer.MIN_VALUE;//分治法划分子任务逻辑//如果子任务已达阈值,则直接操作if ((index1 - index0) <= threshold) {for (int i = index0;i <= index1; i ++) {max = Math.max(max, array[i]);}} else {//fork/join//划分子任务int mid = index0 + (index1 - index0) / 2;ForkJoinTest lMax = new ForkJoinTest(array, index0, mid);ForkJoinTest rMax = new ForkJoinTest(array, mid + 1, index1);//将子任务在当前运行池中异步执行lMax.fork();rMax.fork();//获取返回结果int lm = lMax.join();int rm = rMax.join();max = Math.max(lm, rm);}return max;}public static void main(String ... args) throws ExecutionException, InterruptedException, TimeoutException {//创建ForkJoinPool线程池ForkJoinPool pool = new ForkJoinPool();int[] array = {100,400,200,90,80,300,600,10,20,-10,30,2000,1000};ForkJoinTest task = new ForkJoinTest(array, 0, array.length - 1);//将任务提交到线程池里Future<Integer> future = pool.submit(task);System.out.println("Result:" + future.get(1, TimeUnit.SECONDS));}}
从上面的例子里可以看出几点使用步骤:
- 先创建一个ForkJoinTask类,按情况继承RecursiveAction还是RecursiveTask,并实现其compute()方法
- compute方法里实现分治逻辑,主要使用if-else结构,规定阈值,达到阈值则执行,否则划分子任务,并将子任务fork()异步执行(若有结果返回,使用join获取结果)
- 创建ForkJoinPool线程池,提交任务给线程池并执行
- 获取处理Future对象结果
原理
属性
@sun.misc.Contendedpublic class ForkJoinPool extends AbstractExecutorService {private static final int RSLOCK = 1;private static final int RSIGNAL = 1 << 1;private static final int STARTED = 1 << 2;private static final int STOP = 1 << 29;private static final int TERMINATED = 1 << 30;private static final int SHUTDOWN = 1 << 31;volatile long ctl; // 控制中心,重要volatile int runState; // 运行状态,如上,SHUWDOWN是负数,其他是2的次方final int config; // 配置:并行度(parallelism),二进制的低16位代表int indexSeed; // 生成worker的queue索引volatile WorkQueue[] workQueues; // 主要保存工作队列的数组final ForkJoinWorkerThreadFactory factory;final UncaughtExceptionHandler ueh; // per-worker UEHfinal String workerNamePrefix; // to create worker name stringvolatile AtomicLong stealCounter; // also used as sync monitor/*** 主要构造器* parallelism*/private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);}}
WorkQueue
@sun.misc.Contendedstatic final class WorkQueue {//初始容量 2的13次方 32Mstatic final int INITIAL_QUEUE_CAPACITY = 1 << 13;//最大容量 2的26次方 64Mstatic final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M// Instance fieldsvolatile int scanState; // versioned, <0: inactive; odd:scanningint stackPred; // pool stack (ctl) predecessorint nsteals; // 窃取的任务数int hint; // 一个随机数,用来帮助任务窃取,在 helpXXXX()的方法中会用到int config; // 配置:二进制的低16位代表 在 queue[] 中的索引,// 高16位:mode可选FIFO_QUEUE(1 << 16)和LIFO_QUEUE(1 << 31),默认是LIFO_QUEUEvolatile int qlock; // 锁定标示位:1: locked, < 0: terminate; else 0volatile int base; // index of next slot for pollint top; // index of next slot for pushForkJoinTask<?>[] array; // ForkJoinTask的任务数组final ForkJoinPool pool; // 保存的线程池final ForkJoinWorkerThread owner; // owning thread or null if sharedvolatile Thread parker; // == owner during call to park; else nullvolatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoinvolatile ForkJoinTask<?> currentSteal; // mainly used by helpStealerWorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {this.pool = pool;this.owner = owner;// Place indices in the center of array (that is not yet allocated)base = top = INITIAL_QUEUE_CAPACITY >>> 1;}}
比较重要和核心的就是上面这个内部类 WorkQueue,它是一个数组实现的队列。它对应的外部传入的一个任务,内部使用了一个数组来保存自己Fork出来的子任务。另外 WorkQueue 里面也保存了从其他线程里窃取来的任务数量和任务本身,这就是上面提到的工作窃取算法第一次出现的地方了。
该队列支持三种形式的列操作:push、pop、poll(也叫steal),push和pop只能被队列内部持有的线程调用,poll可被其他线程偷取任务时调用。
final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;int b = base, s = top, n;//非空才操作,否则不管你if ((a = array) != null) { // ignore if queue removedint m = a.length - 1; // fenced write for task visibility//把任务添加进去,然后队顶加1U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);U.putOrderedInt(this, QTOP, s + 1);//如果队列内的元素只剩一件或更少,那就叫线程池干活if ((n = s - b) <= 1) {if ((p = pool) != null)p.signalWork(p.workQueues, this);}//如果元素大于数组长度,扩容else if (n >= m)growArray();}}//出队final ForkJoinTask<?> pop() {ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;//如果队列不空,循环将Top位值CAS操作置为空,成功便返回null值。if ((a = array) != null && (m = a.length - 1) >= 0) {for (int s; (s = top - 1) - base >= 0;) {long j = ((m & s) << ASHIFT) + ABASE;if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)break;if (U.compareAndSwapObject(a, j, t, null)) {U.putOrderedInt(this, QTOP, s);return t;}}}return null;}//窃取出队final ForkJoinTask<?> poll() {ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;//判断队列还有东西,数组不为空while ((b = base) - top < 0 && (a = array) != null) {int j = (((a.length - 1) & b) << ASHIFT) + ABASE;t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);//base还没被改变,且任务不为空,就CAS置null,拿走任务,基底加一if (base == b) {if (t != null) {if (U.compareAndSwapObject(a, j, t, null)) {base = b + 1;return t;}}//否则现在可能空咯,不做了else if (b + 1 == top) // now emptybreak;}}return null;}
由于 WorkQueue 使用了 CAS 引用操作 workerQueue 数组,所以在队列 base 和 top 不必使用标志位了。
对索引的更新保证了当 top==base 时代表这个队列为空,但是如果在push、pop或poll没有完全完成的情况下,可能出现即使 base==top 但队列为非空的错误情况(isEmpty方法可以检查“移除最后一个元素操作”部分完成的情况)。所以,单独考虑 poll 操作,它并不是 wait-free 的(无等待算法)。在一个线程正在偷取任务时,另外一个线程是无法完成偷取操作的。大体上讲,我们起码有一定概率保证了阻塞性。如果一个偷取操作失败,偷取线程会选择另外一个随机目标继续尝试。所以,为了保证偷取线程能够正常执行,它必须能够满足任何正在执行的对 queue 的 poll 或 push 操作都能完成(这就是为什么我们通常使用 pollAt 方法和它的变体,在已知的 base 索引中先尝试一次,然后再考虑可替代的操作,而不直接使用可以重试的 poll 方法)。
XorShifts算法
不懂,这个网上看到CV下来的 https://www.jianshu.com/p/32a15ef2f1bf
在上面提到了有对poll操作的变体的操作方法。然后我们回来这个队列,我们可以看到 workQueue 是一个 LIFO 队列,更像是栈这种数据结构。而这种方法也适用于FIFO。
FIFO 和 LIFO 这两种模式都不会考虑共用性、加载、缓存地址等,所以很少能在给定的机器上提供最好的性能,但通过对这些因素进行平均,可以提供良好的吞吐量。更进一步来讲,即使我们尝试使用这些信息,也没有能利用它的基础。例如,一些任务集从缓存共用中获取到良好的性能收益,但其他任务集会因此受到它的影响。另外,虽然队列中提供了扫描功能,但是从长远看来为了吞吐量通常最好使用随机选择,而非直接选择。所以,在ForkJoinPool 中我们也就使用了一种 XorShifts(一种随机算法,有些带有不同的偏移量) 随机算法。
WorkQueue 对于提交到池中的任务也使用类似的随机插入方式。我们不能把这些被工作线程使用的任务混合同一个队列中,所以,我们会使用一种随机哈希算法(有点类似ConcurrentHashMap的随机算法)将工作队列与工作线程关联起来。ThreadLocalRandom的probe(探针值)会为选中的已存在的队列提供一个哈希值,在与其他提交任务的线程(submitters)竞争时,也可以利用probe来随机移位。实际上,submitters 就像工作线程(worker)一样,只不过他们被限制只能执行它们提交的本地任务(CountedCompleter类型任务也不能执行)。在共享模式里提交的任务需要锁来控制(在扩容情况下提供保护),我们使用了一个简单的自旋锁(qlock字段),因为 submitters 遇到一个繁忙队列时会继续尝试提交其他队列或创建新的队列-只有在 submitters 创建和注册新队列时阻塞。另外,”qlock”会在 shutdown 时饱和到不可锁定值(-1),但是解锁操作依然可以执行。
externalPush()
现在就从头开始,像上面的demo一样,当我们调用了 ForkJoinPool 的一个 execute() 方法来执行我们的任务的时候,会发生什么呢?
/*** 只是一个判空操作,实际操作在externalPush() 方法里*/public void execute(ForkJoinTask<?> task) {if (task == null)throw new NullPointerException();externalPush(task);}
execute() 实际上只是做了个判空操作,真正的执行逻辑在 externalPush() 方法里。
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;//计算一个哈希值,获取线程池状态int r = ThreadLocalRandom.getProbe();int rs = runState;//获取当前WorkQueue数组,判断数组已经被初始化且已有任务,并且按m & r & SQMASK上的元素不为空,//线程池正常运行,获取的随机数也不是0,且CAS上锁成功if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;//获取全部任务列表,判断是否为空,且数组长度大于队列长度if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {int j = ((am & s) << ASHIFT) + ABASE;//把该任务放在队列的top端U.putOrderedObject(a, j, task);//然后给Top向上移U.putOrderedInt(q, QTOP, s + 1);U.putIntVolatile(q, QLOCK, 0);//如果队列小于1,提醒开始干活了if (n <= 1)signalWork(ws, q);return;}//CAS解锁U.compareAndSwapInt(q, QLOCK, 1, 0);}//提交任务externalSubmit(task);}
