Fork/Join 框架简介

  • Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子 任务结果合并成最后的计算结果,并进行输出。Fork/Join 框架要完成两件事 情
  • Fork:把一个复杂任务进行分拆,大事化小
  • Join:把分拆任务的结果进行合并
  • image.png
  • 任务分割:首先 Fork/Join 框架需要把大的任务分割成足够小的子任务,如果 子任务比较大的话还要对子任务进行继续分割
  • 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程 分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里, 启动一个线程从队列里取数据,然后合并这些数据
  • 在 Java 的 Fork/Join 框架中,使用两个类完成上述操作

    • ForkJoinTask:我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。 该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集 成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了两个子类:
      • RecursiveAction:用于没有返回结果的任务
      • RecursiveTask:用于有返回结果的任务
    • ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行
    • RecursiveTask:继承后可以实现递归(自己调自己)调用的任务

      Fork方法

  • image.png

  • image.png
  • Fork 方法的实现原理: 当我们调用 ForkJoinTask 的 fork 方法时,程序会把 任务放在 ForkJoinWorkerThread 的 pushTask 的 workQueue 中,异步地 执行这个任务,然后立即返回结果

    1. public final ForkJoinTask<V> fork() {
    2. Thread t;
    3. if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    4. ((ForkJoinWorkerThread)t).workQueue.push(this);
    5. else
    6. ForkJoinPool.common.externalPush(this);
    7. return this;
    8. }
    • pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用 ForkJoinPool 的 signalWork()方法唤醒或创建一个工作线程来执行任务。代 码如下:
      1. final void push(ForkJoinTask<?> task) {
      2. ForkJoinTask<?>[] a;
      3. int s = top, d, cap, m;
      4. ForkJoinPool p = pool;
      5. if ((a = array) != null && (cap = a.length) > 0) {
      6. QA.setRelease(a, (m = cap - 1) & s, task);
      7. top = s + 1;
      8. if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
      9. p != null) { // size 0 or 1
      10. VarHandle.fullFence();
      11. p.signalWork();
      12. }
      13. else if (d == m)
      14. growArray(false);
      15. }
      16. }

join 方法

  1. public final V join() {
  2. int s;
  3. if (((s = doJoin()) & ABNORMAL) != 0)
  4. reportException(s);
  5. return getRawResult();
  6. }
  1. private int doJoin() {
  2. int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  3. return (s = status) < 0 ? s :
  4. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
  5. (w = (wt = (ForkJoinWorkerThread)t).workQueue).
  6. tryUnpush(this) && (s = doExec()) < 0 ? s :
  7. wt.pool.awaitJoin(w, this, 0L) :
  8. externalAwaitDone();
  9. }
  1. private int doJoin() {
  2. int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  3. return (s = status) < 0 ? s :
  4. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
  5. (w = (wt = (ForkJoinWorkerThread)t).workQueue).
  6. tryUnpush(this) && (s = doExec()) < 0 ? s :
  7. wt.pool.awaitJoin(w, this, 0L) :
  8. externalAwaitDone();
  9. }
  • 在 doJoin()方法流程如下:
    • 首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接 返回任务状态;
    • 如果没有执行完,则从任务数组里取出任务并执行。
    • 如果任务顺利执行完成,则设置任务状态为 NORMAL,如果出现异常,则记 录异常,并将任务状态设置为 EXCEPTIONAL

Fork/Join 框架的异常处理

  • ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接 捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally()方法来检查 任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常
  • getException 方法返回 Throwable 对象,如果任务被取消了则返回 CancellationException。如果任务没有完成或者没有抛出异常则返回 null

计算1至100万的整数之和

  1. class MyTask extends RecursiveTask<Integer> {
  2. private static final Integer VALUE = 10;
  3. private int begin;
  4. private int end;
  5. private int result;
  6. public MyTask(int begin, int end) {
  7. this.begin = begin;
  8. this.end = end;
  9. }
  10. @Override
  11. protected Integer compute() {
  12. if ((end - begin) <= begin) {
  13. for (int i = begin; i <= end; i++) {
  14. result += i;
  15. }
  16. } else {
  17. int middle = (begin + end) / 2;
  18. MyTask myTask01 = new MyTask(begin, middle);
  19. MyTask myTask02 = new MyTask(middle + 1, end);
  20. myTask01.fork();
  21. myTask02.fork();
  22. result = myTask01.join() + myTask02.join();
  23. }
  24. return result;
  25. }
  26. }
  27. public class ForkJoinDemo1 {
  28. public static void main(String[] args) throws ExecutionException, InterruptedException {
  29. MyTask myTask = new MyTask(0, 1000000);
  30. ForkJoinPool forkJoinPool = new ForkJoinPool();
  31. ForkJoinTask<Integer> submit = forkJoinPool.submit(myTask);
  32. Integer integer = submit.get();
  33. System.err.println(integer);
  34. forkJoinPool.shutdown();
  35. }
  36. }