任务类型

  1. CPU密集型:计算密集,, 比如加密,解密,计算类型,需要用到CPU, 一般设置为CPU核数的 1- 2 倍。
    过多:线程上下文切换耗时
    过少:CPU 浪费
  2. IO 密集型: 比如读写操作,网络通信之类,不会特别消耗CPU,,IO操作耗时。
    可以设置为核数的多倍,IO 操作设计的CPU处理较少,

线程数计算方法

线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间)
如果任务平均等待时间长,线程数可以越多,——-IO密集型
如果任务平均工作时间长,线程数就少 ——————CPU密集型

最准确的是压测,确定线程数

分治思想

归并,把规模为N的问题分解成K个小规模的子问题。求出子问题,归并总结就是原来的大问题
步骤

  1. 分解
  2. 求解
  3. 合并

image.png
image.png

应用场景

归并排序
快速排序
二分查找
大数据-MapReduce
Fork/Join

Fork/Join框架介绍

ThreadPoolExecutor的确定

  1. 大任务无法分解,只能串行
  2. 工作线程从队列获取任务存在竞争

核心类:ForkJoinPool类

是对ThreadPoolExecutor的补充,仅针对计算密集型的场景加强,
使用时需要对:任务总数,单任务执行耗时,并行数评估性能
结构:
image.png

Fork/Join的使用

Fork/Join 计算框架主要包含两部分,

  1. 分治任务的线程池 ForkJoinPool,
  2. 分治任务 ForkJoinTask

    ForkJoinPool

    用于执行ForkJoinTask任务的执行线程池。维护队列数据 WorkQueue(WorkQueue[])
    目的:减少提交任务的碰撞

ForkJoinPool构造器

image.png

  1. //
  2. public ForkJoinPool(int parallelism,
  3. ForkJoinWorkerThreadFactory factory,
  4. UncaughtExceptionHandler handler,
  5. boolean asyncMode) {
  6. this(checkParallelism(parallelism),
  7. checkFactory(factory),
  8. handler,
  9. asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
  10. "ForkJoinPool-" + nextPoolId() + "-worker-");
  11. checkPermission();
  12. }

ForkJoinPool的四个核心参数

  1. parallelism:并行级别,决定工作线程的数量,默认缺省:Runtime.getRuntime().availableProcessors()
  2. ForkJoinWorkerThreadFactory:创建线程,缺省:DefaultForkJoinWorkerThreadFactory创建线程
  3. UncaughtExceptionHandler:异常处理器
  4. _boolean _asyncMode :队列工作模式,asyncMode ? FIFO_QUEUE : LIFO_QUEUE, 先进先出,后进先出

按类型提交不同任务

任务提交是ForkJoinPool的核心能力之一,提交任务有三种方式:

返回值 方法
提交异步执行 void execute(ForkJoinTask task)
execute(Runnable task)
普通不可分割的任务
仅有任务窃取的性能提升
等待并获取结果 T invoke(ForkJoinTask task) 任务结束后发挥泛型结果,
如果task = null 报错
提交执行获取Future结果 ForkJoinTask submit(ForkJoinTask task)
submit(Callabletask)
submit(Runnable task)
submit(Runnable task, T result)
  1. //递归任务 用于计算数组总和
  2. LongSum ls = new LongSum(array, 0, array.length);
  3. // 构建ForkJoinPool
  4. ForkJoinPool fjp = new ForkJoinPool(12);
  5. //ForkJoin计算数组总和
  6. ForkJoinTask<Long> result = fjp.submit(ls);

ForkJoinTask

任务的实际载体
定义任务执行是的具体逻辑和拆分逻辑,
各个任务独立
image.png

ForkJoinPool 的工作原理

image.png

  • 内部维护多个工作队列,invoke,submit提交
  • 每个线程对应一个工作线程WorkQueue, 双端队列,存放对象是任务ForkJoinTask
  • 当每个线程产生新任务时,会放入工作队列的top,并且工作线程会处理自己队列里的任务是LIFO,从top取
  • 窃取任务时从队列的base位置,窃取别的线程的任务时FIFO
  • join时,如果join的任务未完成,会先处理其他任务,等待完成
  • 如果自己的完成,也没有窃取,会休眠

工作窃取

