前言

  • ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作为对 ThreadPoolExecutor 的补充

    1. ThreadPool Exector

    1.1 基本组成

  • 线程池管理器(ThreadPool):创建、销毁、管理(添加新任务等);

  • 工作线程(PoolWorker):线程池中线程,在无任务时处于等待状态,可循环执行任务;
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  • 任务队列(taskQueue):一种缓冲机制,存放待处理的任务;

    1.2 工作方式

  • 一个总任务队列,线程空闲时,从队列中认领工作,ThreadPool 允许线程重用,以减少线程创建与销毁次数,提高效率;

ForkJoin 框架 - 图1

1.3 适用场景

  • 数量少、任务由外部生成且相互独立、处理耗时、有时阻塞的任务;

    2. ForkJoinPool Executor

    2.1 基本组成

  • 线程池管理器(ForkJoinPool):负责控制框架内 workerThread 数量、创建与激活,负责 workQueue 队列的创建和分配,即对一个 workerThread 分配相应的 workQueue,workerThread 处理 workQueue 中的任务;

  • 工作线程(PoolWorker)ForkJoinWorkerThread:依附于 ForkJoinPool,首先在 ForkJoinPool 中注册(registerWorker),获取相应的 workQueue,然后从 workQueue 里面拿任务出来处理;
  • 任务接口(ForkJoinTask):任务抽象接口,包括两个子类 RecursiveTask、RecursiveAction,两者区别在于 RecursiveTask 任务有返回值,RecursiveAction 无返回值,任务的具体切分和处理逻辑在 compute() 方法中;
    • ForkJoinPool 实现了 ExecutorService 接口,可通过 submit、invokeAll 和 invokeAny 等方法来执行 Runnable 和 Callable 类型的任务;
  • 任务队列(ForkJoinPool.WorkQueue): 属于 Thread ,存储接收任务,底层数据结构是 双端队列;
  • 线程池工作队列(submitting queue) :属于 ForkJoinPool ,用于接收由外部线程(非 ForkJoinThread 线程)提交的任务;

    • submit 操作:最初的任务是 push 到外部线程的 submitting queue 里的;
    • submit() 和 fork() 无本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作),submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段;

      2.2 工作方式

  • fork / join : 任务分治,通过递归将任务分割成更小的子任务,其中阈值可自定义配置,将子任务分配给不同线程并发执行,最后收集结果;【单机的 map/reduce】

    • fork :开启一个新线程或重用线程池内的空闲线程,将任务推入当前工作线程的工作队列里进行处理;
    • join :等待该任务的处理线程处理完毕,获得返回值,并不是每个 join 都会造成,具体处理步骤如下:
      • 1)检查调用 join() 的线程是否是 ForkJoinThread 线程,如果不是(eg main 线程),则阻塞当前线程,等待任务完成,如果是,则不阻塞;
      • 2)查看任务的完成状态,如果已经完成,直接返回结果;
      • 3)如果任务尚未完成,但处于自己的工作队列内,则完成它;
      • 4)如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务;
      • 5)如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务;
      • 6)递归地执行第5步;
  • workSteal : 允许空闲线程“窃取”分配给另一个线程的工作,高效地利用硬件资源;
    • 本质:任务阻塞而线程不阻塞

ForkJoin 框架 - 图2

  • workSteal 的图示说明:

ForkJoin 框架 - 图3ForkJoin 框架 - 图4

