1、ForkJoin思想

ForkJoin将一个大任务拆分成许多个独立的子任务,然后开启线程去并行的处理这些子任务,如果子任务规模不够小,还会进一步拆解,直到得到足够小的子任务,最后将子任务的结果合并成最后的结果。

image.png

2、ForkJoin框架

2.1、框架介绍

JUC包以ForkJoin线程池的方式提供了一套ForkJoin框架的实现,且ForkJoin线程池在Java8中lambda并行流框架中充当着底层框架的角色。

  • ForkJoinPool: 执行任务的线程池,继承自AbstractExecutorService
  • ForkJoinWorkerThread:执行任务的工作线程(ForkJoinPool线程池中线程),每个线程都维护了一个内部队列,用于存放内部任务
  • ForkJoinTask:用于ForkJoinPool的任务抽象类,实现了Future接口
  • RecursiveTask:ForkJoinTask子类,带返回结果的递归执行任务
  • RecursiveAction:ForkJoinTask子类,不带返回结果的递归执行任务

ForkJoinTask类比较复杂,抽象方法较多,在日常使用时一般不会直接继承ForkJoinTask来实现自定义任务类,而是通过继承ForkJoinTask的两个子类
RecursiveTask和RecursiveAction去实现自定义任务类,自定义任务类需实现这些子类的compute()方法,该方法的执行流程一般如下:

  1. if 任务足够小
  2. 计算返回结果
  3. else
  4. 分割成n个子任务
  5. 依次调用每个子任务的fork方法执行子任务
  6. 依次调用每个子任务的join方法,等待子任务完成,然后合并执行结果

2.2、使用实例

  1. public class SumDemo {
  2. private static final int THRESHOLD = 100;
  3. @AllArgsConstructor
  4. @NoArgsConstructor
  5. public static class AccumulateTask extends RecursiveTask<Long> {
  6. private long start;
  7. private long end;
  8. @Override
  9. protected Long compute() {
  10. long sum = 0;
  11. if (end - start <= THRESHOLD) {
  12. for (long i = start; i <= end; i++) {
  13. sum += i;
  14. }
  15. return sum;
  16. } else {
  17. long middle = ((end - start) >> 2) + start;
  18. AccumulateTask subTask1 = new AccumulateTask(start, middle);
  19. subTask1.fork();
  20. AccumulateTask subTask2 = new AccumulateTask(middle + 1, end);
  21. subTask2.fork();
  22. return subTask1.join() + subTask2.join();
  23. }
  24. }
  25. }
  26. public static void main(String[] args) throws ExecutionException, InterruptedException {
  27. ForkJoinPool pool = new ForkJoinPool();
  28. long time1 = System.currentTimeMillis();
  29. AccumulateTask task = new AccumulateTask(0L, 10000000L);
  30. ForkJoinTask<Long> taskFuture = pool.submit(task);
  31. Long result = taskFuture.get();
  32. long time2 = System.currentTimeMillis();
  33. System.out.println("耗时:" + (time2 - time1) + "结果:" + result);
  34. long time3 = System.currentTimeMillis();
  35. result = sum(0L, 10000000L);
  36. System.out.println("耗时:" + (time3 - time2) + "结果:" + result);
  37. }
  38. private static Long sum(long start, long end) {
  39. long sum = 0;
  40. for (long i = start; i <= end; i++) {
  41. sum += i;
  42. }
  43. return sum;
  44. }
  45. }