Java
Collection集合可以通过parallelStream()的得到一个并行流。

  1. Stream<Integer> stream = new ArrayList<Integer>().parallelStream();

串行流也可以通过parallel()方法转为并行流

  1. Stream<Integer> stream = new ArrayList<Integer>().stream().parallel();

对并行流的疑问

  • 串行流和并行流哪个效率更高?(这还有疑问吗?肯定是并行流呀?sure?)
  • 并行流得到的结果是否一定准确?
  • 它的实现机制是什么样的?
  • 开发中可以使用并行流嘛?

现在就来深入了解一下Java8的这个新特性——并行流

并行流的效率是否更高

在Java8以前,遍历一个长度非常大的集合往往非常麻烦,如需要使用多个线程配合synchronized,Lock和Atomic原子引用等进行遍历,且不说多线程之间的调度,多线程同步API的上手成本也比较高。
现在有更为简单的遍历方式,且不局限于遍历集合。
先往一个List添加10万条记录,代码比较简单,单条记录的内容使用UUID随机生成的英文字符串填充

  1. List<String> list = new ArrayList<String>();
  2. for (int i = 0; i < 100000; i++) {
  3. list.add(UUID.randomUUID().toString());
  4. }

普通for循环该List,然后将每条记录中的a替换成b

  1. for (int i = 0; i < list.size(); i++) {
  2. String s = list.get(i);
  3. String replace = s.replace("a", "b");
  4. }

注意:这里使用String replace = s.replace("a", "b");这一行代码作为简单的业务处理,而不是System.out.println(s),因为打印的时候存在synchronized同步机制,会严重影响并行流的效率!
增强for循环

  1. for (String s : list) {
  2. String replace = s.replace("a", "b");
  3. }

串行流

  1. list.stream().forEach((s)->{
  2. String replace = s.replace("a", "b");
  3. });

并行流

  1. list.parallelStream().forEach((s)->{
  2. String replace = s.replace("a", "b");
  3. });

在保证执行机器一样的情况下,上述遍历代码各执行十次,取执行时间的平均值,单位毫秒,结果如下:
Java8 中的 ParallelStream 并行流 - 图1
从结果中可知,在数据量较大的情况下,普通for,增强for和串行流的差距并不是很大,而并行流则以肉眼可见的差距领先于另外三者
数据量较大的情况下,并行流的遍历效率数倍于顺序遍历,在小数据量的情况下,并行流的效率还会那么高吗?
将上面10万的数据量改为1000,然后重复一百次取平均值,结果如下:
Java8 中的 ParallelStream 并行流 - 图2
对结果进行分析,现在开发中比较少见的普通for遍历集合的方式,居然是顺序遍历中速度最快的!而它的改进版增强for速度小逊于普通for。
究其原因,是增强for内部使用迭代器进行遍历,需要维护ArrayList中的size变量,故而增加了时间开销。
而串行流的时间开销确实有点迷,可能的原因是开启流和关闭流的时间开销比较大
并行流花费的时间任然优秀于另外的三种遍历方式!
不过,有一点需要注意的是,并行流在执行时,CPU的占用会比另外三者高
现在可以得到一个结论,并行流在大数据量时,对比其它的遍历方式有几倍的提升,而在数据量比较小时,提升不明显。

并行流处理结果是否准确

这个准确,举个例子来说,希望遍历打印一个存有0 1 2 3 4 5 6 7 8 9的list,如0 1 2 3 4 5 6 7 8 9,代码可能会这么写

  1. //数据
  2. List<Integer> list = new ArrayList<>();
  3. for (int i = 0; i < 10; i++) {
  4. list.add(i);
  5. }
  6. //遍历打印
  7. list.stream().forEach(i -> System.out.print(i + " "));

打印的结果如下:

  1. 0 1 2 3 4 5 6 7 8 9

结果没有任何问题,如果是并行流呢?遍历代码如下

  1. list.parallelStream().forEach(i -> System.out.print(i + " "));

打印的结果如下:

  1. 6 5 1 0 9 3 7 8 2 4

第二次打印的结果如下:

  1. 6 5 0 1 7 9 8 3 4 2

可以看到打印出来的顺序是混乱无规律的
那是什么原因导致的呢?
并行流内部使用了默认的ForkJoinPool线程池,所以它默认的线程数量就是处理器的数量,通过Runtime.getRuntime().availableProcessors()可以得到这个值。
测试电脑的线程数是12,这意味着并行流最多可以将任务划分为12个小模块进行处理,然后再合并计算得到结果
如将0~9这是个数字进行划分:

  1. 0 1 2 3 4 5 6 7 8 9
  2. 第一次划分得到两个小模块:
  3. 0 1 2 3 4
  4. 5 6 7 8 9
  5. 第二次划分得到四个小模块:
  6. 0 1 2
  7. 3 4
  8. 5 6 7
  9. 8 9
  10. 第三次划分得到八个小模块:
  11. 0 1
  12. 2
  13. 3
  14. 4
  15. 5 6
  16. 7
  17. 8
  18. 9
  19. 第三次划分时,2 3 4这些数据,明显已经不能再继续划分,故而2 3 4 这些数据可以先进行打印
  20. 第四次划分得到10个小模块:
  21. 0
  22. 1
  23. 2
  24. 3
  25. 4
  26. 5
  27. 6
  28. 7
  29. 8
  30. 9
  31. 这些小模块在无法继续细分后就会被打印,而打印处理的时候为了提高效率,不分先后顺序,故而造成打印的乱序