性能保证的关键之一,窃取繁忙线程的任务,减少线程竞争的可能性
work-strealing queues 窃取对了
内部类WorkQueue实现

  1. // Instance fields
  2. volatile int scanState; // versioned, <0: inactive; odd:scanning
  3. int stackPred; // pool stack (ctl) predecessor
  4. int nsteals; // number of steals
  5. int hint; // randomization and stealer index hint
  6. int config; // pool index and mode
  7. volatile int qlock; // 1: locked, < 0: terminate; else 0
  8. volatile int base; // index of next slot for poll
  9. int top; // index of next slot for push
  10. ForkJoinTask<?>[] array; // the elements (initially unallocated)
  11. final ForkJoinPool pool; // the containing pool (may be null)
  12. final ForkJoinWorkerThread owner; // owning thread or null if shared
  13. volatile Thread parker; // == owner during call to park; else null
  14. volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
  15. volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer

Deques的特殊形式
支持三种操作

  1. push 当前线程
  2. pop 当前线程
  3. poll 窃取 其他线程

工作窃取的运行流畅取
image.png

  • 优点:充分利用线程,进行并行计算,减少线程间的竞争
  • 缺点:只有一个任务时poll 与 pop竞争,多个线程,多个队列资源要求较高

思考:为什么这么设计,工作线程总是从头部获取任务,窃取线程从尾部获取任务?
自己取任务,取最近的,任务可能还在CPU缓存中,处理效率高,
而偷,为了避免竞争,从尾部降低竞争的可能性,

另外:由于任务时可分割的,较早的任务可能粒度较大,还未分割,交给空闲线程更适合


工作队列WorkQueue

  • 双向链表,
  • 每个ForkJoinWorkThread 线程都有属于自己的WorkQueue,但不是每个WorkQueue都有对应的线程
  • 没有ForkJoinWorkThread的WorkQueue保存的是 subminnion,l来自外部的提交,队列中下标是偶数位

image.png

ForkJoinWorkerThread

image.png
用来执行任务的线程,用来区分非ForkJoinWorkerThread线程提交的task
image.png
流程图:
https://www.processon.com/view/link/5db81f97e4b0c55537456e9a
image.png

总结

Fork/Join是一种基于分治算法的模型,在并发处理计算型任务时有着显著的优势。其效率的提升主要得益于两个方面:

  • 任务切分:将大的任务分割成更小粒度的小任务,让更多的线程参与执行;
  • 任务窃取:通过任务窃取,充分地利用空闲线程,并减少竞争。

在使用ForkJoinPool时,需要特别注意任务的类型是否为纯函数计算类型,也就是这些任务不应该关心状态或者外界的变化,这样才是最安全的做法。如果是阻塞类型任务,那么你需要谨慎评估技术方案。虽然ForkJoinPool也能处理阻塞类型任务,但可能会带来复杂的管理成本。

ForkJOinPool.common.externaPush 并行流用的是个,注意点

使用示例

  1. LongSum ls = new LongSum(array, 0, array.length);
  2. ForkJoinPool fjp = new ForkJoinPool(NCPU);
  3. ForkJoinTask<Long> result = fjp.submit(ls);
  4. public class LongSum extends RecursiveTask<Long> {
  5. // 任务拆分最小阈值
  6. static final int SEQUENTIAL_THRESHOLD = 10000000;
  7. int low;
  8. int high;
  9. int[] array;
  10. LongSum(int[] arr, int lo, int hi) {
  11. array = arr;
  12. low = lo;
  13. high = hi;
  14. }
  15. //需要写逻辑
  16. @Override
  17. protected Long compute() {
  18. //当任务拆分到小于等于阀值时开始求和
  19. if (high - low <= SEQUENTIAL_THRESHOLD) {
  20. long sum = 0;
  21. for (int i = low; i < high; ++i) {
  22. sum += array[i];
  23. }
  24. return sum;
  25. } else { // 任务过大继续拆分
  26. int mid = low + (high - low) / 2;
  27. LongSum left = new LongSum(array, low, mid);
  28. LongSum right = new LongSum(array, mid, high);
  29. // 提交任务
  30. left.fork();
  31. right.fork();
  32. //获取任务的执行结果,将阻塞当前线程直到对应的子任务完成运行并返回结果
  33. long rightAns = right.compute();
  34. long leftAns = left.join();
  35. return leftAns + rightAns;
  36. }
  37. }
  38. }