类图

Stream 顺序流源码分析 - 图1

概念解释

Pipline和Stage

Pipline是流水线,表示一整个流程。Stage表示流水线的其中一个阶段。是一个比较抽象层面的描述,因为stage主要表示一种逻辑上的顺序关系,而具体每一个阶段要干嘛、怎么干,使用Sink来进行描述。

  1. new stream //stage 0
  2. .filter() //stage 1
  3. .sort() //stage 2

Stream 顺序流源码分析 - 图2

Sink

直译为水槽,生活中水槽的作用无非

  • 打开水龙头,知道有水要来
  • 水在水槽里, 可以进行一些操作
  • 打开水闸,放水

Java中的Sink核心功能为:

  • begin(): 告诉该水槽水流要来了,可以进行一些初始化操作
  • accept():接受水流,然后进行操作
  • end():水流全部处理完了。

看一个sort()的示例,sort这个stage的目的就是对所有水流进行排序,然后再流到下游。

  1. private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
  2. private T[] array; //要进行排序,需要一个数组进行缓存
  3. private int offset;
  4. SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
  5. super(sink, comparator);
  6. }
  7. @Override
  8. @SuppressWarnings("unchecked")
  9. public void begin(long size) {
  10. if (size >= Nodes.MAX_ARRAY_SIZE)
  11. throw new IllegalArgumentException(Nodes.BAD_SIZE);
  12. //上游调用begin(),通知sort进行初始化操作,生产一个数组
  13. array = (T[]) new Object[(int) size];
  14. }
  15. //上游调用end()方法,告诉sort水已经全部流过来了。sort开始执行操作
  16. @Override
  17. public void end() {
  18. //操作
  19. Arrays.sort(array, 0, offset, comparator);
  20. //告诉sort的下游准备接受水流
  21. downstream.begin(offset);
  22. //一个个元素的传递给下游
  23. if (!cancellationWasRequested) {
  24. for (int i = 0; i < offset; i++)
  25. downstream.accept(array[i]);
  26. }
  27. else {
  28. for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
  29. downstream.accept(array[i]);
  30. }
  31. //告诉下游水流传递结束
  32. downstream.end();
  33. //缓存清空
  34. array = null;
  35. }
  36. //上游调用accept()方法,将水流存储到到sort的缓存数组中
  37. @Override
  38. public void accept(T t) {
  39. array[offset++] = t;
  40. }
  41. }

创建Head

几个疑问

  • 官方说Stream不存储数据,那么数据保存在那里呢?

    使用方式

    可以使用Stream.of()创建一个流,例如

    1. //创建方式 of()
    2. Stream<Integer> stream = Stream.of(1, 2, 3);

    源码分析

    of()方法调用

    1. StreamSupport.stream(Arrays.spliterator(arr, 0, arr.length), false);

    stream()方法逻辑:

    1. public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    2. Objects.requireNonNull(spliterator);
    3. return new ReferencePipeline.Head<>(spliterator,
    4. StreamOpFlag.fromCharacteristics(spliterator),
    5. parallel);
    6. }

    构造方法的主要了逻辑要一直super()到AbstractPipeline

    1. /**
    2. * The source spliterator. Only valid for the head pipeline.
    3. * Before the pipeline is consumed if non-null then {@code sourceSupplier}
    4. * must be null. After the pipeline is consumed if non-null then is set to
    5. * null.
    6. */
    7. private Spliterator<?> sourceSpliterator;
    8. /**
    9. * Constructor for the head of a stream pipeline.
    10. *
    11. * @param source {@code Spliterator} describing the stream source
    12. * @param sourceFlags the source flags for the stream source, described in
    13. * {@link StreamOpFlag}
    14. * @param parallel {@code true} if the pipeline is parallel
    15. */
    16. AbstractPipeline(Spliterator<?> source,
    17. int sourceFlags, boolean parallel) {
    18. this.previousStage = null;
    19. //使用一个字段指向数据集合的Spliterator,后续终结操作的时候,引用的方式操作数据
    20. this.sourceSpliterator = source;
    21. this.sourceStage = this;
    22. this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    23. // The following is an optimization of:
    24. // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    25. this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    26. this.depth = 0;
    27. this.parallel = parallel;
    28. }

    调用了ReferencePipeline.Head<>,返回一个Head对象。Head是ReferencePipeline的子类。可以理解为Head是流水线的第一个stage。
    Stream 顺序流源码分析 - 图3

疑问解答

  1. 官方说Stream不存储数据,那么数据保存在那里呢?

Head中保存数据源的Spliterator对象,后续操作Spliterator的方式操作数据

中间操作

