1、ForkJoin思想
ForkJoin将一个大任务拆分成许多个独立的子任务,然后开启线程去并行的处理这些子任务,如果子任务规模不够小,还会进一步拆解,直到得到足够小的子任务,最后将子任务的结果合并成最后的结果。
2、ForkJoin框架
2.1、框架介绍
JUC包以ForkJoin线程池的方式提供了一套ForkJoin框架的实现,且ForkJoin线程池在Java8中lambda并行流框架中充当着底层框架的角色。
- ForkJoinPool: 执行任务的线程池,继承自AbstractExecutorService
- ForkJoinWorkerThread:执行任务的工作线程(ForkJoinPool线程池中线程),每个线程都维护了一个内部队列,用于存放内部任务
- ForkJoinTask:用于ForkJoinPool的任务抽象类,实现了Future接口
- RecursiveTask:ForkJoinTask子类,带返回结果的递归执行任务
- RecursiveAction:ForkJoinTask子类,不带返回结果的递归执行任务
ForkJoinTask类比较复杂,抽象方法较多,在日常使用时一般不会直接继承ForkJoinTask来实现自定义任务类,而是通过继承ForkJoinTask的两个子类
RecursiveTask和RecursiveAction去实现自定义任务类,自定义任务类需实现这些子类的compute()方法,该方法的执行流程一般如下:
if 任务足够小
计算返回结果
else
分割成n个子任务
依次调用每个子任务的fork方法执行子任务
依次调用每个子任务的join方法,等待子任务完成,然后合并执行结果
2.2、使用实例
public class SumDemo {
private static final int THRESHOLD = 100;
@AllArgsConstructor
@NoArgsConstructor
public static class AccumulateTask extends RecursiveTask<Long> {
private long start;
private long end;
@Override
protected Long compute() {
long sum = 0;
if (end - start <= THRESHOLD) {
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = ((end - start) >> 2) + start;
AccumulateTask subTask1 = new AccumulateTask(start, middle);
subTask1.fork();
AccumulateTask subTask2 = new AccumulateTask(middle + 1, end);
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
long time1 = System.currentTimeMillis();
AccumulateTask task = new AccumulateTask(0L, 10000000L);
ForkJoinTask<Long> taskFuture = pool.submit(task);
Long result = taskFuture.get();
long time2 = System.currentTimeMillis();
System.out.println("耗时:" + (time2 - time1) + "结果:" + result);
long time3 = System.currentTimeMillis();
result = sum(0L, 10000000L);
System.out.println("耗时:" + (time3 - time2) + "结果:" + result);
}
private static Long sum(long start, long end) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
}