并行流

  • 在前面的章节中,我们学习了Stream和函数式的一些内容。
  • 也在前面提及,Java8的并行操作。在Java 7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结构分成若干子部分。第二,你要给每个子部分分配一个独立的线程。第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起来。Java 7引入了一个叫作分支/合并的框架,让这些操作更稳定、更不易出错。在后面会讲解fork/join框架。
  • 假设你需要写一个方法,接受数字n作为参数,并返回从1到给定参数的所有数字的和。一个直接(也许有点土)的方法是生成一个无穷大的数字流,把它限制到给定的数目,然后用对两个数字求和的BinaryOperator来归约这个流

    1. private static Long getSum(Long n) {
    2. return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
    3. }
  • 上述的代码是顺序流,将顺序流转换为并行流,只需要下面这样做

    1. private static Long getParallelSum(Long n) {
    2. return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
    3. }
  • 在上面的代码中,对流中所有数字求和的归纳过程的执行方式之前的求和差不多,不同之处在于Stream在内部分成了几块。可以对不同的块进行独立归纳操作。

  • 并行归纳示意图

image.png

  • 配置并行流使用的线程池:并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?
  • 并行流内部默认使用ForkJoinPool,默认的线程数量就是处理器数量。
  • 测试性能:测量对前n个自然数求和的函数的性能

    1. public class Test3 {
    2. public static void main(String[] args) {
    3. // 无包装
    4. System.out.println(measureSumPerf(o -> getForSum(o), 10000000) + ":ms");
    5. // 顺序流
    6. System.out.println(measureSumPerf(o -> getSum(o), 10000000) + ":ms");
    7. // 并行流
    8. System.out.println(measureSumPerf(o -> getParallelSum(o), 10000000) + ":ms");
    9. }
    10. private static Long measureSumPerf(Function<Long, Long> adder, long n) {
    11. long fastest = Long.MAX_VALUE;
    12. for (int i = 0; i < 10; i++) {
    13. long start = System.currentTimeMillis();
    14. adder.apply(n);
    15. long end = System.currentTimeMillis();
    16. if ((end - start) < fastest) {
    17. fastest = end - start;
    18. }
    19. }
    20. return fastest;
    21. }
    22. private static Long getForSum(Long n) {
    23. return LongStream.rangeClosed(0, n).sum();
    24. }
    25. private static Long getSum(Long n) {
    26. return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
    27. }
    28. private static Long getParallelSum(Long n) {
    29. return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
    30. }
    31. }
    32. // 3:ms
    33. // 55:ms
    34. // 46:ms
  • 上面的测试结果,你会发发现,并行和串行的速度相差不大,但是不使用包装类进行计算,快了十几倍。iterate生成的是装箱的对象,必须拆箱成数字才能求和。

  • 但是还可以更快的求和 ```java public class Test3 { public static void main(String[] args) {

    1. // 串行无包装
    2. System.out.println(measureSumPerf(o -> getForSum(o), 10000000) + ":ms");
    3. // 并行无包装
    4. System.out.println(measureSumPerf(o -> getFastForSum(o), 10000000) + ":ms");
    5. // 顺序流
    6. System.out.println(measureSumPerf(o -> getSum(o), 10000000) + ":ms");
    7. // 并行流
    8. System.out.println(measureSumPerf(o -> getParallelSum(o), 10000000) + ":ms");

    }

    private static Long measureSumPerf(Function adder, long n) {

    1. long fastest = Long.MAX_VALUE;
    2. for (int i = 0; i < 10; i++) {
    3. long start = System.currentTimeMillis();
    4. adder.apply(n);
    5. long end = System.currentTimeMillis();
    6. if ((end - start) < fastest) {
    7. fastest = end - start;
    8. }
    9. }
    10. return fastest;

    }

    private static Long getForSum(Long n) {

    1. return LongStream.rangeClosed(0, n).sum();

    }

    private static Long getFastForSum(Long n) {

    1. return LongStream.rangeClosed(0, n).parallel().sum();

    }

    private static Long getSum(Long n) {

    1. return Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);

    }

    private static Long getParallelSum(Long n) {

    1. return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);

    } }

// 3:ms // 1:ms 新加的,快到飞起有没有 // 58:ms // 50:ms

  1. - 效果显而易见,如上。
  2. - 在使用并行Stream加速代码之前,你必须确保用得对;如果结果错了,算得快就毫无意义了
  3. - **如何正确使用并行流**
  4. - 错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。下面是另一种实现对前n个自然数求和的方法,但这会改变一个共享累加器
  5. ```java
  6. public class Test4 {
  7. public static void main(String[] args) {
  8. // 计算准确
  9. System.out.println(sideEffectSum(10000000));
  10. // 计算不准确
  11. System.out.println(sideEffectParallelSum(10000000));
  12. }
  13. public static long sideEffectSum(long n) {
  14. Accumulator accumulator = new Accumulator();
  15. LongStream.rangeClosed(0, n).forEach(accumulator::add);
  16. return accumulator.total;
  17. }
  18. public static long sideEffectParallelSum(long n) {
  19. Accumulator accumulator = new Accumulator();
  20. LongStream.rangeClosed(0, n).parallel().forEach(accumulator::add);
  21. return accumulator.total;
  22. }
  23. }
  24. class Accumulator {
  25. public long total = 0;
  26. public void add(long value) {
  27. total += value;
  28. }
  29. }
  30. // 50000005000000
  31. // 9462163250051
  • 这是由于多个线程在同时访问累加器,执行total += value,而这一句虽然看似简单,却不是一个原子操作。问题的根源在于,forEach中调用的方法有副作用,它会改变多个线程共享的对象的可变状态。
  • 如何高效使用并行流
  • 如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。我们在本节中已经指出,并行流并不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。
  • 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、LongStream、DoubleStream)来避免这种操作,但凡有可能都应该用这些流。
  • 些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是一个List)更高效。
  • 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
  • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
  • 要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。最后,你将在7.3节中学到,你可以自己实现Spliterator来完全掌控分解过程。
  • 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。
  • 还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。
  • 流的数据源和可分解性 | | 可分解性 | | —- | —- | | ArrayList | 极佳 | | LinkedList | 差 | | InteStream.range | 极佳 | | Stream.iterate | 差 | | HashSet | 好 | | TreeSet | 好 |

  • 并行流背后使用的基础架构是Java 7中引入的分支/合并框架。并行汇总的示例证明了要想正确使用并行流,了解它的内部原理至关重要,所以我们会在下一节仔细研究分支/合并框架。

    分支/合并框架

  • 分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。首先来看看如何定义任务和子任务。

  • 使用RecursiveTask
  • 要把任务提交到这个池,必须创建RecursiveTask的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型。

    1. public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    2. // 唯一的抽象方法
    3. protected abstract V compute();
    4. }
  • forkjoin实现类伪代码

    1. if(任务足够小或不可切分){
    2. 顺序执行任务
    3. }else{
    4. 将任务分成两个子任务
    5. 递归调用本方法,拆分每个子任务,等待所有子任务完成
    6. 合并每个子任务的结果
    7. }

    image.png

  • 这只不过是著名的分治算法的并行版本而已

  • 用分支/合并框架执行并行求和;使用不同的操作执行。 ```java public class ForkJoinDemo extends RecursiveTask {

    private Long start; private Long end;

    // 临界值 private long temp = 1000L;

    public ForkJoinDemo(long start, long end) {

    1. this.start = start;
    2. this.end = end;

    }

    // 计算方法 @Override protected Long compute() {

    1. Long sum = 0L;
    2. if ((end - start) < temp) {
    3. for (Long i = start; i <= end; i++) {
    4. sum += i;
    5. }
    6. return sum;
    7. } else {
    8. // 分支合并计算
    9. long middle = (start + end) / 2;
    10. ForkJoinDemo forkJoin1 = new ForkJoinDemo(start, middle);
    11. // 拆分任务 把任务压入线程队列
    12. forkJoin1.fork();
    13. ForkJoinDemo forkJoin2 = new ForkJoinDemo(middle + 1, end);
    14. // 拆分任务 把任务压入线程队列
    15. forkJoin2.fork();
    16. return forkJoin1.join() + forkJoin2.join();
    17. }

    } }

class ForkJoinTest { public static void main(String[] args) { test1(); test2(); test3(); }

  1. /**
  2. * 普通程序员
  3. * sum = 500000000500000000 时间:2991 ms
  4. */
  5. public static void test1() {
  6. long start = System.currentTimeMillis();
  7. Long sum = 0L;
  8. for (Long i = 1L; i <= 10_0000_0000; i++) {
  9. sum += i;
  10. }
  11. long end = System.currentTimeMillis();
  12. System.out.println("sum = " + sum + " 时间:" + (end - start) + " ms");
  13. }
  14. /**
  15. * 中级程序员
  16. * sum = 500000000500000000 时间:1919 ms
  17. */
  18. public static void test2() {
  19. long start = System.currentTimeMillis();
  20. ForkJoinPool forkJoinPool = new ForkJoinPool();
  21. ForkJoinDemo forkJoinDemo = new ForkJoinDemo(1, 10_0000_0000);
  22. forkJoinPool.submit(forkJoinDemo);
  23. Long sum = 0L;
  24. try {
  25. sum = forkJoinDemo.get();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. } catch (ExecutionException e) {
  29. e.printStackTrace();
  30. }
  31. long end = System.currentTimeMillis();
  32. System.out.println("sum = " + sum + " 时间:" + (end - start) + " ms");
  33. }
  34. /**
  35. * 高级程序员
  36. * sum = 500000000500000000 时间:147 ms
  37. */
  38. public static void test3() {
  39. long start = System.currentTimeMillis();
  40. // Stream 并行流
  41. long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
  42. long end = System.currentTimeMillis();
  43. System.out.println("sum = " + sum + " 时间:" + (end - start) + " ms");
  44. }

} // sum = 500000000500000000 时间:2911 ms // sum = 500000000500000000 时间:1926 ms // sum = 500000000500000000 时间:165 ms

  1. - 在实际应用时,使用多个ForkJoinPool是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。
  2. - 当把ForkJoinDemo任务传给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务的compute方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的ForkJoinDemo,而它们也由ForkJoinPool安排执行。因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件。
  3. - 这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。
  4. - 请看示意图:分支/合并算法
  5. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/1806904/1646546554769-5efbabf4-bff9-4e42-989f-c66ded023024.png#clientId=uc450be4b-40b2-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=434&id=uaadac0e8&margin=%5Bobject%20Object%5D&name=image.png&originHeight=868&originWidth=1570&originalType=binary&ratio=1&rotation=0&showTitle=false&size=92467&status=done&style=none&taskId=u00c902e8-f72e-4647-9853-b8e0c3e6027&title=&width=785)
  6. - **使用分支/合并框架的最佳做法**
  7. - 对一个任务调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。
  8. - 不应该在RecursiveTask内部使用ForkJoinPoolinvoke方法。相反,你应该始终直接调用computefork方法,只有顺序代码才应该用invoke来启动并行计算。
  9. - 对子任务调用fork方法可以把它排进ForkJoinPool。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用compute低。这样做你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。
  10. - 调试使用分支/合并框架的并行计算可能有点棘手。特别是你平常都在你喜欢的IDE里面看栈跟踪(stack trace)来找问题,但放在分支-合并计算上就不行了,因为调用compute的线程并不是概念上的调用方,后者是调用fork的那个。
  11. - 和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。我们已经说过,一个任务可以分解成多个独立的子任务,才能让性能在并行化时有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编译器优化。这就是为什么在测量性能之前跑几遍程序很重要,我们的测试框架就是这么做的。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死码分析——删去从未被使用的计算)
  12. - **工作窃取**
  13. - 分支/合并框架用工作窃取,每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。
  14. - 基于某些原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。
  15. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/1806904/1646547146791-12b64f4b-543d-4331-ad06-28753abb31f0.png#clientId=uc450be4b-40b2-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=283&id=u63e821e6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=566&originWidth=1448&originalType=binary&ratio=1&rotation=0&showTitle=false&size=59138&status=done&style=none&taskId=ub0ea6fe2-6b8f-4125-ad3e-2bcdae87026&title=&width=724)
  16. - 但是如果使用并行流,那么就不需要我们手动来进行拆分了。
  17. <a name="mX3bR"></a>
  18. # Spliterator
  19. - SpliteratorJava 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和Iterator一样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。
  20. - Java 8已经为集合框架中包含的所有数据结构提供了一个默认的Spliterator实现。集合实现了Spliterator接口,接口提供了一个spliterator方法。这个接口定义了若干方法,如下面的代码清单所示。
  21. ```java
  22. // T是Spliterator遍历的元素的类型
  23. public interface Spliterator<T> {
  24. // tryAdvance方法的行为类似于普通的Iterator,
  25. // 因为它会按顺序一个一个使用Spliterator中的元素,并且如果还有其他元素要遍历就返回true
  26. boolean tryAdvance(Consumer<? super T> action);
  27. // 但trySplit是专为Spliterator接口设计的,因为它可以把一些元素划出去分给第二个Spliterator(由该方法返回),让它们两个并行处理。
  28. Spliterator<T> trySplit();
  29. // Spliterator还可通过estimateSize方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。
  30. long estimateSize();
  31. int characteristics();
  32. }
  • 拆分过程
  • 第一步是对第一个Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit直到它返回null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null。