2.3 适用场景

  • 数量多、处理耗时短、可生成子任务且其间存在父子依赖、几乎无阻塞(即计算密集型)任务

    2.4 使用

  • 数组求和实现(对比 ThreadPool 与 ForkJoin)

    • 求和数据比较小的时候无需考虑哪种方式;
    • 求和数据比较大的时候 java8 Stream API 是优于 for 循环的;
    • 求和数据比较大的时候使用 ForkJoin 需要考虑临界值的设置,否则可能效率不如 for 循环和 java8 Stream; ```java import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.stream.IntStream;

public class testForkJoin { public static void main(String[] args){ // testSumCompare(); testCompareCallableAndForkJoin(); } public static void testCompareCallableAndForkJoin(){ int limit = 1000; testLimitCompare(limit); limit = 100000; testLimitCompare(limit); limit = 1000000; testLimitCompare(limit); limit = 100000000; testLimitCompare(limit); /** output

  1. * ************** start ***************
  2. * ForkJoin 求和结果 : 500500
  3. * forkJoin 求和,范围为 0 ~ 1000 , 耗时 6168056
  4. * callable 求和结果 500500
  5. * Callable 求和,范围为 0 ~ 1000 , 耗时 7199986
  6. * ************** end ***************
  7. * ************** start ***************
  8. * ForkJoin 求和结果 : 705082704
  9. * forkJoin 求和,范围为 0 ~ 100000 , 耗时 17508161
  10. * callable 求和结果 705082704
  11. * Callable 求和,范围为 0 ~ 100000 , 耗时 26749768
  12. * ************** end ***************
  13. * ************** start ***************
  14. * ForkJoin 求和结果 : 1784293664
  15. * forkJoin 求和,范围为 0 ~ 1000000 , 耗时 162355485
  16. * callable 求和结果 1784293664
  17. * Callable 求和,范围为 0 ~ 1000000 , 耗时 112935769
  18. * ************** end ***************
  19. * ************** start ***************
  20. * ForkJoin 求和结果 : 987459712
  21. * forkJoin 求和,范围为 0 ~ 100000000 , 耗时 448116141
  22. * callable 求和结果 987459712
  23. * Callable 求和,范围为 0 ~ 100000000 , 耗时 8536777831
  24. * ************** end ***************
  25. * analyse : under general cases, forkJoin seems better than Callable
  26. *
  27. */
  28. }
  29. public static void testLimitCompare(int limit){
  30. System.out.println("************** start ***************");
  31. Long time_1 = calculateTime(limit, 1);
  32. System.out.println("forkJoin 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_1);
  33. Long time_4 = calculateTime(limit, 4);
  34. System.out.println("Callable 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_4);
  35. System.out.println("************** end ***************");
  36. }
  37. public static void testCallable(int to){
  38. class TestCallable implements Callable {
  39. private int from;
  40. private int to;
  41. public TestCallable(int from, int to){
  42. this.from = from;
  43. this.to = to;
  44. }
  45. @Override
  46. public Integer call(){
  47. int sum_value = 0;
  48. for (int i = from; i <= to; i++) {
  49. sum_value += i;
  50. }
  51. return sum_value;
  52. }
  53. }
  54. ExecutorService executor = Executors.newFixedThreadPool(16);
  55. List<Future<Integer>> arr = new ArrayList<>();
  56. for (int i = 0; i < to/10; i++) {
  57. arr.add(executor.submit(new TestCallable(i*10+1, (i+1)*10)));
  58. }
  59. executor.shutdown();
  60. int sum_all = 0;
  61. for (Future<Integer> a : arr) {
  62. try {
  63. sum_all += a.get();
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. System.out.println("callable 求和结果 : " + sum_all);
  69. }
  70. public static void testRecursiveTask(int to){
  71. ForkJoinPool pool = new ForkJoinPool(16);
  72. class TestTask extends RecursiveTask<Integer> {
  73. private int from;
  74. private int to;
  75. private int gap;
  76. public TestTask(int from, int to, int gap){
  77. this.from = from;
  78. this.to = to;
  79. this.gap = gap;
  80. }
  81. @Override
  82. public Integer compute(){
  83. int sum_value = 0;
  84. if (to - from <= gap) {
  85. for (int i = from; i <= to; i++) {
  86. sum_value += i;
  87. }

// System.out.println(“thread : “ + Thread.currentThread().getName() + “, start : “ + from + “, end : “ + to + “ , local_sum : “ + sum_value); } else{ int middle = (from + to)>>1; TestTask left = new TestTask(from, middle, gap); TestTask right = new TestTask(middle + 1, to, gap); left.fork(); right.fork(); sum_value = left.join() + (int)right.join(); } return sum_value; } } int gap = 5; TestTask task = new TestTask(0, to, gap); Future result = pool.submit(task); try { System.out.println(“ForkJoin 求和结果 : “ + result.get()); } catch (Exception e) { e.printStackTrace(); } pool.shutdown(); } public static void testSumCompare(){ // no perfect algorithm, just have to test and compare, please remember~ int limit = 5000; testWithinSum(limit); limit = 50000; testWithinSum(limit); limit = 1000000; testWithinSum(limit); limit = 100000000; testWithinSum(limit); /** output

     * ************** start ***************
     * ForkJoin 求和结果 : 12502500
     * forkJoin 求和,范围为 0 ~ 5000 , 耗时 : 6708610
     * stream 求和结果 : 12502500
     * Stream 求和,范围为 0 ~ 5000 , 耗时 : 3206123
     * foreach 求和结果 : 12502500
     * foreach 求和,范围为 0 ~ 5000 , 耗时 : 121026
     * ************** end ***************
     * ************** start ***************
     * ForkJoin 求和结果 : 1250025000
     * forkJoin 求和,范围为 0 ~ 50000 , 耗时 : 38857513
     * stream 求和结果 : 1250025000
     * Stream 求和,范围为 0 ~ 50000 , 耗时 : 3958367
     * foreach 求和结果 : 1250025000
     * foreach 求和,范围为 0 ~ 50000 , 耗时 : 1468565
     * ************** end ***************
     * ************** start ***************
     * ForkJoin 求和结果 : 1784293664
     * forkJoin 求和,范围为 0 ~ 1000000 , 耗时 : 169632021
     * stream 求和结果 : 1784293664
     * Stream 求和,范围为 0 ~ 1000000 , 耗时 : 8787865
     * foreach 求和结果 : 1784293664
     * foreach 求和,范围为 0 ~ 1000000 , 耗时 : 7219230
     * ************** end ***************
     * ************** start ***************
     * ForkJoin 求和结果 : 987459712
     * forkJoin 求和,范围为 0 ~ 100000000 , 耗时 : 506719730
     * stream 求和结果 : 987459712
     * Stream 求和,范围为 0 ~ 100000000 , 耗时 : 51661052
     * foreach 求和结果 : 987459712
     * foreach 求和,范围为 0 ~ 100000000 , 耗时 : 54815428
     * ************** end ***************
     */
}
public static void testWithinSum(int limit){
    System.out.println("************** start ***************");
    Long time_1 = calculateTime(limit, 1);
    System.out.println("forkJoin 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_1);
    Long time_2 = calculateTime(limit, 2);
    System.out.println("Stream 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_2);
    Long time_3 = calculateTime(limit, 3);
    System.out.println("foreach 求和,范围为 0 ~ " + limit + " , 耗时 : " + time_3);
    System.out.println("************** end ***************");
}
public static Long calculateTime(int limit, int type){
    int[] a = IntStream.range(0, limit + 1).toArray();
    Long start_1, end_1;
    start_1 = System.nanoTime();
    if (type == 1) {
        testRecursiveTask(limit);
    }
    else if (type == 2) {
        System.out.println("stream 求和结果 : " + Arrays.stream(a).reduce(0, (c,b)->c+b));
    }
    else if (type == 3) {
        int sum_value = 0;
        for (int i:a
             ) {
            sum_value += i;
        }
        System.out.println("foreach 求和结果 : " + sum_value);
    }
    else if (type == 4) {
        testCallable(limit);
    }
    end_1 = System.nanoTime();
    return end_1 - start_1;
}

}

```

2.5 补充