概念
Stream 是一个高级的迭代器,并不是一种数据结构,也不是一个集合,也不能存放数据,Stream 关注的是怎么把数据高效的处理,其实就是把数据在一个流水线中处理,就是一个 pipeline,在一端输出数据,在另一端可以得到结果,在中间有一系列的操作
外部迭代和内部迭代
外部迭代,就是我们传统的 for,while 方式进行数据迭代;
内部迭代,语法更加简洁,就是告诉 Stream 我们要做什么,并不关注如何实现细节;
public class StreamDemo01 {public static void main(String[] args) {// 外部迭代int[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9};int sum = 0;for (int num : nums) {sum += num;}System.out.println("sum = " + sum);// 使用 stream 进行内部迭代int sum1 = IntStream.of(nums).sum();System.out.println("sum1 = " + sum1);}}
中间操作 / 终止操作
是 中间操作 还是 终止操作,主要是看返回值类型,如果返回 stream 就是 中间操作,其他都是终止操作
惰性求值:所有的中间操作都不会执行,只有调用了终止操作,才会真正的开始执行,这就叫惰性求值
public static void main(String[] args) {// 外部迭代int[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9};int sum = 0;for (int num : nums) {sum += num;}System.out.println("sum = " + sum);// 使用 stream 进行内部迭代int sum1 = IntStream.of(nums).sum();System.out.println("sum1 = " + sum1);// map 就是中间操作(返回 stream 的操作)// sum 就是终止操作// 是 中间操作 还是 终止操作,主要是看返回值类型,如果返回 stream 就是 中间操作,其他都是 终止操作System.out.println(IntStream.of(nums).map(i -> i * 2).sum());System.out.println(IntStream.of(nums).map(StreamDemo01::doubleNum).sum());System.out.println("====================");// 惰性求值就是,终止操作没有调用的情况下,中间操作并不会执行IntStream intStream = IntStream.of(nums).map(StreamDemo01::doubleNum);System.out.println("惰性求值: " + intStream.sum());}public static int doubleNum(int i) {System.out.println("执行了乘以2");return i * 2;}
Stream 流的创建
流的操作通常包括:创建流 => 中间操作 **=> 中间操作 => … => 终止操作**
| 类型 | 相关方法 |
|---|---|
| 集合 | Collection.stream / parallelStream |
| 数组 | Arrarys.stream |
| 数字 Stream | IntStream / LongStream. range/rangeClosed |
| 自定义 | Stream.generate / iterate |
public static void main(String[] args) {// 从集合创建流List<String> list = new ArrayList<>();list.stream();list.parallelStream();// 从数组创建Arrays.stream(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9});// 创建数字流IntStream.of(1, 2, 3);IntStream.rangeClosed(1, 10);// 使用 Random 创建一个数字流new Random().ints().limit(10);// 自己产生流Stream.generate(() -> 10).limit(20);}
Stream 流的中间操作
| 状态 | 相关方法 | 说明 |
|---|---|---|
| 无状态操作 | map / mapToXxx | 转化 |
| flatMap / flatMapToXxx | 拉平操作,多层属性,且属性是一个集合 | |
| filter | 过滤 | |
| peek | 获取 | |
| unordered | 与并行流配合使用 | |
| 有状态操作 | distinct | 去重 |
| sorted | 排序 | |
| limit / skip | 限流 / 跳过 |
public static void main(String[] args) {String str = "my name is 007";// 把每个单词的长度打印出来Stream.of(str.split(" ")).map(String::length).forEach(System.out::println);System.out.println("================");// 过滤Stream.of(str.split(" ")).filter(s -> s.length() > 2).map(String::length).forEach(System.out::println);System.out.println("================");// flatMap A -> B 属性(是个集合),最终得到所有 A 元素里面的所有 B 属性的集合// IntStream / LongStream 并不是 Stream 的子类,所以需要进行装箱Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.println((char) i.intValue()));System.out.println("================");// peek 通常用于 debug,是一个中间操作,而 forEach 是中止操作Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println);System.out.println("================");// limit 主要用于无限流的限流操作new Random().ints().filter(i -> i > 100 && i < 1000).limit(10).forEach(System.out::println);}
Stream 流的终止操作
| 状态 | 相关方法 | 说明 |
|---|---|---|
| 非短路操作 | forEach / forEachOrdered | 迭代 / 迭代(与并行流相关,可以保证顺序) |
| collect / toArray | 收集为指定集合 / 转数组 | |
| reduce | 计算 | |
| min / max / count | 最小 / 最大 / 计数 | |
| 短路操作 | findFirst / findAny | 找第一个 / 任意 |
| allMatch / anyMatch / noneMatch | 完全匹配 / 任意匹配 / 完全匹配取反 |
public static void main(String[] args) {String str = "my name is 007";// 使用并行流str.chars().parallel().forEach(i -> System.out.print((char) i)); // is m 07nm0eyaSystem.out.println();// 使用并行流,forEachOrdered 可以包装顺序str.chars().parallel().forEachOrdered(i -> System.out.print((char) i)); // my name is 007System.out.println();// 收集到 ListList<String> collect = Stream.of(str.split(" ")).collect(Collectors.toList());System.out.println(collect); // [my, name, is, 007]System.out.println();// 使用 reduce 拼接字符串Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> {System.out.println("s1 = " + s1);System.out.println("s2 = " + s2);return s1 + " - " + s2;});System.out.println(letters.orElse("")); // my - name - is - 007// 使用 reduce 计算长度Integer length = Stream.of(str.split(" ")).map(s -> s.length()).reduce(0, (s1, s2) -> s1 + s2);System.out.println(length); // 11// max 的使用Optional<String> max = Stream.of(str.split(" ")).max((s1, s2) -> s1.length() - s2.length());System.out.println(max.get()); // name// count 的使用long count = Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).count();System.out.println(count); // 11// findFirst 短路操作OptionalInt first = new Random().ints().findFirst();System.out.println(first.getAsInt()); // 1626709974// findAny 短路操作OptionalInt any = new Random().ints().findAny();System.out.println(any.getAsInt()); // -1933666460// anyMatch,只要有一个符合条件,就返回 trueboolean b = new Random().ints().anyMatch(i -> i > 0);System.out.println(b); // true}
并行流
public static void main(String[] args) throws InterruptedException {// long count = IntStream.range(0, 10).peek(Stream_parallel::debug).count();// System.out.println(count);// 调用 parallel 产生一个并行流,并行流使用的线程池使用的是 ForkJoinPool.commonPool// 默认的线程数是当前机器 CPU 的个数// System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2"); // 手动修改线程数// long count1 = IntStream.range(0, 100).parallel().peek(Stream_parallel::debug).count();// System.out.println(count1);// 自己创建线程池,不使用默认线程池,防止任务阻塞ForkJoinPool pool = new ForkJoinPool(16);pool.submit(() -> IntStream.range(0, 100).parallel().peek(Stream_parallel::debug).count());pool.shutdown();synchronized (pool) {pool.wait();}// 想要实现这样的一个效果,第一步并行,第二步串行// 经过测试,多次调用 parallel 和 sequential 以最后一次为准IntStream.range(0, 10)// 调用 parallel 产生并行流.parallel().peek(Stream_parallel::debug)// 调用 sequential 产生串行流.sequential().peek(Stream_parallel::error).count();}public static void debug(int i) {System.out.println(Thread.currentThread().getName() + " debug " + i);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}}public static void error(int i) {System.err.println(Thread.currentThread().getName() + " error " + i);try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}
收集器
@Dataclass Student {enum Gender {MALE, FEMALE}enum Grade {ONE, TOW, THREE, FOUR}public Student(String name, int age, Gender gender, Grade grade){this.name = name;this.age = age;this.gender = gender;this.grade = grade;}private String name;private int age;private Gender gender;private Grade grade;}public class Stream_collect {public static void main(String[] args) {List<Student> students = Arrays.asList(new Student("小米", 10, Student.Gender.FEMALE, Student.Grade.ONE),new Student("小黑", 9, Student.Gender.MALE, Student.Grade.THREE),new Student("小白", 8, Student.Gender.FEMALE, Student.Grade.TOW),new Student("小绿", 13, Student.Gender.MALE, Student.Grade.FOUR),new Student("小明", 6, Student.Gender.FEMALE, Student.Grade.ONE),new Student("小红", 6, Student.Gender.FEMALE, Student.Grade.FOUR),new Student("小紫", 9, Student.Gender.MALE, Student.Grade.TOW),new Student("小王", 12, Student.Gender.MALE, Student.Grade.TOW),new Student("小李", 11, Student.Gender.FEMALE, Student.Grade.FOUR),new Student("小张", 14, Student.Gender.MALE, Student.Grade.THREE));// 得到所有学生的年龄列表List<Integer> ageList = students.stream().map(Student::getAge).collect(Collectors.toList());System.out.println("所有学生的年龄: " + ageList);// 统计汇总信息IntSummaryStatistics ageStatistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));System.out.println("统计信息为: " + ageStatistics);// 分块(只能分成两块)Map<Boolean, List<Student>> partitionByGender = students.stream().collect(Collectors.partitioningBy(s -> s.getGender() == Student.Gender.MALE));// System.out.println("男女学生列表: " + partitionByGender);MapUtils.verbosePrint(System.out, "男女学生列表", partitionByGender);// 分组Map<Student.Grade, List<Student>> groupByGrade = students.stream().collect(Collectors.groupingBy(Student::getGrade));MapUtils.verbosePrint(System.out, "按班级分组", groupByGrade);// 得到所有班级的学生个数Map<Student.Grade, Long> countByGrade = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));MapUtils.verbosePrint(System.out, "所有班级的学生个数", countByGrade);}}
Stream 的运行机制
- 所有操作都是链式调用,一个元素只迭代一次
- 每一个中间操作,返回一个新的流,流里面有一个属性,sourceStage 都指向同一个地方 Head
- Stream 流的结构是:Head -> nextStage -> nextStage -> … -> null
- 有状态操作会把无状态操作截断,单独处理
- 并行环境下,有状态的中间操作并不一定能并行操作
parallel / sequential 这两个操作也是中间操作(也是返回 stream 流),但是他们不创建流,他们只需改 Head 的并行标志,所以多次调用改函数,都以最后一次为准 ```java public static void main(String[] args) { Random random = new Random(); Stream
stream = Stream.generate(random::nextInt) // 产生 500 个(无限流需要短路操作).limit(500)// 第一个无状态操作.peek(s -> println("peek1: " + s))// 第二个无状态操作.filter(s -> {println("filter: " + s);return s > 10000;})// 有状态操作.sorted((i1, i2) -> {println("sorted: " + i1 + ", " + i2);return i1.compareTo(i2);})// 又一个无状态操作.peek(s -> println("peek2: " + s))// 并行环境.parallel();
stream.count();
}
public static void println(String str) { System.out.println(Thread.currentThread().getName() + “: “ + str); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } ```
总结
- 惰性求值
- 中间操作:有状态 / 无状态操作,通常只有一个参数的,就是无状态,有两个参数的就是无状态,有状态的不能进行并行操作,无状态的可以并行操作
- 终止操作:短路操作,通常在无限流中使用短路操作
- parallel / sequential:不创建流,只修改 head 中的并行标志
- 收集器
- 分组
- 统计
- 运行机制
- 链式操作,Head -> nextStage -> nextStage -> … -> null
- 并行,使用同一个线程池,fork / join
