• 如此简洁强大的API是如何实现的呢?
  • Stream的效率如何,每次方法调用都会导致一次迭代吗?
  • 自动并行又是如何做到的,线程数是多少?

    实例

    假如我们现在有这样一个需求:找出字符串数组中以A开头的最长字符串。

方案1:

原理解析 - 图1
算法描述:

  • 循环数组,找出所有以A打头的字符串,生成list1
  • 循环list1,计算字符串长度,生成list2
  • 循环list2,找出长度最大值 -> 5

这样实现明显有两个弊端:

  1. 迭代次数多,迭代了3次和函数调用次数相同。
  2. 中间数据多,产生了list1和list2两个中间数据。

这样的时空效率明显不能够接受。

方案2:

  1. int longest = 0;
  2. for(String str : strings){
  3. if(str.startsWith("A")){ // 1. filter(), 保留以A开头的字符串
  4. int len = str.length(); // 2. mapToInt(), 转换成长度
  5. longest = Math.max(len, longest); // 3. max(), 保留最长的长度
  6. }
  7. }

我们通过对意图的理解,将3次循环转换为1次循环,也避免了存储中间结果,这就是流水线,也即一个对象通过1次流水线就完成了所有操作,而不用通过3次。

Stream流水线解决方案

我们大致能够想到,应当采取某种方式记录下用户的每一步操作,当用户调用结束时将之前的所有操作叠加在一起,在一个循环中全部执行掉,沿着这个思路,有几个问题需要解决。

  • 用户的操作如何记录?
  • 操作如何叠加?
  • 叠加之后如何执行?
  • 执行后的结果(如果有)将记录在哪里?

    操作如何记录?

    Stream中使用Stage的概念来描述一个完整的操作,具体是由<数据来源,操作,回调函数>构成的三元组,并用某种实例化后的PipelineHelper来代表Stage,将具有先后循序的各个Stage连到一起。就构成了流水线。

Stream相关接口和类的继承关系如下:
原理解析 - 图2
图中Head用于表示第一个Stage,即调用调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操作;StatelessOpStatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作。

方案1使用流水线技术可画为如下图:
原理解析 - 图3
图中通过Collection.stream()方法得到Head也就是stage0,紧接着调用一系列的中间操作,不断产生新的Stream。这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是Stream记录操作的方式。

操作如何叠加?

以上只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的head开始依次执行每一步的操作(包括回调函数)就行了。这听起来似乎是可行的,但是你忽略了前一个Stage并不知道下一个Stage的操作是什么(map,filter….?),也就无法调用。
换句话说当前Stage只包含自己的操作和回调函数(听起来像句废话,就像厕所隔间一样,你只知道自己的隔间是自己,下一个隔间是谁你并不知道),因此需要一种协议来协调Stage之间的调用关系。

这种协议由Sink接口完成,Sink接口包含的方法如下表所示:

方法名 作用
void begin(long size) 开始遍历元素之前调用该方法,通知Sink做好准备。
void end() 所有元素遍历完成之后调用,通知Sink没有更多的元素了。
boolean cancellationRequested() 是否可以结束操作,可以让短路操作尽早结束。
void accept(T t) 遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。

我们分析下面例子是如何执行的:

public class StreamDemo {
    public static void main(String[] args) {
        List<String> strings = List.of("Apple", "bug", "ABC", "Dog");
        strings = new ArrayList<>(strings);
        OptionalInt max
                = strings.stream()
                //无状态中间操作
                .filter(s -> s.startsWith("A"))
                //无状态中间操作
                .mapToInt(String::length)
                //有状态中间操作
                .sorted()
                //非短路终端操作
                .max();
    }
}

我们看一个复杂例子来理解Slink是如何协调有状态操作的。
Stream.sorted()方法将对Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成Sink的呢?sorted()一种可能封装的Sink代码如下:

// Stream.sort()方法用到的Sink实现
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;                      // 存放用于排序的元素
    // 构造函数
    RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
        super(downstream, comparator);// dowmstream 是下游Slink,comparator是比较类的回调函数
    }
    @Override
    public void begin(long size) {
        ...
        // 告诉Slink中间数据结构大小,创建一个存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }

    @Override
    public void end() {
        list.sort(comparator);// 只有元素全部接收之后才能开始排序
        downstream.begin(list.size());
        if (!cancellationWasRequested) {// 下游Sink不包含短路操作
            list.forEach(downstream::accept);// 2. 将处理结果传递给流水线下游的Sink
        }
        else {// 下游Sink包含短路操作
            for (T t : list) {// 每次都调用cancellationRequested()询问是否可以结束处理。
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);// 2. 将处理结果传递给流水线下游的Sink
            }
        }
        downstream.end();
        list = null;
    }

    @Override
    public void accept(T t) {
        list.add(t);// 1. 使用当前Sink包装动作处理t,只是简单的将元素添加到中间列表当中
    }
}

上述代码完美的展现了Sink的四个接口方法是如何协同工作的:

  1. 首先begin()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小;
  2. 之后通过accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;
  3. 最后end()方法告诉Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink;
  4. 如果下游的Sink是短路操作,将结果传递给下游时不断询问下游cancellationRequested()是否可以结束处理。

https://blog.csdn.net/jpf254/article/details/79470986