叉/加入
原文: https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
fork / join 框架是ExecutorService接口的实现,可帮助您利用多个处理器。它专为可以递归分解成小块的工作而设计。目标是使用所有可用的处理能力来提高应用程序的性能。
与任何ExecutorService实现一样,fork / join 框架将任务分配给线程池中的工作线程。 fork / join 框架是不同的,因为它使用工作窃取算法。用尽要做的事情的工作线程可以从仍然忙碌的其他线程中窃取任务。
fork / join 框架的中心是 ForkJoinPool 类,它是AbstractExecutorService类的扩展。 ForkJoinPool实现核心工作窃取算法,可以执行 ForkJoinTask 进程。
基本用法
使用 fork / join 框架的第一步是编写执行一部分工作的代码。您的代码应类似于以下伪代码:
if (my portion of the work is small enough)do the work directlyelsesplit my work into two piecesinvoke the two pieces and wait for the results
将此代码包装在ForkJoinTask子类中,通常使用其中一种更特殊的类型, RecursiveTask (可以返回结果)或 RecursiveAction 。
在ForkJoinTask子类就绪后,创建表示要完成的所有工作的对象,并将其传递给ForkJoinPool实例的invoke()方法。
模糊为 Clarity
为了帮助您了解 fork / join 框架的工作原理,请考虑以下示例。假设您想模糊图像。原始源图像由整数数组表示,其中每个整数包含单个像素的颜色值。模糊的目的地图像也由与源相同大小的整数数组表示。
通过一次一个像素地处理源阵列来完成模糊。每个像素的平均周围像素(红色,绿色和蓝色分量被平均),结果放在目标数组中。由于图像是大型数组,因此此过程可能需要很长时间。通过使用 fork / join 框架实现算法,您可以利用多处理器系统上的并发处理。这是一个可能的实现:
public class ForkBlur extends RecursiveAction {private int[] mSource;private int mStart;private int mLength;private int[] mDestination;// Processing window size; should be odd.private int mBlurWidth = 15;public ForkBlur(int[] src, int start, int length, int[] dst) {mSource = src;mStart = start;mLength = length;mDestination = dst;}protected void computeDirectly() {int sidePixels = (mBlurWidth - 1) / 2;for (int index = mStart; index < mStart + mLength; index++) {// Calculate average.float rt = 0, gt = 0, bt = 0;for (int mi = -sidePixels; mi <= sidePixels; mi++) {int mindex = Math.min(Math.max(mi + index, 0),mSource.length - 1);int pixel = mSource[mindex];rt += (float)((pixel & 0x00ff0000) >> 16)/ mBlurWidth;gt += (float)((pixel & 0x0000ff00) >> 8)/ mBlurWidth;bt += (float)((pixel & 0x000000ff) >> 0)/ mBlurWidth;}// Reassemble destination pixel.int dpixel = (0xff000000 ) |(((int)rt) << 16) |(((int)gt) << 8) |(((int)bt) << 0);mDestination[index] = dpixel;}}...
现在,您实现了抽象compute()方法,该方法可以直接执行模糊或将其拆分为两个较小的任务。简单的数组长度阈值有助于确定是执行还是拆分工作。
protected static int sThreshold = 100000;protected void compute() {if (mLength < sThreshold) {computeDirectly();return;}int split = mLength / 2;invokeAll(new ForkBlur(mSource, mStart, split, mDestination),new ForkBlur(mSource, mStart + split, mLength - split,mDestination));}
如果以前的方法在RecursiveAction类的子类中,那么将任务设置为在ForkJoinPool中运行很简单,并涉及以下步骤:
创建一个代表要完成的所有工作的任务。
// source image pixels are in src// destination image pixels are in dstForkBlur fb = new ForkBlur(src, 0, src.length, dst);
创建将运行任务的
ForkJoinPool。ForkJoinPool pool = new ForkJoinPool();
运行任务。
pool.invoke(fb);
有关完整源代码(包括创建目标图像文件的一些额外代码),请参阅 ForkBlur示例。
标准实现
除了使用 fork / join 框架来实现在多处理器系统上同时执行的任务的自定义算法(例如上一节中的ForkBlur.java示例),Java SE 中有一些通常有用的功能,这些功能已经使用 fork / join 框架。在 Java SE 8 中引入的一种这样的实现由 java.util.Arrays 类用于其parallelSort()方法。这些方法类似于sort(),但通过 fork / join 框架利用并发性。在多处理器系统上运行时,大型数组的并行排序比顺序排序更快。但是,这些方法如何利用 fork / join 框架超出了 Java Tutorials 的范围。有关此信息,请参阅 Java API 文档。
fork / join 框架的另一个实现由java.util.streams包中的方法使用,该包是为计划用于 Java SE 8 的 Project Lambda 的一部分。有关更多信息,请参见 Lambda 表达式部分。
