1) 概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型
运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计
算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运
算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
2) 使用
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下
面定义了一个对 1~n 之间的整数求和的任务
public class AddTaskTest {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
ForkJoinTask<Integer> task = new AddTask(5);
System.out.println(pool.invoke(task));
}
}
class AddTask extends RecursiveTask<Integer> {
private int start;
public AddTask(int start) {
this.start = start;
}
@Override
public String toString() {
return "{" + start + '}';
}
@Override
protected Integer compute() {
if (start == 1) {
System.out.println(Thread.currentThread().getName() + " join() :" + start);
return start;
}
AddTask addTask = new AddTask(start - 1);
addTask.fork();
System.out.println(Thread.currentThread().getName() + " fork() :" + start + " + " + addTask);
int result = start + addTask.join();
System.out.println(Thread.currentThread().getName() + " join() :" + start + " + " + addTask + " = " + result);
return result;
}
}
java8新的写法
long reduce = LongStream
// 0 < x < 6
.range(0, 6)
// 并行流
.parallel().reduce(0, Long::sum);
System.out.println(reduce);
long reduce1 = LongStream
// 0 < x <= 5
.rangeClosed(0, 5)
// 顺序流
.sequential().reduce(0, Long::sum);
System.out.println(reduce1);