前言
ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作为对 ThreadPoolExecutor 的补充
1. ThreadPool Exector
1.1 基本组成
线程池管理器(ThreadPool):创建、销毁、管理(添加新任务等);
- 工作线程(PoolWorker):线程池中线程,在无任务时处于等待状态,可循环执行任务;
- 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
任务队列(taskQueue):一种缓冲机制,存放待处理的任务;
1.2 工作方式
一个总任务队列,线程空闲时,从队列中认领工作,ThreadPool 允许线程重用,以减少线程创建与销毁次数,提高效率;
1.3 适用场景
数量少、任务由外部生成且相互独立、处理耗时、有时阻塞的任务;
2. ForkJoinPool Executor
2.1 基本组成
线程池管理器(ForkJoinPool):负责控制框架内 workerThread 数量、创建与激活,负责 workQueue 队列的创建和分配,即对一个 workerThread 分配相应的 workQueue,workerThread 处理 workQueue 中的任务;
- 工作线程(PoolWorker)ForkJoinWorkerThread:依附于 ForkJoinPool,首先在 ForkJoinPool 中注册(registerWorker),获取相应的 workQueue,然后从 workQueue 里面拿任务出来处理;
- 任务接口(ForkJoinTask):任务抽象接口,包括两个子类 RecursiveTask、RecursiveAction,两者区别在于 RecursiveTask 任务有返回值,RecursiveAction 无返回值,任务的具体切分和处理逻辑在 compute() 方法中;
- ForkJoinPool 实现了 ExecutorService 接口,可通过 submit、invokeAll 和 invokeAny 等方法来执行 Runnable 和 Callable 类型的任务;
- 任务队列(ForkJoinPool.WorkQueue): 属于 Thread ,存储接收任务,底层数据结构是 双端队列;
线程池工作队列(submitting queue) :属于 ForkJoinPool ,用于接收由外部线程(非 ForkJoinThread 线程)提交的任务;
fork / join : 任务分治,通过递归将任务分割成更小的子任务,其中阈值可自定义配置,将子任务分配给不同线程并发执行,最后收集结果;【单机的 map/reduce】
- fork :开启一个新线程或重用线程池内的空闲线程,将任务推入当前工作线程的工作队列里进行处理;
- join :等待该任务的处理线程处理完毕,获得返回值,并不是每个 join 都会造成,具体处理步骤如下:
- 1)检查调用 join() 的线程是否是 ForkJoinThread 线程,如果不是(eg main 线程),则阻塞当前线程,等待任务完成,如果是,则不阻塞;
- 2)查看任务的完成状态,如果已经完成,直接返回结果;
- 3)如果任务尚未完成,但处于自己的工作队列内,则完成它;
- 4)如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务;
- 5)如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务;
- 6)递归地执行第5步;
- workSteal : 允许空闲线程“窃取”分配给另一个线程的工作,高效地利用硬件资源;
- 本质:任务阻塞而线程不阻塞
- workSteal 的图示说明:
2.3 适用场景
数量多、处理耗时短、可生成子任务且其间存在父子依赖、几乎无阻塞(即计算密集型)任务
2.4 使用
数组求和实现(对比 ThreadPool 与 ForkJoin)
- 求和数据比较小的时候无需考虑哪种方式;
- 求和数据比较大的时候 java8 Stream API 是优于 for 循环的;
- 求和数据比较大的时候使用 ForkJoin 需要考虑临界值的设置,否则可能效率不如 for 循环和 java8 Stream; ```java import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.stream.IntStream;
public class testForkJoin { public static void main(String[] args){ // testSumCompare(); testCompareCallableAndForkJoin(); } public static void testCompareCallableAndForkJoin(){ int limit = 1000; testLimitCompare(limit); limit = 100000; testLimitCompare(limit); limit = 1000000; testLimitCompare(limit); limit = 100000000; testLimitCompare(limit); /** output
* ************** start ***************
* ForkJoin 求和结果 : 500500
* forkJoin 求和,范围为 0 ~ 1000 , 耗时 : 6168056
* callable 求和结果 : 500500
* Callable 求和,范围为 0 ~ 1000 , 耗时 : 7199986
* ************** end ***************
* ************** start ***************
* ForkJoin 求和结果 : 705082704
* forkJoin 求和,范围为 0 ~ 100000 , 耗时 : 17508161
* callable 求和结果 : 705082704
* Callable 求和,范围为 0 ~ 100000 , 耗时 : 26749768
* ************** end ***************
* ************** start ***************
* ForkJoin 求和结果 : 1784293664
* forkJoin 求和,范围为 0 ~ 1000000 , 耗时 : 162355485
* callable 求和结果 : 1784293664
* Callable 求和,范围为 0 ~ 1000000 , 耗时 : 112935769
* ************** end ***************
* ************** start ***************
* ForkJoin 求和结果 : 987459712
* forkJoin 求和,范围为 0 ~ 100000000 , 耗时 : 448116141
* callable 求和结果 : 987459712
* Callable 求和,范围为 0 ~ 100000000 , 耗时 : 8536777831
* ************** end ***************
* analyse : under general cases, forkJoin seems better than Callable
*
*/
}
public static void testLimitCompare(int limit){
System.out.println("************** start ***************");
Long time_1 = calculateTime(limit, 1);
System.out.println("forkJoin 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_1);
Long time_4 = calculateTime(limit, 4);
System.out.println("Callable 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_4);
System.out.println("************** end ***************");
}
public static void testCallable(int to){
class TestCallable implements Callable {
private int from;
private int to;
public TestCallable(int from, int to){
this.from = from;
this.to = to;
}
@Override
public Integer call(){
int sum_value = 0;
for (int i = from; i <= to; i++) {
sum_value += i;
}
return sum_value;
}
}
ExecutorService executor = Executors.newFixedThreadPool(16);
List<Future<Integer>> arr = new ArrayList<>();
for (int i = 0; i < to/10; i++) {
arr.add(executor.submit(new TestCallable(i*10+1, (i+1)*10)));
}
executor.shutdown();
int sum_all = 0;
for (Future<Integer> a : arr) {
try {
sum_all += a.get();
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("callable 求和结果 : " + sum_all);
}
public static void testRecursiveTask(int to){
ForkJoinPool pool = new ForkJoinPool(16);
class TestTask extends RecursiveTask<Integer> {
private int from;
private int to;
private int gap;
public TestTask(int from, int to, int gap){
this.from = from;
this.to = to;
this.gap = gap;
}
@Override
public Integer compute(){
int sum_value = 0;
if (to - from <= gap) {
for (int i = from; i <= to; i++) {
sum_value += i;
}
// System.out.println(“thread : “ + Thread.currentThread().getName() + “, start : “ + from + “, end : “ + to + “ , local_sum : “ + sum_value);
} else{
int middle = (from + to)>>1;
TestTask left = new TestTask(from, middle, gap);
TestTask right = new TestTask(middle + 1, to, gap);
left.fork();
right.fork();
sum_value = left.join() + (int)right.join();
}
return sum_value;
}
}
int gap = 5;
TestTask task = new TestTask(0, to, gap);
Future
* ************** start ***************
* ForkJoin 求和结果 : 12502500
* forkJoin 求和,范围为 0 ~ 5000 , 耗时 : 6708610
* stream 求和结果 : 12502500
* Stream 求和,范围为 0 ~ 5000 , 耗时 : 3206123
* foreach 求和结果 : 12502500
* foreach 求和,范围为 0 ~ 5000 , 耗时 : 121026
* ************** end ***************
* ************** start ***************
* ForkJoin 求和结果 : 1250025000
* forkJoin 求和,范围为 0 ~ 50000 , 耗时 : 38857513
* stream 求和结果 : 1250025000
* Stream 求和,范围为 0 ~ 50000 , 耗时 : 3958367
* foreach 求和结果 : 1250025000
* foreach 求和,范围为 0 ~ 50000 , 耗时 : 1468565
* ************** end ***************
* ************** start ***************
* ForkJoin 求和结果 : 1784293664
* forkJoin 求和,范围为 0 ~ 1000000 , 耗时 : 169632021
* stream 求和结果 : 1784293664
* Stream 求和,范围为 0 ~ 1000000 , 耗时 : 8787865
* foreach 求和结果 : 1784293664
* foreach 求和,范围为 0 ~ 1000000 , 耗时 : 7219230
* ************** end ***************
* ************** start ***************
* ForkJoin 求和结果 : 987459712
* forkJoin 求和,范围为 0 ~ 100000000 , 耗时 : 506719730
* stream 求和结果 : 987459712
* Stream 求和,范围为 0 ~ 100000000 , 耗时 : 51661052
* foreach 求和结果 : 987459712
* foreach 求和,范围为 0 ~ 100000000 , 耗时 : 54815428
* ************** end ***************
*/
}
public static void testWithinSum(int limit){
System.out.println("************** start ***************");
Long time_1 = calculateTime(limit, 1);
System.out.println("forkJoin 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_1);
Long time_2 = calculateTime(limit, 2);
System.out.println("Stream 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_2);
Long time_3 = calculateTime(limit, 3);
System.out.println("foreach 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_3);
System.out.println("************** end ***************");
}
public static Long calculateTime(int limit, int type){
int[] a = IntStream.range(0, limit + 1).toArray();
Long start_1, end_1;
start_1 = System.nanoTime();
if (type == 1) {
testRecursiveTask(limit);
}
else if (type == 2) {
System.out.println("stream 求和结果 : " + Arrays.stream(a).reduce(0, (c,b)->c+b));
}
else if (type == 3) {
int sum_value = 0;
for (int i:a
) {
sum_value += i;
}
System.out.println("foreach 求和结果 : " + sum_value);
}
else if (type == 4) {
testCallable(limit);
}
end_1 = System.nanoTime();
return end_1 - start_1;
}
}
2.5 补充
invoke 、 submit 和 execute 的区别
stream 并行模式通过 fork/join 框架实现, 即 Stream API 实现了高性能的并发程序的封装;
3.2 Stream API 简单使用
参考
- Java-5 ThreadPoolExecutor比Java-7 ForkJoinPool有什么优势?
- 线程池与 forkjoin 比较
- Java 并发编程笔记:如何使用 ForkJoinPool 以及原理
- 爬虫 ForkJoinPool VS ExecutorService
- 窃取算法完整示例图 + 代码示例
- Java 多线程(5):Fork/Join 型线程池与 Work-Stealing 算法
- JCIP-39-Fork/Join 框架、工作窃取算法
- 较为完整 多线程并发之线程池Executor与Fork/Join框架