- Fork/Join主要用来解决什么样的问题?
- Fork/Join框架是在哪个JDK版本中引入的?
- Fork/Join框架主要包含哪三个模块? 模块之间的关系是怎么样的?
- ForkJoinPool类继承关系?
- ForkJoinTask抽象类继承关系? 在实际运用中,我们一般都会继承 RecursiveTask 、RecursiveAction 或 CountedCompleter 来实现我们的业务需求,而不会直接继承 ForkJoinTask 类。
- 整个Fork/Join 框架的执行流程/运行机制是怎么样的?
- 具体阐述Fork/Join的分治思想和work-stealing 实现方式?
- 有哪些JDK源码中使用了Fork/Join思想?
- 如何使用Executors工具类创建ForkJoinPool?
- 写一个例子: 用ForkJoin方式实现1+2+3+…+100000?
- Fork/Join在使用时有哪些注意事项? 结合JDK中的斐波那契数列实例具体说明。
一、 介绍
1.1、ForkJoin 简介
ForkJoinPool 是JDK 7加入的一个线程池类。Fork/Join 技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。目的是为了帮助我们更好地利用多处理器带来的好处,使用所有可用的运算能力来提升应用的性能。
1.2、ForkJoin 模块
ForkJoinPool可以通过池中的ForkJoinWorkerThread来处理ForkJoinTask任务
1.2.1 任务对象: ForkJoinTask
- RecursiveTask
RecursiveTask 是 ForkJoinTask 的子类,是一个可以递归执行的 ForkJoinTask,
- RecursiveAction
RecursiveAction 是一个无返回值的 RecursiveTask
- CountedCompleter
CountedCompleter 在任务完成执行后会触发执行一个自定义的钩子函数。
1.2.2、执行Fork/Join任务的线程: ForkJoinWorkerThread
1.2.3、线程池: ForkJoinPool
1.3、ForkJoin 核心思想
1.3.1、分治算法(Divide-and-Conquer)
分治算法(Divide-and-Conquer)把任务递归的拆分为各个子任务,这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。首先看一下 Fork/Join 框架的任务运行机制:
1.3.2、work-stealing(工作窃取)算法
work-stealing(工作窃取)算法: 线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的 ForkJoinPool 时,对不需要合并(join)的事件类型任务也非常适用。
在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。
具体思路如下:
- 每个线程都有自己的一个WorkQueue,该工作队列是一个双端队列。
- 队列支持三个功能push、pop、poll
- push/pop只能被队列的所有者线程调用,而poll可以被其他线程调用。
- 划分的子任务调用fork时,都会被push到自己的队列中。
- 默认情况下,工作线程从自己的双端队列获出任务并执行。
- 当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取任务。
二、ForkJoin 使用
2.1、继承RecursiveTask,实现累加
class SumTask extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
final int start; //开始计算的数
final int end; //最后计算的数
SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
//如果计算量小于1000,那么分配一个线程执行if中的代码块,并返回执行结果
if (end - start < 1000) {
System.out.println(Thread.currentThread().getName() + " 开始执行: " + start + "-" + end);
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
}
//如果计算量大于1000,那么拆分为两个任务
SumTask task1 = new SumTask(start, (start + end) / 2);
SumTask task2 = new SumTask((start + end) / 2 + 1, end);
//执行任务
invokeAll(task1, task2);
//获取任务执行的结果
return task1.join() + task2.join();
}
}