Java对Stream的定义

A sequence of elements supporting sequential and parallel aggregate operations.

解读一下:

  1. Stream是元素的集合,这点让Stream看起来用些类似Iterator;
  2. 可以支持顺序和并行的对原Stream进行汇聚的操作;

大家可以把Stream当成一个高级版本的Iterator。原始版本的Iterator,用户只能一个一个的遍历元素并对其执行某些操作;高级版本的Stream,用户只要给出需要对其包含的元素执行什么操作,比如“过滤掉长度大于10的字符串”、“获取每个字符串的首字母”等,具体这些操作如何应用到每个元素上,就给Stream就好了。

一、创建流的几种方式

  1. java.util.Arrays#stream(T[])
  2. java.util.stream.Stream#of(T...)底层调用的也是Arrays.stream
  3. java.util.Collection#stream
  4. java.util.stream.Stream#iterate
  5. java.util.stream.Stream#generate

1-3,是根据具体的数组或者集合对象,创建的流,在创建流之前,这些对象的大小(长度)已经确认,所以这个种方式的流,也被成为有限流。
而4-5中,创建流的方式,是无限大小的流(generate 最大是Long.MAX_VALUE),也被成为无限流,那么我们不可能就这样放任对象被无限创建,直到内存溢出,这样的无限流,也是配合limit使用,指定这个流生成的元素的个数。

二、流的中间操作和终端操作

通过查看Stream接口的抽象方法的定义,我们可以看到,这些方法,可以分成两种类型,一种返回类型为接口本身的Stream,另外一种是返回其他对象类型的,返回接口类型的,我们称这些方法为中间操作,返回其他具体类型的,我们称为终端操作;

中间操作,是什么操作? 我们先看下字符串操作StringBuilder的append的方法

  1. @Override
  2. public StringBuilder append(String str) {
  3. super.append(str);
  4. return this; //就是这样,返回对象本身;然后我们就可以像操作StringBuilder的append一样,可以连接操作;
  5. }
  6. StringBuilder sb = new StringBuilder();
  7. sb.append("a").append("b").append("c");

终端操作,是指返回最终的结果,例如我们常用的forEach,内部迭代;

操作 类型 返回类型 使用的类型/函数式接口 函数描述符
filter 中间 Stream Predicate T -> boolean
distinct 中间
Stream
skip 中间
Stream long
limit 中间
Stream long
map 中间 Stream Function T -> R
flatMap 中间 Stream Function> T -> Stream
sorted 中间
Stream Comparator (T,T) -> int
anyMatch 终端 boolean Predicate T -> boolean
noneMatch 终端 boolean Predicate T -> boolean
allMatch 终端 boolean Predicate T -> boolean
findAny 终端 Optional
findFirst 终端 Optional
forEach 终端 void Comsumer T -> void
collect 终端 R Collector
reduce 终端
Optional BinaryOperator (T,T) -> T
count 终端 long

三、stream的操作

filter操作

  1. Stream<T> filter(Predicate<? super T> predicate);
  2. 这个方法,传入一个Predicate的函数接口,这个接口传入一个泛型参数T,做完操作之后,返回一个boolean值;
  3. filter方法的作用,是对这个boolean做判断,返回判断之后的对象
  4. String[] dd = { "a", "b", "c" };
  5. Stream<String> stream = Arrays.stream(dd);
  6. stream.filter(str -> str.equals("a")).forEach(System.out::println);//返回字符串为a的值

map操作

  1. <R> Stream<R> map(Function<? super T, ? extends R> mapper);
  2. 这个方法传入一个Function的函数式接口,这个接口,接收一个泛型T,返回泛型R
  3. map函数的定义,返回的流,表示的泛型是R对象,这个表示,调用这个函数后,可以改变返回的类型
  4. public class TestJava8 {
  5. public static void main(String[] args) {
  6. Integer[] dd = { 1, 2, 3 };
  7. Stream<Integer> stream = Arrays.stream(dd);
  8. stream.map(str -> Integer.toString(str)).forEach(str -> {
  9. System.out.println(str);// 1 ,2 ,3
  10. System.out.println(str.getClass());// class java.lang.String
  11. });
  12. List<Emp> list = Arrays.asList(new Emp("a"), new Emp("b"), new Emp("c"));
  13. list.stream().map(emp -> emp.getName()).forEach(str -> {
  14. System.out.println(str);
  15. });
  16. }
  17. @Data
  18. public static class Emp {
  19. private String name;
  20. public Emp() {
  21. super();
  22. }
  23. public Emp(String name) {
  24. super();
  25. this.name = name;
  26. }
  27. }
  28. }

