Java 7 开始引入了一种新的 Fork/Join 线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行
我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:

  1. ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
  2. └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:

  1. ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
  2. └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
  3. ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
  4. └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

如果拆成两部分还是很大,我们还可以继续拆,用 4 个线程并行执行:

  1. ┌─┬─┬─┬─┬─┬─┐
  2. └─┴─┴─┴─┴─┴─┘
  3. ┌─┬─┬─┬─┬─┬─┐
  4. └─┴─┴─┴─┴─┴─┘
  5. ┌─┬─┬─┬─┬─┬─┐
  6. └─┴─┴─┴─┴─┴─┘
  7. ┌─┬─┬─┬─┬─┬─┐
  8. └─┴─┴─┴─┴─┴─┘

这就是 Fork/Join 任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复「裂变」成一系列小任务。
例如,对大数据进行并行求和:

  1. import java.util.Random;
  2. import java.util.concurrent.*;
  3. public class Main {
  4. public static void main(String[] args) throws Exception {
  5. // 创建 2000 个随机数组成的数组:
  6. long[] array = new long[2000];
  7. long expectedSum = 0;
  8. for (int i = 0; i < array.length; i++) {
  9. array[i] = random();
  10. expectedSum += array[i];
  11. }
  12. System.out.println("Expected sum: " + expectedSum);
  13. // fork/join:
  14. ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
  15. long startTime = System.currentTimeMillis();
  16. Long result = ForkJoinPool.commonPool().invoke(task);
  17. long endTime = System.currentTimeMillis();
  18. System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
  19. // 直接 for 循环的计时
  20. //long startTime_ = System.currentTimeMillis();
  21. //Long result_ = 0L;
  22. //for (int i = 0; i < array.length; i++) {
  23. // result_ += array[i];
  24. //}
  25. //long endTime_ = System.currentTimeMillis();
  26. //System.out.println("Normal sum: " + result_ + " in " + (endTime_ - startTime_) + " ms.");
  27. }
  28. static Random random = new Random(0);
  29. static long random() {
  30. return random.nextInt(10000);
  31. }
  32. }
  33. class SumTask extends RecursiveTask<Long> {
  34. static final int THRESHOLD = 500;
  35. long[] array;
  36. int start;
  37. int end;
  38. SumTask(long[] array, int start, int end) {
  39. this.array = array;
  40. this.start = start;
  41. this.end = end;
  42. }
  43. @Override
  44. protected Long compute() {
  45. if (end - start <= THRESHOLD) {
  46. // 如果任务足够小,直接计算:
  47. long sum = 0;
  48. for (int i = start; i < end; i++) {
  49. sum += this.array[i];
  50. // 故意放慢计算速度:
  51. try {
  52. Thread.sleep(1);
  53. } catch (InterruptedException e) {
  54. }
  55. }
  56. return sum;
  57. }
  58. // 任务太大,一分为二:
  59. int middle = (end + start) / 2;
  60. System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
  61. SumTask subtask1 = new SumTask(this.array, start, middle);
  62. SumTask subtask2 = new SumTask(this.array, middle, end);
  63. invokeAll(subtask1, subtask2);
  64. Long subresult1 = subtask1.join();
  65. Long subresult2 = subtask2.join();
  66. Long result = subresult1 + subresult2;
  67. System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
  68. return result;
  69. }
  70. }

可以将上述求 for 循环的时间代码去掉注释,与 Fork/Join 对比。需要注意的是,要注释掉 Fork/Join 中 compute()slepp()System.out.println() 输出语句。这些都会增加不必要的时间开销。
观察上述代码的执行过程,一个大的计算任务 0~2000 首先分裂为两个小任务 0~1000 和 1000~2000 ,这两个小任务仍然太大,继续分裂为更小的 0~500 ,500~1000 ,1000~1500 ,1500~2000,最后,计算结果被依次合并,得到最终结果。
因此,核心代码 SumTask 继承自 RecursiveTask,在 compute() 方法中,关键是如何「分裂」出子任务并且提交子任务:

  1. class SumTask extends RecursiveTask<Long> {
  2. protected Long compute() {
  3. // 分裂子任务:
  4. SumTask subtask1 = new SumTask(...);
  5. SumTask subtask2 = new SumTask(...);
  6. // invokeAll会并行运行两个子任务:
  7. invokeAll(subtask1, subtask2);
  8. // 获得子任务的结果:
  9. Long subresult1 = subtask1.join();
  10. Long subresult2 = subtask2.join();
  11. // 汇总结果:
  12. return subresult1 + subresult2;
  13. }
  14. }

Fork/Join 线程池在 Java 标准库中就有应用。Java 标准库提供的 java.util.Arrays.parallelSort(array) 可以进行并行排序,它的原理就是内部通过 Fork/Join 对大数组分拆进行并行排序,在多核 CPU 上就可以大大提高排序的速度。

小结

Fork/Join 是一种基于「分治」的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
ForkJoinPool 线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自 RecursiveTaskRecursiveAction
使用 Fork/Join 模式可以进行并行计算以提高效率。