几个疑问

  1. 各个中间操作是如何进行关联的?
    • 一个个的操作封装成了一个个的statelessOpstateFulOp对象,以双向链表的方法串起来。
  2. 如何执行完一个中间操作,然后执行下一个?
    • Sink类负责流水线操作的承接上下游和执行操作的任务,核心方法为begain()、accept()、end()。
  3. 有状态的中间操作是怎么保存状态的?
    • 有状态的中间操作封装成stateFulOp对象,各自都有独立的逻辑,具体的参考sort()的实现逻辑。
  4. 懒加载如何实现的
    • 每个中间操作调用后,只是append在流程的尾部,保存了关联关系而已。
    • 流水线操作的启动,要交给wrapAndCopyInto()方法调用Head的Sink()操作,而wrapAndCopyInto()方法都需要由Ter

      使用方式

      ```java Stream st = headStream.filter(e-> e=1).distinct().sort();

//等同于

Stream afterFilter = headStream.filter(e -> e != 2); Stream afterDistinct = afterFilter.distinct(); Stream afterSort = afterDistinct.sort();

  1. <a name="BzfO4"></a>
  2. ### Filter
  3. 执行filter(op)会发生什么?
  4. ```java
  5. Stream<Integer> afterFilter = head.filter(e -> e = 1);

filter()方法定义在Stream类,实现在ReferencePipeline类。

  1. //ReferencePipeline.class
  2. @Override
  3. public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
  4. Objects.requireNonNull(predicate);
  5. // 返回一个StatelessOp类
  6. // 构造函数参数为(this,)
  7. return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
  8. StreamOpFlag.NOT_SIZED) {
  9. @Override
  10. Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
  11. return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
  12. @Override
  13. public void begin(long size) {
  14. downstream.begin(-1);
  15. }
  16. @Override
  17. public void accept(P_OUT u) {
  18. if (predicate.test(u))
  19. downstream.accept(u);
  20. }
  21. };
  22. }
  23. };
  24. }

返回一个StatelessOp类(因为filter是一个无状态操作),看下StatelessOp类,是一个静态抽象内部类,继承了ReferencePipeline类。

  1. //ReferencePipeline.class
  2. /**
  3. * Base class for a stateless intermediate stage of a Stream.
  4. *
  5. * @param <E_IN> type of elements in the upstream source
  6. * @param <E_OUT> type of elements in produced by this stage
  7. * @since 1.8
  8. */
  9. abstract static class StatelessOp<E_IN, E_OUT>
  10. extends ReferencePipeline<E_IN, E_OUT> {
  11. /**
  12. * Construct a new Stream by appending a stateless intermediate
  13. * operation to an existing stream.
  14. *
  15. * @param upstream The upstream pipeline stage
  16. * @param inputShape The stream shape for the upstream pipeline stage
  17. * @param opFlags Operation flags for the new stage
  18. */
  19. StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
  20. StreamShape inputShape,
  21. int opFlags) {
  22. super(upstream, opFlags);
  23. assert upstream.getOutputShape() == inputShape;
  24. }
  25. @Override
  26. final boolean opIsStateful() {
  27. return false;
  28. }
  29. }

中间super()会执行AbstractPipeline�类的构造方法, 连接stage之间的关系

  1. //AbstractPipeline.class
  2. /**
  3. * Constructor for appending an intermediate operation stage onto an
  4. * existing pipeline.
  5. *
  6. * @param previousStage the upstream pipeline stage
  7. * @param opFlags the operation flags for the new stage, described in
  8. * {@link StreamOpFlag}
  9. */
  10. AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
  11. if (previousStage.linkedOrConsumed)
  12. throw new IllegalStateException(MSG_STREAM_LINKED);
  13. previousStage.linkedOrConsumed = true;
  14. previousStage.nextStage = this;
  15. this.previousStage = previousStage;
  16. this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
  17. this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
  18. this.sourceStage = previousStage.sourceStage;
  19. if (opIsStateful())
  20. sourceStage.sourceAnyStateful = true;
  21. this.depth = previousStage.depth + 1;
  22. }

Stream 顺序流源码分析 - 图4

�Distinct

示例

  1. Stream<Integer> afterDistinct = afterFilter.distinct();

distinct的方法实现在ReferencePipeline类下

  1. @Override
  2. public final Stream<P_OUT> distinct() {
  3. return DistinctOps.makeRef(this);
  4. }

