Fork-Join并行处理框架

  • Java 1.7 引入了一种新的并发框架—— Fork/Join Framework 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数
  • 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架
  • 与ThreadPool共存,并不是要替换ThreadPool
  • Java 8 parallelStream: List numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); numbers.parallelStream() .forEach(out::println);
  • 内部就是采用ForkJoinPool

分治法 (Divide and Conquer)
基本思想:把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。
步骤:

  1. 分割原问题
  2. 求解子问题
  3. 合并子问题的解为原问题的解

在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。

ForkJoinPool 框架主要类:

  • ForkJoinPool 实现ForkJoin的线程池 - ThreadPool
    • ForkJoinWorkerThread 实现ForkJoin的线程
  • ForkJoinTask 一个描述ForkJoin的抽象类 Runnable/Callable
    • RecursiveAction 无返回结果的ForkJoinTask实现Runnable
    • RecursiveTask 有返回结果的ForkJoinTask实现Callable
    • CountedCompleter 在任务完成执行后会触发执行一个自定义的钩子函数
  • ForkJoinPool 实现了Executor Service 接口
    • ExecutorService 是Java Executor框架的基础类
    • 其他ExecutorService的实现执行Runnable或Callables任务
    • ForkJoinPool执行ForkJoinTasks任务
  • ForkJoinPool 实现了Executor Service 接口
    • ExecutorService 是Java Executor框架的基础类
    • 其他ExecutorService的实现执行Runnable或Callables任务
    • ForkJoinPool执行ForkJoinTasks任务
  • Executors. newWorkStealPool创建ForkJoinPool

image.png

返回值 方法签名
void execute(ForkJoinTask<?> task)
execute(Runnable task)
T invoke(ForkJoinTask task)
List> invokeAll(Collection<? extends Callable> tasks)

ForkJoinTask |

submit(ForkJoinTask task)
submit(Callable task)
submit(Runnable task)
submit(Runnable task, T result) |

  • ForkJoinTask封装了数据及其相应的计算
    • 支持细粒度的数据并行
  • ForkJoinTask比线程要轻量
    • ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask
  • ForkJoinTask主要包括两个方法分别实现任务的分拆与合并:

image.png

  • fork()类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中
  • 跟Thread.join()不同,ForkJoinTask的join()方法并不简单的阻塞线程
    • 利用工作线程运行其他任务
    • 当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成
  • ForkJoinTask有3个子类:

    • RecursiveAction 无返回值的任务
    • RecursiveTask 有返回值的任务
    • CountedCompleter 完成任务后将触发其他任务

      ForkJoin内部原理

  • ForkJoinPool中的所有的工作线程均有一个自己的工作队列WorkQueue

    • 双端队列(Deque)
    • 从队头取任务
    • 线程私有,不共享

image.png

  • ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头
    • 工作线程以LIFO的顺序来处理它队列中的任务

image.png

  • 为了最大化CPU利用率,空闲的线程将从其他线程的队列中“窃取”任务来执行
    • 从工作队列的队尾“窃取”任务,以减少竞争
    • 任务的“窃取”是以FIFO顺序进行的,因为先放入的任务往往表示更大的工作量
      • 支持“窃取”线程进行进一步的递归分解

image.png

  • WorkQueue双端队列最小化任务“窃取”的竞争
    • push()/pop()仅在其所有者工作线程中调用 这些操作都是通过CAS来实现的,是Wait-free的
    • poll() 则由其他工作线程来调用“窃取”任务
      • 可能不是wait-free

image.png

Fork-Join最佳实践

  • 最适合的是计算密集型的任务
  • 在需要阻塞工作线程时,可以使用 ManagedBlocker。是否内联方法
    • If one of the FJ Threads has to block, a new thread can be started to take its place
  • 不应该在RecursiveTask的内部使用ForkJoinPool.invoke()