• Fork/Join主要用来解决什么样的问题?
  • Fork/Join框架是在哪个JDK版本中引入的?
  • Fork/Join框架主要包含哪三个模块? 模块之间的关系是怎么样的?
  • ForkJoinPool类继承关系?
  • ForkJoinTask抽象类继承关系? 在实际运用中,我们一般都会继承 RecursiveTask 、RecursiveAction 或 CountedCompleter 来实现我们的业务需求,而不会直接继承 ForkJoinTask 类。
  • 整个Fork/Join 框架的执行流程/运行机制是怎么样的?
  • 具体阐述Fork/Join的分治思想和work-stealing 实现方式?
  • 有哪些JDK源码中使用了Fork/Join思想?
  • 如何使用Executors工具类创建ForkJoinPool?
  • 写一个例子: 用ForkJoin方式实现1+2+3+…+100000?
  • Fork/Join在使用时有哪些注意事项? 结合JDK中的斐波那契数列实例具体说明。

一、 介绍

1.1、ForkJoin 简介

ForkJoinPool 是JDK 7加入的一个线程池类。Fork/Join 技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。目的是为了帮助我们更好地利用多处理器带来的好处,使用所有可用的运算能力来提升应用的性能。

1.2、ForkJoin 模块

ForkJoinPool可以通过池中的ForkJoinWorkerThread来处理ForkJoinTask任务

1.2.1 任务对象: ForkJoinTask

  • RecursiveTask

RecursiveTask 是 ForkJoinTask 的子类,是一个可以递归执行的 ForkJoinTask,

  • RecursiveAction

RecursiveAction 是一个无返回值的 RecursiveTask

  • CountedCompleter

CountedCompleter 在任务完成执行后会触发执行一个自定义的钩子函数。

1.2.2、执行Fork/Join任务的线程: ForkJoinWorkerThread

1.2.3、线程池: ForkJoinPool

1.3、ForkJoin 核心思想

1.3.1、分治算法(Divide-and-Conquer)

分治算法(Divide-and-Conquer)把任务递归的拆分为各个子任务,这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。首先看一下 Fork/Join 框架的任务运行机制:
并发编程之ForkJoin 框架 - 图1

1.3.2、work-stealing(工作窃取)算法

work-stealing(工作窃取)算法: 线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的 ForkJoinPool 时,对不需要合并(join)的事件类型任务也非常适用。

在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。

具体思路如下:

  • 每个线程都有自己的一个WorkQueue,该工作队列是一个双端队列。
  • 队列支持三个功能push、pop、poll
  • push/pop只能被队列的所有者线程调用,而poll可以被其他线程调用。
  • 划分的子任务调用fork时,都会被push到自己的队列中。
  • 默认情况下,工作线程从自己的双端队列获出任务并执行。
  • 当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取任务。

二、ForkJoin 使用

2.1、继承RecursiveTask,实现累加

  1. class SumTask extends RecursiveTask<Integer> {
  2. private static final long serialVersionUID = 1L;
  3. final int start; //开始计算的数
  4. final int end; //最后计算的数
  5. SumTask(int start, int end) {
  6. this.start = start;
  7. this.end = end;
  8. }
  9. @Override
  10. protected Integer compute() {
  11. //如果计算量小于1000,那么分配一个线程执行if中的代码块,并返回执行结果
  12. if (end - start < 1000) {
  13. System.out.println(Thread.currentThread().getName() + " 开始执行: " + start + "-" + end);
  14. int sum = 0;
  15. for (int i = start; i <= end; i++) {
  16. sum += i;
  17. }
  18. return sum;
  19. }
  20. //如果计算量大于1000,那么拆分为两个任务
  21. SumTask task1 = new SumTask(start, (start + end) / 2);
  22. SumTask task2 = new SumTask((start + end) / 2 + 1, end);
  23. //执行任务
  24. invokeAll(task1, task2);
  25. //获取任务执行的结果
  26. return task1.join() + task2.join();
  27. }
  28. }

三、 源码分析

参考