作用:将大任务分解为小任务,然后把每个小任务的结果汇总,得到的是大任务的结果
image.png

什么是工作窃取算法

image.png

如何使用Fork/Join

  1. 创建任务:通过ForkJoinTask的两个子类定义任务。
    1. RecursiveAction:用于没有返回结果的任务
    2. RecursiveTask:用于有结果的任务
  2. 执行者:ForkJoinPool

    实例

    需求:1+2+…+10000 ```java public class CountTask extends RecursiveTask {

    private static final int THRESHOLD = 2;

    private int start; private int end;

    public CountTask(int start, int end) {

    1. this.start = start;
    2. this.end = end;

    }

    @Override protected Integer compute() {

    1. int sum = 0;
    2. boolean canCompute = (end - start) <= THRESHOLD;
    3. if (canCompute) {
    4. for (int i = start; i <= end; i++) {
    5. sum += i;
    6. }
    7. } else {
    8. int middle = (start + end) / 2;
    9. CountTask left = new CountTask(start, middle);
    10. CountTask right = new CountTask(middle + 1, end);
    11. left.fork();
    12. right.fork();
    13. int leftR = left.join();
    14. int rightR = right.join();
    15. sum = leftR + rightR;
    16. }
    17. return sum;

    } }

========================

public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask countTask = new CountTask(1, 100); ForkJoinTask submit = forkJoinPool.submit(countTask); System.out.println(submit.get()); } ```

Fork/Join框架异常处理

ForkJoinTask提供了isCompletedAbnormally()方法检查任务是否抛出异常或已经被取消了。
可以通过 ForkJoinTask.getException()来获取异常

  • 返回Throwable对象
  • 被取消返回 CancellationException
  • 没有异常或没有完成,则为null

实现原理

ForkJoinPool的组成

image.png

ForkJoinTask的fork方法实现原理

image.png
pushTask()将当前任务放在ForkJoinTask数组中,然后会创建一个工作线程去执行ForkJoinTask

ForkJoinTask的join方法实现原理

image.png

  1. 首先调用了doJoin,返回任务执行状态
    1. NORMAL:已完成
    2. CANCELLED:被取消
    3. SIGNAL:信号
    4. EXCEPTIONAL:出现异常
  2. doJoin()方法代码

image.png

  1. 若任务执行完成直接返回任务状态
  2. 若没有执行完,从数组中取出执行
    1. 成功:返回完成状态
    2. 异常:返回异常状态