1. 概述
“分而治之” 一直是一个有效的处理大量数据的方法, 著名的 MapReduce 也是采取了分而治之的思想. 如redis的rdb的bgsave、线程池、以及parallelStream都会用到.<br /> Fork 一词的原始含义是吃饭用的叉子,也有分叉的意思。在Linux 平台中,函数 fork()用来创建子进程,使得系统进程可以多一个执行分支。在 Java 中也沿用了类似的命名方式。<br />而 Join() 的含义和 Thread 类的 join 类似,表示等待。也就是使用 fork() 后系统多了一个执行分支(线程),所以需要等待这个执行分支执行完毕,才有可能得到最终的结果,因此 join 就是表示等待。
2.主要类说明
2.1 ForkJoinPool
实现自ExecutorService是线程的执行器,其他的一些线程池也都是ExecutorService的子类。
它的模型大致是这样的:线程池中的每个线程都有自己的工作队列(PS:这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列,所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。
2.2 ForkJoinTask
实现自Future,可以看成是任务本身。
严格意义上ForkJoinTask并不是任务本身,由于他没有实现Runnable接口,但是他的子类AdaptedRunnableAction实现了Runnable,这里是「适配器模式」赋予ForkJoinFask任务的执行逻辑Runnable.
根据下面代码可以看到AdaptedRunnableAction是继承于ForkJoinTask, 属于它的子类,并且额外实现RunnableFuture接口, 从而可以实现自定义逻辑的Runnable.
/*** Adaptor for Runnables without results*/static final class AdaptedRunnableAction extends ForkJoinTaskimplements RunnableFuture {final Runnable runnable;AdaptedRunnableAction(Runnable runnable) {if (runnable == null) throw new NullPointerException();this.runnable = runnable;}public final Void getRawResult() { return null; }public final void setRawResult(Void v) { }public final boolean exec() { runnable.run(); return true; }public final void run() { invoke(); }private static final long serialVersionUID = 5232453952276885070L;}
从而通过ForkJoinTask.submit带入该task参数,执行该任务.
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {if (task == null)throw new NullPointerException();externalPush(task);return task;}
2.3 ForkJoinWorkerThread
是Thread的子类,主要负责执行Runnable任务,定义了以下两个成员变量:
final ForkJoinPool pool; // the pool this thread works infinal ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
2.4 三者间的关系
ForkJoinPool主要是由WorkQueue[]组成,WorkQueue队列里面存的是ForkJoinTask[]和ForkJoinWorkerThread。 而ForkJoinWorkerThread持有ForkJoinPool和WorkQueue的引用。
关系图如下:
3.代码原理
3.1 fork流程
- ForkJoinTask
fork() 调用fork()方法。 - 判断调用fork()方法的线程是否是ForkJoinWorkerThread,
如果是直接调用当前线程的workQueue.push方法 「(这里就是为啥,parallelStream开启线程会占用ForkJoinPool线程池的数量)」
如果不是调用全局的ForkJoinPool.common .externalPush(this)
- WorkQueue#push(ForkJoinTask<?> task) 这个push方法并不是把任务加入到当前线程的WorkQueue,而是调用ForkJoinPool#signalWork()方法添加到ForkJoinPool中重新分配到工作线程中的WorkQueue。
3.2 join流程
- ForkJoinTask#join() 调用doJoin().
- doJoin()调用exec()中真正的执行分片任务的逻辑(这里就不展开细说了)
- getRawResult() 获取执行的结果
