出自 案例

流是什么

流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,你无需写任何多线程代码了!我们会在第7章中详细解释流和并行化是怎么工作的。我们简单看看使用流的好处吧。下面两段代码都是用来返回低热量的菜肴名称的,并按照卡路里排序,一个是用Java 7写的,另一个是用Java 8的流写的。比较一下。不用太担心Java 8代码怎么写,我们在接下来的几节里会详细解释。

  1. List<Dish> menu = Arrays.asList(
  2. new Dish("pork", false, 800, Dish.Type.MEAT),
  3. new Dish("beef", false, 700, Dish.Type.MEAT),
  4. new Dish("chicken", false, 400, Dish.Type.MEAT),
  5. new Dish("french fries", true, 530, Dish.Type.OTHER),
  6. new Dish("rice", true, 350, Dish.Type.OTHER),
  7. new Dish("season fruit", true, 120, Dish.Type.OTHER),
  8. new Dish("pizza", true, 550, Dish.Type.OTHER),
  9. new Dish("prawns", false, 300, Dish.Type.FISH),
  10. new Dish("salmon", false, 450, Dish.Type.FISH) )

之前(Java 7):

  1. List<Dish> lowCaloricDishes = new ArrayList<>();
  2. for(Dish d: menu){
  3. if(d.getCalories() < 400){ ←─用累加器筛选元素
  4. lowCaloricDishes.add(d);
  5. }
  6. }
  7. Collections.sort(lowCaloricDishes, new Comparator<Dish>() { ←─用匿名类对菜肴排序
  8. public int compare(Dish d1, Dish d2){
  9. return Integer.compare(d1.getCalories(), d2.getCalories());
  10. }
  11. });
  12. List<String> lowCaloricDishesName = new ArrayList<>();
  13. for(Dish d: lowCaloricDishes){
  14. lowCaloricDishesName.add(d.getName()); ←─处理排序后的菜名列表
  15. }

在这段代码中,你用了一个“垃圾变量”lowCaloricDishes。它唯一的作用就是作为一次性的中间容器。在Java 8中,实现的细节被放在它本该归属的库里了。
之后(Java 8):

  1. import static java.util.Comparator.comparing;
  2. import static java.util.stream.Collectors.toList;
  3. List<String> lowCaloricDishesName =
  4. menu.stream()
  5. .filter(d -> d.getCalories() < 400) ←─选出400卡路里以下的菜肴
  6. .sorted(comparing(Dish::getCalories)) ←─按照卡路里排序
  7. .map(Dish::getName) ←─提取菜肴的名称
  8. .collect(toList()); ←─将所有名称保存在List

为了利用多核架构并行执行这段代码,你只需要把stream()换成parallelStream():

  1. List<String> lowCaloricDishesName =
  2. menu.parallelStream()
  3. .filter(d -> d.getCalories() < 400)
  4. .sorted(comparing(Dishes::getCalories))
  5. .map(Dish::getName)
  6. .collect(toList());

你可能会想,在调用parallelStream方法的时候到底发生了什么。用了多少个线程?对性能有多大提升?第7章会详细讨论这些问题。现在,你可以看出,从软件工程师的角度来看,新的方法有几个显而易见的好处。

1.代码是以声明性方式写的:说明想要完成什么(筛选热量低的菜肴)而不是说明如何实现一个操作(利用循环和if条件等控制流语句)。你在前面的章节中也看到了,这种方法加上行为参数化让你可以轻松应对变化的需求:你很容易再创建一个代码版本,利用Lambda表达式来筛选高卡路里的菜肴,而用不着去复制粘贴代码。

2 . 你可以把几个基础操作链接起来,来表达复杂的数据处理流水线(在filter后面接上sorted、map和collect操作),同时保持代码清晰可读。filter的结果被传给了sorted方法,再传给map方法,最后传给collect方法。

因为filter、sorted、map和collect等操作是与具体线程模型无关的高层次构件,所以它们的内部实现可以是单线程的,也可能透明地充分利用你的多核架构!在实践中,这意味着你用不着为了让某些数据处理任务并行而去操心线程和锁了,Stream API都替你做好了!

流的定义

简短的定义就是“从支持数据处理操作的源生成的元素序列”。让我们一步步剖析这个定义。

  • 元素序列——就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayList 与 LinkedList)。但流的目的在于表达计算,比如你前面见到的filter、sorted和map。集合讲的是数据,流讲的是计算。我们会在后面几节中详细解释这个思想。
  • 源——流会使用一个提供数据的源,如集合、数组或输入/输出资源。 请注意,从有序集合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。
  • 数据处理操作——流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中的常用操作,如filter、map、reduce、find、match、sort等。流操作可以顺序执行,也可并行执行。

此外,流操作有两个重要的特点。

  • 流水线——很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大的流水线。这让我们下一章中的一些优化成为可能,如延迟和短路。流水线的操作可以看作对数据源进行数据库式查询。
  • 内部迭代——与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。

让我们来看一段能够体现所有这些概念的代码:

  1. import static java.util.stream.Collectors.toList;
  2. List<String> threeHighCaloricDishNames =
  3. menu.stream() ←─从menu获得流(菜肴列表)
  4. .filter(d -> d.getCalories() > 300) ←─建立操作流水线:首先选出高热量的菜肴
  5. .map(Dish::getName) ←─获取菜名
  6. .limit(3) ←─只选择头三个
  7. .collect(toList()); ←─将结果保存在另一个List
  8. System.out.println(threeHighCaloricDishNames); ←─结果是[pork, beef,chicken]
  • filter——接受Lambda,从流中排除某些元素。在本例中,通过传递lambda d -> d.getCalories() > 300,选择出热量超过300卡路里的菜肴。
  • map——接受一个Lambda,将元素转换成其他形式或提取信息。在本例中,通过传递方法引用Dish::getName,相当于Lambda d -> d.getName(),提取了每道菜的菜名。
  • limit——截断流,使其元素不超过给定数量。
  • collect——将流转换为其他形式。在本例中,流被转换为一个列表。它看起来有点儿像变魔术,我们在第6章中会详细解释collect的工作原理。现在,你可以把collect看作能够接受各种方案作为参数,并将流中的元素累积成为一个汇总结果的操作。这里的toList()就是将流转换为列表的方案。
    在本例中,我们先是对menu调用stream方法,由菜单得到一个流。数据源是菜肴列表(菜单),它给流提供一个元素序列。接下来,对流应用一系列数据处理操作:filter、map、limit和collect。除了collect之外,所有这些操作都会返回另一个流,这样它们就可以接成一条流水线,于是就可以看作对源的一个查询。最后,collect操作开始处理流水线,并返回结果(它和别的操作不一样,因为它返回的不是流,在这里是一个List)。在调用collect之前,没有任何结果产生,实际上根本就没有从menu里选择元素。你可以这么理解:链中的方法调用都在排队等待,直到调用collect。

image.png

流只能遍历一次

请注意,和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。你可以从原始数据源那里再获得一个新的流来重新遍历一遍,就像迭代器一样(这里假设它是集合之类的可重复的源,如果是I/O通道就没戏了)。例如,以下代码会抛出一个异常,说流已被消费掉了:

  1. List<String> title = Arrays.asList("Java8", "In", "Action");
  2. Stream<String> s = title.stream();
  3. s.forEach(System.out::println); //打印标题中的每个单词
  4. s.forEach(System.out::println); //java.lang.IllegalStateException:流已被操作或关闭