Fork-Join并行处理框架
- Java 1.7 引入了一种新的并发框架—— Fork/Join Framework 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数
- 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
- 与ThreadPool共存,并不是要替换ThreadPool
- Java 8 parallelStream: List
numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); numbers.parallelStream() .forEach(out::println); - 内部就是采用ForkJoinPool
分治法 (Divide and Conquer)
基本思想:把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。
步骤:
- 分割原问题
- 求解子问题
- 合并子问题的解为原问题的解
在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。
ForkJoinPool 框架主要类:
- ForkJoinPool 实现ForkJoin的线程池 - ThreadPool
- ForkJoinWorkerThread 实现ForkJoin的线程
- ForkJoinTask
一个描述ForkJoin的抽象类 Runnable/Callable - RecursiveAction 无返回结果的ForkJoinTask实现Runnable
- RecursiveTask
有返回结果的ForkJoinTask实现Callable - CountedCompleter
在任务完成执行后会触发执行一个自定义的钩子函数
- ForkJoinPool 实现了Executor Service 接口
- ExecutorService 是Java Executor框架的基础类
- 其他ExecutorService的实现执行Runnable或Callables任务
- ForkJoinPool执行ForkJoinTasks任务
- ForkJoinPool 实现了Executor Service 接口
- ExecutorService 是Java Executor框架的基础类
- 其他ExecutorService的实现执行Runnable或Callables任务
- ForkJoinPool执行ForkJoinTasks任务
- Executors. newWorkStealPool创建ForkJoinPool

| 返回值 | 方法签名 |
|---|---|
| void | execute(ForkJoinTask<?> task) execute(Runnable task) |
| T | invoke(ForkJoinTask |
| List |
invokeAll(Collection<? extends Callable |
ForkJoinTask
submit(ForkJoinTask
submit(Callable
submit(Runnable task)
submit(Runnable task, T result) |
- ForkJoinTask封装了数据及其相应的计算
- 支持细粒度的数据并行
- ForkJoinTask比线程要轻量
- ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask
- ForkJoinTask主要包括两个方法分别实现任务的分拆与合并:

- fork()类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中
- 跟Thread.join()不同,ForkJoinTask的join()方法并不简单的阻塞线程
- 利用工作线程运行其他任务
- 当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成
ForkJoinTask有3个子类:
ForkJoinPool中的所有的工作线程均有一个自己的工作队列WorkQueue
- 双端队列(Deque)
- 从队头取任务
- 线程私有,不共享

- ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头
- 工作线程以LIFO的顺序来处理它队列中的任务

- 为了最大化CPU利用率,空闲的线程将从其他线程的队列中“窃取”任务来执行
- 从工作队列的队尾“窃取”任务,以减少竞争
- 任务的“窃取”是以FIFO顺序进行的,因为先放入的任务往往表示更大的工作量
- 支持“窃取”线程进行进一步的递归分解

- WorkQueue双端队列最小化任务“窃取”的竞争
- push()/pop()仅在其所有者工作线程中调用 这些操作都是通过CAS来实现的,是Wait-free的
- poll() 则由其他工作线程来调用“窃取”任务
- 可能不是wait-free
Fork-Join最佳实践
- 最适合的是计算密集型的任务
- 在需要阻塞工作线程时,可以使用 ManagedBlocker。是否内联方法
- If one of the FJ Threads has to block, a new thread can be started to take its place
- 不应该在RecursiveTask
的内部使用ForkJoinPool.invoke()
