ForkJoinTask
ForkJoinTask是任务的实际载体,定义了任务执行时的具体逻辑和拆分逻辑。ForkJoinTask继承了Future接口。ForkJoinTask 是一个抽象类,最核心的是 fork() 方法和 join() 方法。
- fork,提交任务
- fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。
- join(),获取任务执行结果
- join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果
public class LongSum extends RecursiveTask<Long> {// 任务拆分最小阈值static final int SEQUENTIAL_THRESHOLD = 20000000;int low;int high;int[] array;LongSum(int[] arr, int lo, int hi) {array = arr;low = lo;high = hi;}@Overrideprotected Long compute() {//当任务拆分到小于等于阀值时开始求和if (high - low <= SEQUENTIAL_THRESHOLD) {long sum = 0;for (int i = low; i < high; ++i) {sum += array[i];}return sum;} else { // 任务过大继续拆分int mid = low + (high - low) / 2;LongSum left = new LongSum(array, low, mid);LongSum right = new LongSum(array, mid, high);// 提交任务left.fork();right.fork();//获取任务的执行结果,将阻塞当前线程直到对应的子任务完成运行并返回结果long rightAns = right.compute();long leftAns = left.join();return leftAns + rightAns;}}}public class LongSumMain {// 获取逻辑处理器数量 12static final int NCPU = Runtime.getRuntime().availableProcessors();static long calcSum;public static void main(String[] args) throws Exception {//准备数组int[] array = Utils.buildRandomIntArray(100000000);Instant now = Instant.now();// 单线程计算数组总和calcSum = seqSum(array);System.out.println("seq sum=" + calcSum);System.out.println("执行时间:"+ Duration.between(now,Instant.now()).toMillis());//递归任务LongSum ls = new LongSum(array, 0, array.length);// 构建ForkJoinPoolForkJoinPool fjp = new ForkJoinPool(NCPU);now = Instant.now();//ForkJoin计算数组总和ForkJoinTask<Long> result = fjp.submit(ls);System.out.println("forkjoin sum=" + result.get());System.out.println("执行时间:"+ Duration.between(now,Instant.now()).toMillis());fjp.shutdown();now = Instant.now();//并行流计算数组总和Long sum = (Long) IntStream.of(array).asLongStream().parallel().sum();System.out.println("IntStream sum="+sum);System.out.println("执行时间:"+ Duration.between(now,Instant.now()).toMillis());}static long seqSum(int[] array) {long sum = 0;for (int i = 0; i < array.length; ++i) {sum += array[i];}return sum;}}

ForkJoinTask使用限制
ForkJoinTask最适合用于纯粹的计算任务,也就是纯函数计算,计算过程中的对象都是独立的,对外部没有依赖。提交到ForkJoinPool中的任务应避免执行阻塞I/O。
ForkJoinPool 的工作原理
- ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列,里面存放的对象是任务(ForkJoinTask)。
- 每个工作线程在运行中产生新的任务(通常是因为调用了 fork)时,会放入工作队列的top(队头),并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从top(队头)取出任务来执行。
- 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务,窃取的任务位于其他线程的工作队列的base(队尾),也就是说工作线程在窃取其他工作线程的任务时,使用的是FIFO 方式
- 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
- 在既没有自己的任务,也没有可以窃取的任务时,进入休眠 。
工作窃取
工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。
- 工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争s
- 工作窃取算法缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列

工作队列WorkQueue
- WorkQueue 是双向列表,用于任务的有序执行
- 每个 ForkJoinWorkThread 都有属于自己的 WorkQueue,但不是每个 WorkQueue 都有对应的 ForkJoinWorkThread
- 没有 ForkJoinWorkThread 的 WorkQueue 保存的是 submission,来自外部提交,在WorkQueues[] 的下标是 偶数 位
ForkJoinWorkThread
ForkJoinWorkThread 是用于执行任务的线程,用于区别使用非 ForkJoinWorkThread 线程提交task。启动一个该 Thread,会自动注册一个 WorkQueue 到 Pool,拥有 Thread 的 WorkQueue 只能出现在 WorkQueues[] 的 奇数位
Fork/Join的优势
- 任务切分:将大的任务分割成更小粒度的小任务,让更多的线程参与执行
- 任务窃取:通过任务窃取,充分地利用空闲线程,并减少竞争
