1. 概述

  1. “分而治之” 一直是一个有效的处理大量数据的方法, 著名的 MapReduce 也是采取了分而治之的思想. redisrdbbgsave、线程池、以及parallelStream都会用到.<br /> Fork 一词的原始含义是吃饭用的叉子,也有分叉的意思。在Linux 平台中,函数 fork()用来创建子进程,使得系统进程可以多一个执行分支。在 Java 中也沿用了类似的命名方式。<br />而 Join() 的含义和 Thread 类的 join 类似,表示等待。也就是使用 fork() 后系统多了一个执行分支(线程),所以需要等待这个执行分支执行完毕,才有可能得到最终的结果,因此 join 就是表示等待。

2.主要类说明

2.1 ForkJoinPool

实现自ExecutorService是线程的执行器,其他的一些线程池也都是ExecutorService的子类。
它的模型大致是这样的:线程池中的每个线程都有自己的工作队列(PS:这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列,所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。
image.png

2.2 ForkJoinTask

实现自Future,可以看成是任务本身。

严格意义上ForkJoinTask并不是任务本身,由于他没有实现Runnable接口,但是他的子类AdaptedRunnableAction实现了Runnable,这里是「适配器模式」赋予ForkJoinFask任务的执行逻辑Runnable.

根据下面代码可以看到AdaptedRunnableAction是继承于ForkJoinTask, 属于它的子类,并且额外实现RunnableFuture接口, 从而可以实现自定义逻辑的Runnable.

  1. /**
  2. * Adaptor for Runnables without results
  3. */
  4. static final class AdaptedRunnableAction extends ForkJoinTask
  5. implements RunnableFuture {
  6. final Runnable runnable;
  7. AdaptedRunnableAction(Runnable runnable) {
  8. if (runnable == null) throw new NullPointerException();
  9. this.runnable = runnable;
  10. }
  11. public final Void getRawResult() { return null; }
  12. public final void setRawResult(Void v) { }
  13. public final boolean exec() { runnable.run(); return true; }
  14. public final void run() { invoke(); }
  15. private static final long serialVersionUID = 5232453952276885070L;
  16. }

从而通过ForkJoinTask.submit带入该task参数,执行该任务.

  1. public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
  2. if (task == null)
  3. throw new NullPointerException();
  4. externalPush(task);
  5. return task;
  6. }

2.3 ForkJoinWorkerThread

是Thread的子类,主要负责执行Runnable任务,定义了以下两个成员变量:

  1. final ForkJoinPool pool; // the pool this thread works in
  2. final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

2.4 三者间的关系

ForkJoinPool主要是由WorkQueue[]组成,WorkQueue队列里面存的是ForkJoinTask[]和ForkJoinWorkerThread。 而ForkJoinWorkerThread持有ForkJoinPool和WorkQueue的引用。
关系图如下:
image.png

3.代码原理

3.1 fork流程

  • ForkJoinTask fork() 调用fork()方法。
  • 判断调用fork()方法的线程是否是ForkJoinWorkerThread,

如果是直接调用当前线程的workQueue.push方法 「(这里就是为啥,parallelStream开启线程会占用ForkJoinPool线程池的数量)」
如果不是调用全局的ForkJoinPool.common .externalPush(this)

  • WorkQueue#push(ForkJoinTask<?> task) 这个push方法并不是把任务加入到当前线程的WorkQueue,而是调用ForkJoinPool#signalWork()方法添加到ForkJoinPool中重新分配到工作线程中的WorkQueue。

3.2 join流程

  • ForkJoinTask#join() 调用doJoin().
  • doJoin()调用exec()中真正的执行分片任务的逻辑(这里就不展开细说了)
  • getRawResult() 获取执行的结果

3.2 任务窃取流程