结合以上的测试数据,可以得到这样一个结论,当需要遍历的数据,存在强顺序性时,不能使用并行流,如顺序打印0~9;不要求顺序性时,可以使用并行流以提高效率,如将集合中的字符串中的”a”替换成”b”

并行流的实现机制

在Java7时,就已经提供了一个并发执行任务的API,Fork/Join,将一个大任务,拆分成若干个小任务, 再将各个小任务的运行结果汇总成最终的结果。
Java8 中的 ParallelStream 并行流 - 图3
而在java8提供的并行流中,在实现Fork/Join的基础上,还用了工作窃取模式来获取各个小模块的运行结果,使之效率更高!
也可以使用Fock/Join机制,模仿一下并行流的实现过程。
如:进行数据的累加

  1. public class ForkJionCalculate extends RecursiveTask<Long> {
  2. private long start;
  3. private long end;
  4. /**
  5. * 临界值
  6. */
  7. private static final long THRESHOLD = 10000L;
  8. public ForkJionCalculate(long start, long end) {
  9. this.start = start;
  10. this.end = end;
  11. }
  12. /**
  13. * 计算方法
  14. * @return
  15. */
  16. @Override
  17. protected Long compute() {
  18. long length = end - start;
  19. if (length <= THRESHOLD) {
  20. long sum = 0;
  21. for (long i = start; i <= end; i++) {
  22. sum += i;
  23. }
  24. return sum;
  25. } else {
  26. long middle = (start + end) / 2;
  27. ForkJionCalculate left = new ForkJionCalculate(start, middle);
  28. left.fork();//拆分,并将该子任务压入线程队列
  29. ForkJionCalculate right = new ForkJionCalculate(middle + 1, end);
  30. right.fork();
  31. return left.join() + right.join();
  32. }
  33. }
  34. }

处理类需要实现RecursiveTask<T>接口,还需指定一个临界值,临界值的作用就是指定将任务拆分到什么程度就不拆了
测试代码:

  1. public static void main(String[] args) {
  2. Instant start = Instant.now();
  3. ForkJoinPool pool = new ForkJoinPool();
  4. ForkJionCalculate task = new ForkJionCalculate(0, 10000000000L);
  5. Long sum = pool.invoke(task);
  6. System.out.println(sum);
  7. Instant end = Instant.now();
  8. System.out.println("耗费时间:" + Duration.between(start, end).toMillis());
  9. }

并行流的适用场景

其实Java这门编程语言其实有很多种用途,通过swing类库可以构建图形用户界面,配合ParallelGC进行一些科学计算任务,不过最广泛的用途,还是作为一门服务器语言,开发服务器应用,以这种方式进行测试。
使用SpringBoot构建一个工程,然后写一个控制器类,在控制器类中,如上进行1000和10万的数据量测试
另外使用PostMan发送1000并发调用该接口,取平均时间,单位为毫秒值
控制器类测试代码:

  1. @RequestMapping("/parallel")
  2. @ResponseBody
  3. public String parallel() {
  4. //生成测试数据
  5. List<String> list = new ArrayList<>();
  6. for (int i = 0; i < 1000; i++) {
  7. list.add(UUID.randomUUID().toString());
  8. }
  9. //普通for遍历
  10. for (int i = 0; i < 1000; i++) {
  11. String s = list.get(i);
  12. String replace = s.replace("a", "b");
  13. }
  14. return "SUCCESS";
  15. }

数据量1000时,每次请求消耗的时间
Java8 中的 ParallelStream 并行流 - 图4
数据量10W时,每次请求消耗的时间
Java8 中的 ParallelStream 并行流 - 图5
在之前的测试中,并行流对比其他的遍历方式都有两倍以上的差距,而在并发量较大的情况下,服务器线程本身就处于繁忙的状态,即使使用并行流,优化的空间也不是很大,而且CPU的占用率也会比较高。故而可以看到,并行流在数据量1000或者10万时,提升不是特别明显。
但是并不是说并行流不能用于平常的开发中,如CPU本身的负载不高的情况下,还是可以使用的;在一些定时任务的项目中,为了缩短定时任务的执行时间,也可以斟酌使用。
最后总结一下:在数据量比较大的情况下,CPU负载本身不是很高,不要求顺序执行的时候,可以使用并行流。