1 对于Stream最终的收集操作有两个重载的版本:

  1. <R, A> R collect(Collector<? super T, A, R> collector);
  2. <R> R collect(Supplier<R> supplier,
  3. BiConsumer<R, ? super T> accumulator,
  4. BiConsumer<R, R> combiner);

要理解这两个方法,先看看 Collector 的几个方法。

3 Collector【重要】 Collectors工具类:他的内部类CollectorImpl就是实现了Collector并重写了以下方法的

Collector接口提供了几个方法,这里的方法都是Stream.collect()方法中真实要用的,组合起来就是创建一个空结果容器,将元素添加到容器中,将结果世纪到容器之后,再转换成目标类型返回;(如果是并行的话,中间还有一步是合并中间结果)

  1. /**
  2. * @param <T> Stream中的元素类型
  3. * @param <A> 收集过程中的临时中间类型
  4. * @param <R> 收集完成后输出结果的类型
  5. **/
  6. public interface Collector<T, A, R> {
  7. /**
  8. * 提供一个结果容器。
  9. */
  10. Supplier<A> supplier();
  11. /**
  12. * 两个参数的消费型接口。将迭代中的当前元素添加到结果容器中。
  13. */
  14. BiConsumer<A, T> accumulator();
  15. /**
  16. * 转换型接口,两个相同输入,同类型的输出。
  17. * 在并行处理中,将各个子Stream返回结果合并。
  18. */
  19. BinaryOperator<A> combiner();
  20. /**
  21. * 收集操作的最后一步。
  22. * 转换型接口,用于将收集过程中的临时中间类型元素(A)转换为结果类型(R)。
  23. */
  24. Function<A, R> finisher();
  25. /**
  26. * 返回一个集合。描述了被收集的流的一些特性。
  27. * 比如:是否是顺序相关的、支不支持并行等。
  28. * 详情见:3.4 Characteristics
  29. */
  30. Set<Characteristics> characteristics();
  31. }

3.1 Stream.collect()源码

源码中是这样的:

  1. public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
  2. A container;
  3. if (
  4. // 并行流(stream().parallel()方法被调用)的情况
  5. isParallel()
  6. &&
  7. // 优化提示中包含 CONCURRENT 属性
  8. (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
  9. &&
  10. // 流是无序的
  11. (!isOrdered()
  12. ||
  13. // 优化提示中包含 UNORDERED 属性
  14. collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
  15. container = collector.supplier().get();
  16. BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
  17. forEach(u -> accumulator.accept(container, u));
  18. }
  19. else {
  20. container = evaluate(ReduceOps.makeRef(collector));
  21. }
  22. return
  23. // 优化提示中包含 IDENTITY_FINISH(恒等函数) 属性?
  24. collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
  25. ?
  26. // 直接将中间临时元素(A)当做最终结果(R)
  27. (R) container
  28. :
  29. // 使用 finisher 转换成最终结果
  30. collector.finisher().apply(container);
  31. }

从以上源码可以大致得出下面的两个流程图:

3.2 串行流的执行过程

串行流执行过程.png

3.3 并行流的执行过程

并行流的执行过程.png

3.4 Characteristics

  1. enum Characteristics {
  2. /**
  3. * 支持多线程调用,并行收集。
  4. */
  5. CONCURRENT,
  6. /**
  7. * 流中元素是无序的,收集过程不受元素先后顺序的影响。
  8. */
  9. UNORDERED,
  10. /**
  11. * 恒等函数。
  12. * 此时不再使用finisher再转换一次元素。直接将中间元素(A)当做最终结果(R)。
  13. */
  14. IDENTITY_FINISH
  15. }

这三个特性的常用组合在 Collectors 工具类中也有定义:

  1. static final Set<Collector.Characteristics> CH_CONCURRENT_ID
  2. = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
  3. Collector.Characteristics.UNORDERED,
  4. Collector.Characteristics.IDENTITY_FINISH));
  5. static final Set<Collector.Characteristics> CH_CONCURRENT_NOID
  6. = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
  7. Collector.Characteristics.UNORDERED));
  8. static final Set<Collector.Characteristics> CH_ID
  9. = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
  10. static final Set<Collector.Characteristics> CH_UNORDERED_ID
  11. = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED,
  12. Collector.Characteristics.IDENTITY_FINISH));
  13. static final Set<Collector.Characteristics> CH_NOID = Collections.emptySet();