调用DistinctOps类的makeRef()方法,返回一个StatefulOp类,并重写了4个方法,实现逻辑在opWrapSink()中:

  1. /**
  2. * Appends a "distinct" operation to the provided stream, and returns the
  3. * new stream.
  4. *
  5. * @param <T> the type of both input and output elements
  6. * @param upstream a reference stream with element type T
  7. * @return the new stream
  8. */
  9. static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
  10. // 返回一个StatefulOp类
  11. return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
  12. StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
  13. // 重写了以下几个方法,内容省略...
  14. <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {...}
  15. @Override
  16. <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
  17. Spliterator<P_IN> spliterator,
  18. IntFunction<T[]> generator) {...}
  19. @Override
  20. <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {...}
  21. @Override
  22. Sink<T> opWrapSink(int flags, Sink<T> sink) {
  23. Objects.requireNonNull(sink);
  24. if (StreamOpFlag.DISTINCT.isKnown(flags)) {
  25. ...
  26. } else if (StreamOpFlag.SORTED.isKnown(flags)) {
  27. ...
  28. } else {
  29. // 返回一个SinkChainedReference类
  30. return new Sink.ChainedReference<T, T>(sink) {
  31. //使用一个Set缓存数据,进行去重
  32. Set<T> seen;
  33. //当上游通知begin的时候,初始化Set
  34. @Override
  35. public void begin(long size) {
  36. seen = new HashSet<>();
  37. downstream.begin(-1);
  38. }
  39. //略
  40. @Override
  41. public void end() {
  42. seen = null;
  43. downstream.end();
  44. }
  45. //如果已经存在,之间抛弃
  46. @Override
  47. public void accept(T t) {
  48. if (!seen.contains(t)) {
  49. seen.add(t);
  50. downstream.accept(t);
  51. }
  52. }
  53. };
  54. }
  55. }
  56. };
  57. }

StatefulOp类与StatelessOp类相似,都是继承了ReferencePipeline类,然后中间super()页会执行AbstractPipeline�类的构造方法, 连接stage之间的关系

  1. /**
  2. * Base class for a stateful intermediate stage of a Stream.
  3. *
  4. * @param <E_IN> type of elements in the upstream source
  5. * @param <E_OUT> type of elements in produced by this stage
  6. * @since 1.8
  7. */
  8. abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
  9. //省略
  10. }

Stream 顺序流源码分析 - 图5
至于其他的中间操作,套路都是类似的,操作逻辑封装在opWrapSink()方法里, 可以慢慢的看。

终结操作

几个疑问

  1. 终结方法是如何进行操作的?
    1. 终结操作的实现里面都有调用evaluate()方法,这个方法最后会warp所有操作变成一串sink,然后从头开始执行begin(),accept(),end()方法
  2. 如何实现由终结操作触发流的运作的?
    1. 触发的开关是wrapAndCopyInto(),这个方法只有在终结操作中才有被调用。
  3. 如何保证一个流一次只能执行一个终结方法?
    1. evaluate()方法中执行一次后linkedOrConsumed设为true,后续再出发evaluate()方法就会异常。�

      使用方式

      列举四种终结操作,在Stream提供的API中,也是四类: Stream 顺序流源码分析 - 图6```java // 计算流中元素数量,FindOP long count = afterLimit.count();

// 遍历所有元素,ForEachOp afterLimit.forEach(System.out::printl);

// 获取第一个元素,MatchOp Optional any = afterLimit.findFirst();

// 判断是否,ReduceOp boolean flag = afterLimit.anyMatch(i -> i == 1);

  1. <a name="RhEbI"></a>
  2. ### count()
  3. 在`ReferencePipeline`类中实现
  4. ```java
  5. @Override
  6. public final long count() {
  7. // 调用mapToLong将所有元素变成1,然后计算sum
  8. return mapToLong(e -> 1L).sum();
  9. }

maoToLong()

mapToLong()方法,是一个中间操作

  1. @Override
  2. public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
  3. Objects.requireNonNull(mapper);
  4. return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
  5. StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
  6. @Override
  7. Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
  8. return new Sink.ChainedReference<P_OUT, Long>(sink) {
  9. @Override
  10. public void accept(P_OUT u) {
  11. //
  12. downstream.accept(mapper.applyAsLong(u));
  13. }
  14. };
  15. }
  16. };
  17. }

ToLongFunction是一个函数式接口类, accept()里的逻辑便是e -> 1L.

  1. @FunctionalInterface
  2. public interface ToLongFunction<T> {
  3. /**
  4. * Applies this function to the given argument.
  5. *
  6. * @param value the function argument
  7. * @return the function result
  8. */
  9. long applyAsLong(T value);
  10. }

看下Sum()方法,在LongPipeline类中,传入参数是一个Long::sum, sum的作用是相加两个值。

  1. @Override
  2. public final long sum() {
  3. // use better algorithm to compensate for intermediate overflow?
  4. return reduce(0, Long::sum);
  5. }
  6. //public static long sum(long a, long b) {
  7. // return a + b;
  8. //}

reduce()

