Fork/Join分治框架是JDK1.7引入的。主要作用是将一个大任务拆分为一系列的小任务,然后合并计算结果,以提高并发效率。这也充分发挥了现在多核CPU的特性,工作特点如下图所示:
    image.png
    其主要思想是用到了工作窃取(work stealing)算法,一个线程从其他任务队列中窃取任务执行。所谓工作窃取其实是一种提高效率的方式,一个大任务会被拆分为一个个的小任务,为了方便小任务的管理,每个小任务会被放到各自的队列中,一个大任务被拆分为一个个小的任务队列,这就会出现有的任务队列任务执行完成,有的未完成,那么执行完成的队列就要等待未完成的任务执行完再进行结果合并,那还不如把未完成的任务队列分出一部分任务给空闲的CPU来执行,因此采用工作窃取的方式来提高并发。
    由于任务存在拆分因此ForkJoinPool中的任务分为外部任务和内部任务,存在于全局队列的为外部任务,存在每个工作线程的队列为内部任务,被拆分的子任务会存放在各自的线程队列中。线程在取任务的时候会先判断任务是否执行完成,然后在判断是否被窃取,如果被窃取,就去执行其他任务,优先执行本队列内的任务,一般工作线程不会因为空闲而阻塞。
    ForkJoinPool中的队列使用的是一种WorkQueue的双端队列,之所以使用双端队列是因为,工作线程在执行自身任务的时候是从头部开始取任务,窃取任务时从尾部取任务,这样一定程度的保证了线程安全问题。
    整个分治编程框架中涉及到的类主要包含以下实现类:

    • ForkJoinPool 一个继承自ExecutorService 接口并且用来运行ForkJoinTask的池。
    • ForkJoinTask 在 ForkJoinPool 中运行的任务并且实现了Future接口的抽象基类。 ForkJoinTask 是一个类似于线程的实体,比普通线程轻得多。大量任务和子任务可能由 ForkJoinPool 中的少量实际线程托管。
    • ForkJoinWorkerThread 继承了Thread类,被ForkJoinPool管理的线程,实际ForkJoinTask的执行者。
    • RecursiveAction 继承自ForkJoinTask类,用来递归拆分无结果的ForkJoinTask。
    • RecursiveTask 继承自ForkJoinTask,与RecursiveAction的区别在于用来承载有结果的ForkJoinTask。

    简单演示使用ForkJoinPool来进行大数计算求和:

    1. package forkjoinpool;
    2. import java.time.Duration;
    3. import java.time.Instant;
    4. import java.util.concurrent.ExecutionException;
    5. import java.util.concurrent.ForkJoinPool;
    6. import java.util.concurrent.ForkJoinTask;
    7. import java.util.concurrent.RecursiveTask;
    8. import java.util.stream.LongStream;
    9. public class ForkJoinTest {
    10. private static final long MIN = 0L;
    11. private static final long MAX = 1000_000_000L;
    12. public static void main(String[] args) {
    13. forTest();
    14. forkJoinTest();
    15. streamTest();
    16. }
    17. static void forTest() {
    18. Instant start = Instant.now();
    19. long sum = 0L;
    20. for (long i = MIN; i <= MAX; i++) {
    21. sum = sum + i;
    22. }
    23. Instant end = Instant.now();
    24. System.out.println("for test result :" + sum);
    25. System.out.println("fork test :" + Duration.between(start, end).toMillis());
    26. }
    27. static void forkJoinTest() {
    28. Instant start = Instant.now();
    29. ForkJoinCalculate task = new ForkJoinCalculate(MIN, MAX);
    30. ForkJoinPool pool = new ForkJoinPool();
    31. ForkJoinTask<Long> submit = pool.submit(task);
    32. try {
    33. Long result = submit.get();
    34. System.out.println("fork join test result :" + result);
    35. } catch (InterruptedException | ExecutionException e) {
    36. e.printStackTrace();
    37. }
    38. Instant end = Instant.now();
    39. System.out.println("fork join test:" + Duration.between(start, end).toMillis());
    40. }
    41. static void streamTest() {
    42. Instant start = Instant.now();
    43. long sum = LongStream.range(MIN, MAX + 1).parallel().reduce(0L, Long::sum);
    44. Instant end = Instant.now();
    45. System.out.println("java stream result :" + sum);
    46. System.out.println("java stream test:" + Duration.between(start, end).toMillis());
    47. }
    48. }
    49. class ForkJoinCalculate extends RecursiveTask<Long> {
    50. private final Long start;
    51. private final Long end;
    52. private final static Long LIMIT = 500_000L;
    53. public ForkJoinCalculate(Long start, Long end) {
    54. this.start = start;
    55. this.end = end;
    56. }
    57. @Override
    58. protected Long compute() {
    59. long length = end - start;
    60. long sum = 0L;
    61. if (length <= LIMIT) {
    62. for (long i = start; i <= end; i++) {
    63. sum = sum + i;
    64. }
    65. return sum;
    66. } else {
    67. long middle = (end + start) / 2;
    68. ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
    69. left.fork();
    70. ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
    71. right.fork();
    72. return left.join() + right.join();
    73. }
    74. }
    75. }

    输出结果:

    1. for test result :500000000500000000
    2. fork test :457
    3. fork join test result :500000000500000000
    4. fork join test:277
    5. java stream result :500000000500000000
    6. java stream test:246

    可以明显看到使用ForkJoinPool的速度有很大提升,在设置一个合理拆分的临界点时,计算速度几乎媲美Java8中的并行流计算,对于一些较为复杂的IO等待操作使用起来的优势就会更加明显。
    从上面例子也可以概括出Fork/Join框架的模板代码格式:

    1. 定义一个任务单元继承RecursiveTask或者RecursiveAction类。
    2. if 不可拆分的单元
    3. // do something
    4. else
    5. 根据拆分临界值,继续拆分任务,拆分的任务调用fork方法,一般按照二分拆分任务
    6. return task.join() + task.join

    查看ForkJoinPool中的一些核心方法
    ForkJoinPool.png
    关于操作的主要包以下几个:

    • shutdown

      1. public void shutdown() {
      2. checkPermission();
      3. tryTerminate(false, true);
      4. }

      熟悉线程池的应该了解,调用shutdown()方法并不发导致正在执行的任务被停止,正在执行的任务会继续执行,不在接收新的任务,如果并发提交的任务可能会被执行,也可能不执行。

    • shutdownNow 该方法会尝试取消所有任务,并且拒绝随后提交的任务,并且抛出InterruptedException异常。需要注意的是先调用shutdown方法在调用get方法是不会抛出异常的,如果先调用shutdownNow方法再调用get方法会抛出CancellationException异常。

    • invoke 提交一个有返回结果的任务,方法返回值即为结果,不需要再次调用方法获取,所以该方法为阻塞的。
    • submit 提交一个有返回结果的任务,返回结果需要手动调用get方法获得,调用get时会阻塞至返回结果。
    • execute 提交一个无返回结果的任务。

    ForkJoinPool在使用中主要是一种分治思想的参考,源码层面就不过多分析。在使用过程中需要注意的是任务拆分本身也是一种成本,如何拆分就需要权衡拆分的消耗和任务执行本身的一个消耗平衡点,不能为了分治而分治,此外分治结果如果对合并结果有影响,那分治自然也是万万不可取的了,ForkJoinPoo更多的适合CPU密集型任务,例如大数计算、汉诺塔、递归、归并计算等。