flatMap操作

  1. <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
  2. 这个接口,跟map一样,接收一个Fucntion的函数式接口,不同的是,Function接收的泛型参数,第二个参数是一个Stream流;
  3. 方法,返回的也是泛型R,具体的作用是把两个流,变成一个流返回

其他中间操作

  1. //去重复
  2. Stream<T> distinct();
  3. //排序
  4. Stream<T> sorted();
  5. //根据属性排序
  6. Stream<T> sorted(Comparator<? super T> comparator);
  7. //对对象的进行操作
  8. Stream<T> peek(Consumer<? super T> action);
  9. //截断--取先maxSize个对象
  10. Stream<T> limit(long maxSize);
  11. //截断--忽略前N个对象
  12. Stream<T> skip(long n);

终端操作forEachOrdered和forEach

  1. void forEach(Consumer<? super T> action);
  2. void forEachOrdered(Consumer<? super T> action);

这两个函数都是对集合的流,进行遍历操作,是属于内部迭代。

  1. List<String> strs = Arrays.asList("a", "b", "c");
  2. strs.stream().forEachOrdered(System.out::print);//abc
  3. System.out.println();
  4. strs.stream().forEach(System.out::print);//abc
  5. System.out.println();
  6. strs.parallelStream().forEachOrdered(System.out::print);//abc
  7. System.out.println();
  8. strs.parallelStream().forEach(System.out::print);//bca

先看第一段输出和第二段输出,使用的是stream的流,这个是一个串行流,也就是程序是串行执行的,所有看到遍历的结果都是按照集合的元素放入的顺序;
看第三段和第四段输出,使用的parallelStream的流,这个流表示一个并行流,也就是在程序内部迭代的时候,会帮你免费的并行处理
第三段代码的forEachOrdered表示严格按照顺序取数据,forEach在并行中,随机排列了;这个也可以看出来,在并行的程序中,如果对处理之后的数据,没有顺序的要求,使用forEach的效率,肯定是要更好的

规约操作reduce

Streamreduce方法,翻译过来是聚合或者是汇聚成一个的意思,由于Stream本身就代表着一堆数据,那stream.reduce()方法顾名思义就是把一堆数据聚合成一个数据。

流底层核心其实是Spliterator接口的一个实现,而这个Spliterator接口其实本身就是Fork/Join并行框架的一个实现,所以归根结底要明白流的工作方式,就要明白一下Fork/Join框架的基本思想,即:以递归的方式将可以并行的任务拆分成更小的子任务,然后将每个子任务的结果合并起来生成整体的最后结果。
**
理解了方法本身的意思以及流的工作方式,再结合到一起理解一下stream.reduce()方法,即用Fork/Join的方式把一堆数据聚合成一个数据,因此可以画出reduce方法的运行草图

  1. T reduce(T identity, BinaryOperator<T> accumulator);
  2. 这个函数,接受2个参数,第一个表示初始值,第二个值,传入的是一个函数式接口BinaryOperator,这个接口继承BiFunction;计算的表达式的规则;
  1. <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
  2. BinaryOperator是供多线程使用的,如果不在Stream中声明使用多线程,就不会使用子任务,自然也不会调用到该方法。另外多线程下使用BinaryOperator的时候是需要考虑线程安全的问题。
  3. 另外自问自答下为什么需要BinaryOperator
  4. 因为这个重载的方法和其他两个不相同,允许改变返回值,所以返回值并不一定是Collection的子类;因此必须显示的声明如何拼接两个子任务产生的结果。
  1. List<Integer> numbers = Stream.iterate(1, x -> x + 1).limit(10).collect(Collectors.toList());
  2. Integer aa = 0;
  3. for (Integer i : numbers) {
  4. aa += i;
  5. }
  6. Integer dd = numbers.stream().reduce(0, (a, b) -> a + b, (a, b) -> a - b);
  7. Optional<Integer> dd1 = numbers.stream().reduce((a, b) -> a + b);
  8. System.out.println(aa);
  9. System.out.println(dd);
  10. System.out.println(dd1.get());