reduce方法,将操作函数op封装成一个Sink,makeLong()的作用就是会生产一个Sink

  1. @Override
  2. public final long reduce(long identity, LongBinaryOperator op) {
  3. return evaluate(ReduceOps.makeLong(identity, op));
  4. }
  1. /**
  2. * Constructs a {@code TerminalOp} that implements a functional reduce on
  3. * {@code long} values.
  4. *
  5. * @param identity the identity for the combining function
  6. * @param operator the combining function
  7. * @return a {@code TerminalOp} implementing the reduction
  8. */
  9. public static TerminalOp<Long, Long>
  10. makeLong(long identity, LongBinaryOperator operator) {
  11. Objects.requireNonNull(operator);
  12. class ReducingSink
  13. implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
  14. //state是一个用作记录的值
  15. private long state;
  16. @Override
  17. public void begin(long size) {
  18. state = identity;
  19. }
  20. //参数传进来的就是sun(),所以这里的accept()的作用就是对state不断进行累加
  21. @Override
  22. public void accept(long t) {
  23. state = operator.applyAsLong(state, t);
  24. }
  25. @Override
  26. public Long get() {
  27. return state;
  28. }
  29. @Override
  30. public void combine(ReducingSink other) {
  31. accept(other.state);
  32. }
  33. }
  34. return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
  35. @Override
  36. public ReducingSink makeSink() {
  37. return new ReducingSink();
  38. }
  39. };
  40. }

evaluate()

看回evaluate()方法,这个方法用来执行终结操作的

  1. final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
  2. assert getOutputShape() == terminalOp.inputShape();
  3. //判断流是否已被使用
  4. if (linkedOrConsumed)
  5. throw new IllegalStateException(MSG_STREAM_LINKED);
  6. //设置使用标记为true
  7. linkedOrConsumed = true;
  8. //根据流类型,执行相应的推断操作
  9. return isParallel()
  10. ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
  11. : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
  12. }

关注时序流的推断方法,可以看到这个方法的实现分为四种,对应上面提到的四类类型操作,count属于ReduceOp,进去看下。
image.png

  1. //from ReduceOps
  2. @Override
  3. public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
  4. Spliterator<P_IN> spliterator) {
  5. //调用wrapAndCopyInto()方法
  6. return helper.wrapAndCopyInto(makeSink(), spliterator).get();
  7. }

wrapAndCopyInto()

保证所有stage -> sink链表,然后执行copyInto()方法

  1. @Override
  2. final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
  3. copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
  4. return sink;
  5. }

warpSink()

就是在这里,从后向前,包装所有的stage阶段,形成一条sink链表。这样将之前一个个stage的链表结构包装成一个个Sink。

  1. @Override
  2. @SuppressWarnings("unchecked")
  3. final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
  4. Objects.requireNonNull(sink);
  5. //从后向前遍历
  6. for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
  7. //执行每个opWrapSink()方法,这个方法在每个操作类中都进行了重写
  8. sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
  9. }
  10. //返回头sink
  11. return (Sink<P_IN>) sink;
  12. }

copyInto()

这个方法是整个流水线的启动开关,流程如下:

  1. 调用第一个sink的begin()方法
  2. 执行数据源的spliterator.forEachRemaining(wrappendSink)方法遍历调用accept()方法
  3. end() 通知结束

    1. @Override
    2. final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    3. Objects.requireNonNull(wrappedSink);
    4. if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
    5. //通知第一个sink,做好准备接受流
    6. wrappedSink.begin(spliterator.getExactSizeIfKnown());
    7. //执行
    8. spliterator.forEachRemaining(wrappedSink);
    9. wrappedSink.end();
    10. }
    11. else {
    12. copyIntoWithCancel(wrappedSink, spliterator);
    13. }
    14. }

    forEachRemaining()

    在各个容器中都有实现forEachRemaining()这个方法,在ArrayList中:

    1. public boolean tryAdvance(Consumer<? super E> action) {
    2. if (action == null)
    3. throw new NullPointerException();
    4. int hi = getFence(), i = index;
    5. if (i < hi) {
    6. index = i + 1;
    7. @SuppressWarnings("unchecked") E e = (E)list.elementData[i];
    8. //执行accept()方法
    9. action.accept(e);
    10. if (list.modCount != expectedModCount)
    11. throw new ConcurrentModificationException();
    12. return true;
    13. }
    14. return false;
    15. }

    其他终结操作

    forEach()

    ReferencePipeline类中,实现了forEach()方法, ```java // from ReferencePipeline.class

@Override public void forEach(Consumer<? super P_OUT> action) { // ForEachOps.. evaluate(ForEachOps.makeRef(action, false)); }

  1. evaluate后面的逻辑与count后面的一样了,略。
  2. <a name="s1dNc"></a>
  3. #### findFirst() anyMatch()
  4. findFirst()和anyMatch()的逻辑也不再多说了,一个套路,看下实现
  5. ```java
  6. @Override
  7. public final Optional<P_OUT> findFirst() {
  8. return evaluate(FindOps.makeRef(true));
  9. }
  10. @Override
  11. public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
  12. return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
  13. }