叉/加入

原文: https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

fork / join 框架是ExecutorService接口的实现,可帮助您利用多个处理器。它专为可以递归分解成小块的工作而设计。目标是使用所有可用的处理能力来提高应用程序的性能。

与任何ExecutorService实现一样,fork / join 框架将任务分配给线程池中的工作线程。 fork / join 框架是不同的,因为它使用工作窃取算法。用尽要做的事情的工作线程可以从仍然忙碌的其他线程中窃取任务。

fork / join 框架的中心是 ForkJoinPool 类,它是AbstractExecutorService类的扩展。 ForkJoinPool实现核心工作窃取算法,可以执行 ForkJoinTask 进程。

基本用法

使用 fork / join 框架的第一步是编写执行一部分工作的代码。您的代码应类似于以下伪代码:

  1. if (my portion of the work is small enough)
  2. do the work directly
  3. else
  4. split my work into two pieces
  5. invoke the two pieces and wait for the results

将此代码包装在ForkJoinTask子类中,通常使用其中一种更特殊的类型, RecursiveTask (可以返回结果)或 RecursiveAction

ForkJoinTask子类就绪后,创建表示要完成的所有工作的对象,并将其传递给ForkJoinPool实例的invoke()方法。

模糊为 Clarity

为了帮助您了解 fork / join 框架的工作原理,请考虑以下示例。假设您想模糊图像。原始图像由整数数组表示,其中每个整数包含单个像素的颜色值。模糊的目的地图像也由与源相同大小的整数数组表示。

通过一次一个像素地处理源阵列来完成模糊。每个像素的平均周围像素(红色,绿色和蓝色分量被平均),结果放在目标数组中。由于图像是大型数组,因此此过程可能需要很长时间。通过使用 fork / join 框架实现算法,您可以利用多处理器系统上的并发处理。这是一个可能的实现:

  1. public class ForkBlur extends RecursiveAction {
  2. private int[] mSource;
  3. private int mStart;
  4. private int mLength;
  5. private int[] mDestination;
  6. // Processing window size; should be odd.
  7. private int mBlurWidth = 15;
  8. public ForkBlur(int[] src, int start, int length, int[] dst) {
  9. mSource = src;
  10. mStart = start;
  11. mLength = length;
  12. mDestination = dst;
  13. }
  14. protected void computeDirectly() {
  15. int sidePixels = (mBlurWidth - 1) / 2;
  16. for (int index = mStart; index < mStart + mLength; index++) {
  17. // Calculate average.
  18. float rt = 0, gt = 0, bt = 0;
  19. for (int mi = -sidePixels; mi <= sidePixels; mi++) {
  20. int mindex = Math.min(Math.max(mi + index, 0),
  21. mSource.length - 1);
  22. int pixel = mSource[mindex];
  23. rt += (float)((pixel & 0x00ff0000) >> 16)
  24. / mBlurWidth;
  25. gt += (float)((pixel & 0x0000ff00) >> 8)
  26. / mBlurWidth;
  27. bt += (float)((pixel & 0x000000ff) >> 0)
  28. / mBlurWidth;
  29. }
  30. // Reassemble destination pixel.
  31. int dpixel = (0xff000000 ) |
  32. (((int)rt) << 16) |
  33. (((int)gt) << 8) |
  34. (((int)bt) << 0);
  35. mDestination[index] = dpixel;
  36. }
  37. }
  38. ...

现在,您实现了抽象compute()方法,该方法可以直接执行模糊或将其拆分为两个较小的任务。简单的数组长度阈值有助于确定是执行还是拆分工作。

  1. protected static int sThreshold = 100000;
  2. protected void compute() {
  3. if (mLength < sThreshold) {
  4. computeDirectly();
  5. return;
  6. }
  7. int split = mLength / 2;
  8. invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
  9. new ForkBlur(mSource, mStart + split, mLength - split,
  10. mDestination));
  11. }

如果以前的方法在RecursiveAction类的子类中,那么将任务设置为在ForkJoinPool中运行很简单,并涉及以下步骤:

  1. 创建一个代表要完成的所有工作的任务。

    1. // source image pixels are in src
    2. // destination image pixels are in dst
    3. ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  2. 创建将运行任务的ForkJoinPool

    1. ForkJoinPool pool = new ForkJoinPool();
  3. 运行任务。

    1. pool.invoke(fb);

有关完整源代码(包括创建目标图像文件的一些额外代码),请参阅 ForkBlur示例。

标准实现

除了使用 fork / join 框架来实现在多处理器系统上同时执行的任务的自定义算法(例如上一节中的ForkBlur.java示例),Java SE 中有一些通常有用的功能,这些功能已经使用 fork / join 框架。在 Java SE 8 中引入的一种这样的实现由 java.util.Arrays 类用于其parallelSort()方法。这些方法类似于sort(),但通过 fork / join 框架利用并发性。在多处理器系统上运行时,大型数组的并行排序比顺序排序更快。但是,这些方法如何利用 fork / join 框架超出了 Java Tutorials 的范围。有关此信息,请参阅 Java API 文档。

fork / join 框架的另一个实现由java.util.streams包中的方法使用,该包是为计划用于 Java SE 8 的 Project Lambda 的一部分。有关更多信息,请参见 Lambda 表达式部分。