image.png

  • 这个拆分过程也受Spliterator本身的特性影响,而特性是通过characteristics方法声明的。
  • Spliterator的特性 | 特性 | 含义 | | —- | —- | | ORDERED | 元素有既定的顺序(例如List),因此Spliterator在遍历和划分的时候也会遵循这一顺序 | | DISTINCT | 对于任意一对遍历过的元素x和y,x.equals(y) 返回false | | SORTED | 遍历的元素按照一个预定义的顺序排序 | | SIZED | 该Spliterator由一个已知大小的源建立(例如Set),因此estimateSize()返回的是准确值 | | NONNULL | 保证遍历的元素不会为null | | IMMUATABLE | Spliterator的数据源不能修改,这个意味着在遍历的时候不能添加、删除、修改任何元素 | | CONCURRENT | Spliterator的数据源可以被其他线程同时修改而无需同步 | | SUBSIZED | Spliterator和所有从他拆分出来的Spliterator都是SIZED |

自定义Spliterator

  • 我们要开发一个简单的方法来数数一个String中的单词数
  • 一个迭代式字数统计方法

    1. public class Test1 {
    2. public static void main(String[] args) {
    3. String s = "I am a Chinese I love her";
    4. System.out.println(countWords(s));
    5. }
    6. private static int countWords(String s) {
    7. int counter = 0;
    8. boolean lastSpace = true;
    9. for (char c : s.toCharArray()) {
    10. if (Character.isWhitespace(c)) {
    11. lastSpace = true;
    12. } else {
    13. if (lastSpace)
    14. counter++;
    15. lastSpace = false;
    16. }
    17. }
    18. return counter;
    19. }
    20. }
    21. // 7
  • 以函数式风格重写单词计数器 ```java public class Test2 { public static void main(String[] args) {

    1. String s = "I am a Chinese I love her";
    2. Stream<Character> stream = IntStream.range(0, s.length()).mapToObj(s::charAt);
    3. WordCounter reduce = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine);
    4. System.out.println(reduce.getCounter());

    } }

class WordCounter { private final int counter; private final boolean lastSpace;

  1. public WordCounter(int counter, boolean lastSpace) {
  2. this.counter = counter;
  3. this.lastSpace = lastSpace;
  4. }
  5. // 迭代方法
  6. public WordCounter accumulate(Character c) {
  7. if (Character.isWhitespace(c)) {
  8. return lastSpace ? this : new WordCounter(counter, true);
  9. } else {
  10. // 上一个字符是空格,而当前遍历的字符不是空格,单词计数器+1
  11. return lastSpace ? new WordCounter(counter + 1, false) : this;
  12. }
  13. }
  14. // 合并 2个 WordCounter
  15. public WordCounter combine(WordCounter wordCounter) {
  16. // 只关心总和
  17. return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
  18. }
  19. public int getCounter() {
  20. return counter;
  21. }

}

  1. - WordCounter并行工作
  2. ```java
  3. public class Test2 {
  4. public static void main(String[] args) {
  5. String s = "I am a Chinese I love her";
  6. Stream<Character> stream = IntStream.range(0, s.length()).mapToObj(s::charAt);
  7. WordCounter reduce = stream.parallel().reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine);
  8. System.out.println(reduce.getCounter());
  9. }
  10. }
  11. // 19
  • but 有错误,为啥呢,因为原始的String在任意位置拆分,所以有时一个词会被分为两个词,然后数了两次。这就说明,拆分流会影响结果,而把顺序流换成并行流就可能使结果出错。
  • 如何解决这个问题呢?解决方案就是要确保String不是在随机位置拆开的,而只能在词尾拆开。要做到这一点,你必须为Character实现一个Spliterator,它只能在两个词之间拆开String(如下所示),然后由此创建并行流。 ```java public class WordCounterSpliterator implements Spliterator { private final String string; private int currentChar = 0;

    public WordCounterSpliterator(String string) {

    1. this.string = string;

    }

    /**

    • tryAdvance方法把String中当前位置的Character传给了Consumer,并让位置加一。
    • 作为参数传递的Consumer是一个Java内部类,在遍历流时将要处理的Character传给了一系列要对其执行的函数。
    • 这里只有一个归约函数,即WordCounter类的accumulate方法。
    • 如果新的指针位置小于String的总长,且还有要遍历的Character,则tryAdvance返回true */ @Override public boolean tryAdvance(Consumer<? super Character> action) { // 处理当前字符 action.accept(string.charAt(currentChar++)); // 是否还有字符需要处理 return currentChar < string.length(); }

      /**

    • trySplit方法是Spliterator中最重要的一个方法,因为它定义了拆分要遍历的数据结构的逻辑。
    • 就像在代码清单7-1中实现的RecursiveTask的compute方法一样(分支/合并框架的使用方式),
    • 首先要设定不再进一步拆分的下限。
    • 这里用了一个非常低的下限——10个Character,
    • 仅仅是为了保证程序会对那个比较短的String做几次拆分。
    • 在实际应用中,就像分支/合并的例子那样,你肯定要用更高的下限来避免生成太多的任务。
    • 如果剩余的Character数量低于下限,你就返回null表示无需进一步拆分。
    • 相反,如果你需要执行拆分,就把试探的拆分位置设在要解析的String块的中间。
    • 但我们没有直接使用这个拆分位置,因为要避免把词在中间断开,于是就往前找,直到找到一个空格。
    • 一旦找到了适当的拆分位置,就可以创建一个新的Spliterator来遍历从当前位置到拆分位置的子串;把
    • 当前位置this设为拆分位置,因为之前的部分将由新Spliterator来处理,最后返回。 */ @Override public Spliterator trySplit() { int currentSize = string.length() - currentChar; if (currentSize < 10) {

      1. return null;

      } for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {

      1. if (Character.isWhitespace(string.charAt(splitPos))) {
      2. WordCounterSpliterator spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
      3. currentChar = splitPos;
      4. return spliterator;
      5. }

      } return null; }

      /**

    • 还需要遍历的元素的estimatedSize就是这个Spliterator解析的String的总长度和当前遍历的位置的差 */ @Override public long estimateSize() { return string.length() - currentChar; }

      /**

    • 最后,characteristic方法告诉框架
    • 这个Spliterator是
    • ORDERED(顺序就是String中各个Character的次序)、
    • SIZED(estimatedSize方法的返回值是精确的)、
    • SUBSIZED(trySplit方法创建的其他Spliterator也有确切大小)、
    • NONNULL(String中不能有为null的Character)、
    • IMMUTABLE(在解析String时不能再添加Character,因为String本身是一个不可变类)的 */ @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
  1. ```java
  2. public class Test3 {
  3. public static void main(String[] args) {
  4. String s = "I am a Chinese I love her";
  5. WordCounterSpliterator spliterator = new WordCounterSpliterator(s);
  6. Stream<Character> stream = StreamSupport.stream(spliterator, true);
  7. System.out.println(countWords(stream.parallel()));
  8. }
  9. private static int countWords(Stream<Character> stream) {
  10. WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine);
  11. return wordCounter.getCounter();
  12. }
  13. }

小结

  • 内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。
  • 虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的行为和性能有时是违反直觉的,因此一定要测量,确保你并没有把程序拖得更慢。
  • 像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时的时候。
  • 从性能角度来看,使用正确的数据结构,如尽可能利用原始流而不是一般化的流,几乎总是比尝试并行化某些操作更为重要。
  • 分支/合并框架让你得以用递归方式将可以并行的任务拆分成更小的任务,在不同的线程上执行,然后将各个子任务的结果合并起来生成整体结果。
  • Spliterator定义了并行流如何拆分它要遍历的数据。

    参考文章

  • 《Java 8 in Action》

  • 《Java8函数式编程》