规约操作collect

  1. package com.meinergy.vppd.business.api.test;
  2. import java.util.ArrayList;
  3. import java.util.Comparator;
  4. import java.util.IntSummaryStatistics;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.Optional;
  8. import java.util.Set;
  9. import java.util.function.Function;
  10. import java.util.stream.Collectors;
  11. import org.junit.Test;
  12. public class TestJava8 {
  13. public static List<Emp> list = new ArrayList<>();
  14. static {
  15. list.add(new Emp("上海", "小名", 17));
  16. list.add(new Emp("北京", "小红", 18));
  17. list.add(new Emp("深圳", "小蓝", 19));
  18. list.add(new Emp("广州", "小灰", 20));
  19. list.add(new Emp("杭州", "小黄", 21));
  20. list.add(new Emp("贵阳", "小白", 22));
  21. }
  22. @Test
  23. public void test1() {
  24. // 转list
  25. List<String> names = list.stream().map(emp -> emp.getName()).collect(Collectors.toList());
  26. // 转set
  27. Set<String> address = list.stream().map(emp -> emp.getName()).collect(Collectors.toSet());
  28. // 转map,需要指定key和value,Function.identity()表示当前的Emp对象本身
  29. Map<String, Emp> map = list.stream().collect(Collectors.toMap(Emp::getName, Function.identity()));
  30. // 计算元素中的个数
  31. Long count = list.stream().collect(Collectors.counting());
  32. // 数据求和 summingInt summingLong,summingDouble
  33. Integer sumAges = list.stream().collect(Collectors.summingInt(Emp::getAge));
  34. // 平均值 averagingInt,averagingDouble,averagingLong
  35. Double aveAges = list.stream().collect(Collectors.averagingInt(Emp::getAge));
  36. // 综合处理的,求最大值,最小值,平均值,求和操作
  37. // summarizingInt,summarizingLong,summarizingDouble
  38. IntSummaryStatistics intSummary = list.stream().collect(Collectors.summarizingInt(Emp::getAge));
  39. System.out.println(intSummary.getAverage());// 19.5
  40. System.out.println(intSummary.getMax());// 22
  41. System.out.println(intSummary.getMin());// 17
  42. System.out.println(intSummary.getSum());// 117
  43. // 连接字符串,当然也可以使用重载的方法,加上一些前缀,后缀和中间分隔符
  44. String strEmp = list.stream().map(emp -> emp.getName()).collect(Collectors.joining());
  45. String strEmp1 = list.stream().map(emp -> emp.getName()).collect(Collectors.joining("-中间的分隔符-"));
  46. String strEmp2 = list.stream().map(emp -> emp.getName()).collect(Collectors.joining("-中间的分隔符-", "前缀*", "&后缀"));
  47. System.out.println(strEmp);// 小名小红小蓝小灰小黄小白
  48. // 小名-中间的分隔符-小红-中间的分隔符-小蓝-中间的分隔符-小灰-中间的分隔符-小黄-中间的分隔符-小白
  49. System.out.println(strEmp1);
  50. // 前缀*小名-中间的分隔符-小红-中间的分隔符-小蓝-中间的分隔符-小灰-中间的分隔符-小黄-中间的分隔符-小白&后缀
  51. System.out.println(strEmp2);
  52. // maxBy 按照比较器中的比较结果刷选 最大值
  53. Optional<Integer> maxAge = list.stream().map(emp -> emp.getAge())
  54. .collect(Collectors.maxBy(Comparator.comparing(Function.identity())));
  55. // 最小值
  56. Optional<Integer> minAge = list.stream().map(emp -> emp.getAge())
  57. .collect(Collectors.minBy(Comparator.comparing(Function.identity())));
  58. System.out.println("max:" + maxAge);
  59. System.out.println("min:" + minAge);
  60. // 归约操作
  61. list.stream().map(emp -> emp.getAge()).collect(Collectors.reducing((x, y) -> x + y));
  62. list.stream().map(emp -> emp.getAge()).collect(Collectors.reducing(0, (x, y) -> x + y));
  63. // 分操作 groupingBy 根据地址,把原list进行分组
  64. Map<String, List<Emp>> mapGroup = list.stream().collect(Collectors.groupingBy(Emp::getAddress));
  65. // partitioningBy 分区操作 需要根据类型指定判断分区
  66. Map<Boolean, List<Integer>> partitioningMap = list.stream().map(emp -> emp.getAge())
  67. .collect(Collectors.partitioningBy(emp -> emp > 20));
  68. }
  69. @Data
  70. static class Emp {
  71. private String address;
  72. private String name;
  73. private Integer age;
  74. public Emp() {
  75. }
  76. public Emp(String address) {
  77. this.address = address;
  78. }
  79. public Emp(String name, Integer age) {
  80. this.name = name;
  81. this.age = age;
  82. }
  83. public Emp(String address, String name, Integer age) {
  84. super();
  85. this.address = address;
  86. this.name = name;
  87. this.age = age;
  88. }
  89. }
  90. }

四、Optional

很多的stream的终端操作,都返回了一个Optional对象,这个对象,是用来解决空指针的问题,而产生的一个类。

五、参考

java8 教程总章