Fork/Join分治框架是JDK1.7引入的。主要作用是将一个大任务拆分为一系列的小任务,然后合并计算结果,以提高并发效率。这也充分发挥了现在多核CPU的特性,工作特点如下图所示:
其主要思想是用到了工作窃取(work stealing)算法,一个线程从其他任务队列中窃取任务执行。所谓工作窃取其实是一种提高效率的方式,一个大任务会被拆分为一个个的小任务,为了方便小任务的管理,每个小任务会被放到各自的队列中,一个大任务被拆分为一个个小的任务队列,这就会出现有的任务队列任务执行完成,有的未完成,那么执行完成的队列就要等待未完成的任务执行完再进行结果合并,那还不如把未完成的任务队列分出一部分任务给空闲的CPU来执行,因此采用工作窃取的方式来提高并发。
由于任务存在拆分因此ForkJoinPool中的任务分为外部任务和内部任务,存在于全局队列的为外部任务,存在每个工作线程的队列为内部任务,被拆分的子任务会存放在各自的线程队列中。线程在取任务的时候会先判断任务是否执行完成,然后在判断是否被窃取,如果被窃取,就去执行其他任务,优先执行本队列内的任务,一般工作线程不会因为空闲而阻塞。
ForkJoinPool中的队列使用的是一种WorkQueue的双端队列,之所以使用双端队列是因为,工作线程在执行自身任务的时候是从头部开始取任务,窃取任务时从尾部取任务,这样一定程度的保证了线程安全问题。
整个分治编程框架中涉及到的类主要包含以下实现类:
- ForkJoinPool 一个继承自ExecutorService 接口并且用来运行ForkJoinTask的池。
- ForkJoinTask 在 ForkJoinPool 中运行的任务并且实现了Future接口的抽象基类。 ForkJoinTask 是一个类似于线程的实体,比普通线程轻得多。大量任务和子任务可能由 ForkJoinPool 中的少量实际线程托管。
- ForkJoinWorkerThread 继承了Thread类,被ForkJoinPool管理的线程,实际ForkJoinTask的执行者。
- RecursiveAction 继承自ForkJoinTask类,用来递归拆分无结果的ForkJoinTask。
- RecursiveTask 继承自ForkJoinTask,与RecursiveAction的区别在于用来承载有结果的ForkJoinTask。
简单演示使用ForkJoinPool来进行大数计算求和:
package forkjoinpool;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
public class ForkJoinTest {
private static final long MIN = 0L;
private static final long MAX = 1000_000_000L;
public static void main(String[] args) {
forTest();
forkJoinTest();
streamTest();
}
static void forTest() {
Instant start = Instant.now();
long sum = 0L;
for (long i = MIN; i <= MAX; i++) {
sum = sum + i;
}
Instant end = Instant.now();
System.out.println("for test result :" + sum);
System.out.println("fork test :" + Duration.between(start, end).toMillis());
}
static void forkJoinTest() {
Instant start = Instant.now();
ForkJoinCalculate task = new ForkJoinCalculate(MIN, MAX);
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> submit = pool.submit(task);
try {
Long result = submit.get();
System.out.println("fork join test result :" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Instant end = Instant.now();
System.out.println("fork join test:" + Duration.between(start, end).toMillis());
}
static void streamTest() {
Instant start = Instant.now();
long sum = LongStream.range(MIN, MAX + 1).parallel().reduce(0L, Long::sum);
Instant end = Instant.now();
System.out.println("java stream result :" + sum);
System.out.println("java stream test:" + Duration.between(start, end).toMillis());
}
}
class ForkJoinCalculate extends RecursiveTask<Long> {
private final Long start;
private final Long end;
private final static Long LIMIT = 500_000L;
public ForkJoinCalculate(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
long sum = 0L;
if (length <= LIMIT) {
for (long i = start; i <= end; i++) {
sum = sum + i;
}
return sum;
} else {
long middle = (end + start) / 2;
ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
left.fork();
ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
输出结果:
for test result :500000000500000000
fork test :457
fork join test result :500000000500000000
fork join test:277
java stream result :500000000500000000
java stream test:246
可以明显看到使用ForkJoinPool的速度有很大提升,在设置一个合理拆分的临界点时,计算速度几乎媲美Java8中的并行流计算,对于一些较为复杂的IO等待操作使用起来的优势就会更加明显。
从上面例子也可以概括出Fork/Join框架的模板代码格式:
定义一个任务单元继承RecursiveTask或者RecursiveAction类。
if 不可拆分的单元
// do something
else
根据拆分临界值,继续拆分任务,拆分的任务调用fork方法,一般按照二分拆分任务
return task.join() + task.join
查看ForkJoinPool中的一些核心方法
关于操作的主要包以下几个:
shutdown
public void shutdown() {
checkPermission();
tryTerminate(false, true);
}
熟悉线程池的应该了解,调用shutdown()方法并不发导致正在执行的任务被停止,正在执行的任务会继续执行,不在接收新的任务,如果并发提交的任务可能会被执行,也可能不执行。
shutdownNow 该方法会尝试取消所有任务,并且拒绝随后提交的任务,并且抛出InterruptedException异常。需要注意的是先调用shutdown方法在调用get方法是不会抛出异常的,如果先调用shutdownNow方法再调用get方法会抛出CancellationException异常。
- invoke 提交一个有返回结果的任务,方法返回值即为结果,不需要再次调用方法获取,所以该方法为阻塞的。
- submit 提交一个有返回结果的任务,返回结果需要手动调用get方法获得,调用get时会阻塞至返回结果。
- execute 提交一个无返回结果的任务。
ForkJoinPool在使用中主要是一种分治思想的参考,源码层面就不过多分析。在使用过程中需要注意的是任务拆分本身也是一种成本,如何拆分就需要权衡拆分的消耗和任务执行本身的一个消耗平衡点,不能为了分治而分治,此外分治结果如果对合并结果有影响,那分治自然也是万万不可取的了,ForkJoinPoo更多的适合CPU密集型任务,例如大数计算、汉诺塔、递归、归并计算等。