1) 概念

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型
运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计
算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运
算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

2) 使用

提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下
面定义了一个对 1~n 之间的整数求和的任务

  1. public class AddTaskTest {
  2. public static void main(String[] args) {
  3. ForkJoinPool pool = new ForkJoinPool(4);
  4. ForkJoinTask<Integer> task = new AddTask(5);
  5. System.out.println(pool.invoke(task));
  6. }
  7. }
  8. class AddTask extends RecursiveTask<Integer> {
  9. private int start;
  10. public AddTask(int start) {
  11. this.start = start;
  12. }
  13. @Override
  14. public String toString() {
  15. return "{" + start + '}';
  16. }
  17. @Override
  18. protected Integer compute() {
  19. if (start == 1) {
  20. System.out.println(Thread.currentThread().getName() + " join() :" + start);
  21. return start;
  22. }
  23. AddTask addTask = new AddTask(start - 1);
  24. addTask.fork();
  25. System.out.println(Thread.currentThread().getName() + " fork() :" + start + " + " + addTask);
  26. int result = start + addTask.join();
  27. System.out.println(Thread.currentThread().getName() + " join() :" + start + " + " + addTask + " = " + result);
  28. return result;
  29. }
  30. }

image.png
java8新的写法

  1. long reduce = LongStream
  2. // 0 < x < 6
  3. .range(0, 6)
  4. // 并行流
  5. .parallel().reduce(0, Long::sum);
  6. System.out.println(reduce);
  7. long reduce1 = LongStream
  8. // 0 < x <= 5
  9. .rangeClosed(0, 5)
  10. // 顺序流
  11. .sequential().reduce(0, Long::sum);
  12. System.out.println(reduce1);