概念

Stream 是一个高级的迭代器,并不是一种数据结构,也不是一个集合,也不能存放数据,Stream 关注的是怎么把数据高效的处理,其实就是把数据在一个流水线中处理,就是一个 pipeline,在一端输出数据,在另一端可以得到结果,在中间有一系列的操作

外部迭代和内部迭代

外部迭代,就是我们传统的 for,while 方式进行数据迭代;
内部迭代,语法更加简洁,就是告诉 Stream 我们要做什么,并不关注如何实现细节;

  1. public class StreamDemo01 {
  2. public static void main(String[] args) {
  3. // 外部迭代
  4. int[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9};
  5. int sum = 0;
  6. for (int num : nums) {
  7. sum += num;
  8. }
  9. System.out.println("sum = " + sum);
  10. // 使用 stream 进行内部迭代
  11. int sum1 = IntStream.of(nums).sum();
  12. System.out.println("sum1 = " + sum1);
  13. }
  14. }

中间操作 / 终止操作

是 中间操作 还是 终止操作,主要是看返回值类型,如果返回 stream 就是 中间操作,其他都是终止操作

惰性求值:所有的中间操作都不会执行,只有调用了终止操作,才会真正的开始执行,这就叫惰性求值

  1. public static void main(String[] args) {
  2. // 外部迭代
  3. int[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9};
  4. int sum = 0;
  5. for (int num : nums) {
  6. sum += num;
  7. }
  8. System.out.println("sum = " + sum);
  9. // 使用 stream 进行内部迭代
  10. int sum1 = IntStream.of(nums).sum();
  11. System.out.println("sum1 = " + sum1);
  12. // map 就是中间操作(返回 stream 的操作)
  13. // sum 就是终止操作
  14. // 是 中间操作 还是 终止操作,主要是看返回值类型,如果返回 stream 就是 中间操作,其他都是 终止操作
  15. System.out.println(IntStream.of(nums).map(i -> i * 2).sum());
  16. System.out.println(IntStream.of(nums).map(StreamDemo01::doubleNum).sum());
  17. System.out.println("====================");
  18. // 惰性求值就是,终止操作没有调用的情况下,中间操作并不会执行
  19. IntStream intStream = IntStream.of(nums).map(StreamDemo01::doubleNum);
  20. System.out.println("惰性求值: " + intStream.sum());
  21. }
  22. public static int doubleNum(int i) {
  23. System.out.println("执行了乘以2");
  24. return i * 2;
  25. }

Stream 流的创建

流的操作通常包括:创建流 => 中间操作 **=> 中间操作 => … => 终止操作**

类型 相关方法
集合 Collection.stream / parallelStream
数组 Arrarys.stream
数字 Stream IntStream / LongStream. range/rangeClosed
自定义 Stream.generate / iterate
  1. public static void main(String[] args) {
  2. // 从集合创建流
  3. List<String> list = new ArrayList<>();
  4. list.stream();
  5. list.parallelStream();
  6. // 从数组创建
  7. Arrays.stream(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9});
  8. // 创建数字流
  9. IntStream.of(1, 2, 3);
  10. IntStream.rangeClosed(1, 10);
  11. // 使用 Random 创建一个数字流
  12. new Random().ints().limit(10);
  13. // 自己产生流
  14. Stream.generate(() -> 10).limit(20);
  15. }

Stream 流的中间操作

状态 相关方法 说明
无状态操作 map / mapToXxx 转化
flatMap / flatMapToXxx 拉平操作,多层属性,且属性是一个集合
filter 过滤
peek 获取
unordered 与并行流配合使用
有状态操作 distinct 去重
sorted 排序
limit / skip 限流 / 跳过
  1. public static void main(String[] args) {
  2. String str = "my name is 007";
  3. // 把每个单词的长度打印出来
  4. Stream.of(str.split(" ")).map(String::length).forEach(System.out::println);
  5. System.out.println("================");
  6. // 过滤
  7. Stream.of(str.split(" ")).filter(s -> s.length() > 2).map(String::length).forEach(System.out::println);
  8. System.out.println("================");
  9. // flatMap A -> B 属性(是个集合),最终得到所有 A 元素里面的所有 B 属性的集合
  10. // IntStream / LongStream 并不是 Stream 的子类,所以需要进行装箱
  11. Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.println((char) i.intValue()));
  12. System.out.println("================");
  13. // peek 通常用于 debug,是一个中间操作,而 forEach 是中止操作
  14. Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println);
  15. System.out.println("================");
  16. // limit 主要用于无限流的限流操作
  17. new Random().ints().filter(i -> i > 100 && i < 1000).limit(10).forEach(System.out::println);
  18. }

Stream 流的终止操作

状态 相关方法 说明
非短路操作 forEach / forEachOrdered 迭代 / 迭代(与并行流相关,可以保证顺序)
collect / toArray 收集为指定集合 / 转数组
reduce 计算
min / max / count 最小 / 最大 / 计数
短路操作 findFirst / findAny 找第一个 / 任意
allMatch / anyMatch / noneMatch 完全匹配 / 任意匹配 / 完全匹配取反
  1. public static void main(String[] args) {
  2. String str = "my name is 007";
  3. // 使用并行流
  4. str.chars().parallel().forEach(i -> System.out.print((char) i)); // is m 07nm0eya
  5. System.out.println();
  6. // 使用并行流,forEachOrdered 可以包装顺序
  7. str.chars().parallel().forEachOrdered(i -> System.out.print((char) i)); // my name is 007
  8. System.out.println();
  9. // 收集到 List
  10. List<String> collect = Stream.of(str.split(" ")).collect(Collectors.toList());
  11. System.out.println(collect); // [my, name, is, 007]
  12. System.out.println();
  13. // 使用 reduce 拼接字符串
  14. Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> {
  15. System.out.println("s1 = " + s1);
  16. System.out.println("s2 = " + s2);
  17. return s1 + " - " + s2;
  18. });
  19. System.out.println(letters.orElse("")); // my - name - is - 007
  20. // 使用 reduce 计算长度
  21. Integer length = Stream.of(str.split(" ")).map(s -> s.length()).reduce(0, (s1, s2) -> s1 + s2);
  22. System.out.println(length); // 11
  23. // max 的使用
  24. Optional<String> max = Stream.of(str.split(" ")).max((s1, s2) -> s1.length() - s2.length());
  25. System.out.println(max.get()); // name
  26. // count 的使用
  27. long count = Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).count();
  28. System.out.println(count); // 11
  29. // findFirst 短路操作
  30. OptionalInt first = new Random().ints().findFirst();
  31. System.out.println(first.getAsInt()); // 1626709974
  32. // findAny 短路操作
  33. OptionalInt any = new Random().ints().findAny();
  34. System.out.println(any.getAsInt()); // -1933666460
  35. // anyMatch,只要有一个符合条件,就返回 true
  36. boolean b = new Random().ints().anyMatch(i -> i > 0);
  37. System.out.println(b); // true
  38. }

并行流

  1. public static void main(String[] args) throws InterruptedException {
  2. // long count = IntStream.range(0, 10).peek(Stream_parallel::debug).count();
  3. // System.out.println(count);
  4. // 调用 parallel 产生一个并行流,并行流使用的线程池使用的是 ForkJoinPool.commonPool
  5. // 默认的线程数是当前机器 CPU 的个数
  6. // System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2"); // 手动修改线程数
  7. // long count1 = IntStream.range(0, 100).parallel().peek(Stream_parallel::debug).count();
  8. // System.out.println(count1);
  9. // 自己创建线程池,不使用默认线程池,防止任务阻塞
  10. ForkJoinPool pool = new ForkJoinPool(16);
  11. pool.submit(() -> IntStream.range(0, 100).parallel().peek(Stream_parallel::debug).count());
  12. pool.shutdown();
  13. synchronized (pool) {
  14. pool.wait();
  15. }
  16. // 想要实现这样的一个效果,第一步并行,第二步串行
  17. // 经过测试,多次调用 parallel 和 sequential 以最后一次为准
  18. IntStream.range(0, 10)
  19. // 调用 parallel 产生并行流
  20. .parallel().peek(Stream_parallel::debug)
  21. // 调用 sequential 产生串行流
  22. .sequential().peek(Stream_parallel::error)
  23. .count();
  24. }
  25. public static void debug(int i) {
  26. System.out.println(Thread.currentThread().getName() + " debug " + i);
  27. try {
  28. TimeUnit.SECONDS.sleep(3);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. public static void error(int i) {
  34. System.err.println(Thread.currentThread().getName() + " error " + i);
  35. try {
  36. TimeUnit.SECONDS.sleep(1);
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. }

收集器

  1. @Data
  2. class Student {
  3. enum Gender {
  4. MALE, FEMALE
  5. }
  6. enum Grade {
  7. ONE, TOW, THREE, FOUR
  8. }
  9. public Student(String name, int age, Gender gender, Grade grade){
  10. this.name = name;
  11. this.age = age;
  12. this.gender = gender;
  13. this.grade = grade;
  14. }
  15. private String name;
  16. private int age;
  17. private Gender gender;
  18. private Grade grade;
  19. }
  20. public class Stream_collect {
  21. public static void main(String[] args) {
  22. List<Student> students = Arrays.asList(
  23. new Student("小米", 10, Student.Gender.FEMALE, Student.Grade.ONE),
  24. new Student("小黑", 9, Student.Gender.MALE, Student.Grade.THREE),
  25. new Student("小白", 8, Student.Gender.FEMALE, Student.Grade.TOW),
  26. new Student("小绿", 13, Student.Gender.MALE, Student.Grade.FOUR),
  27. new Student("小明", 6, Student.Gender.FEMALE, Student.Grade.ONE),
  28. new Student("小红", 6, Student.Gender.FEMALE, Student.Grade.FOUR),
  29. new Student("小紫", 9, Student.Gender.MALE, Student.Grade.TOW),
  30. new Student("小王", 12, Student.Gender.MALE, Student.Grade.TOW),
  31. new Student("小李", 11, Student.Gender.FEMALE, Student.Grade.FOUR),
  32. new Student("小张", 14, Student.Gender.MALE, Student.Grade.THREE)
  33. );
  34. // 得到所有学生的年龄列表
  35. List<Integer> ageList = students.stream().map(Student::getAge).collect(Collectors.toList());
  36. System.out.println("所有学生的年龄: " + ageList);
  37. // 统计汇总信息
  38. IntSummaryStatistics ageStatistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));
  39. System.out.println("统计信息为: " + ageStatistics);
  40. // 分块(只能分成两块)
  41. Map<Boolean, List<Student>> partitionByGender = students.stream().collect(Collectors.partitioningBy(s -> s.getGender() == Student.Gender.MALE));
  42. // System.out.println("男女学生列表: " + partitionByGender);
  43. MapUtils.verbosePrint(System.out, "男女学生列表", partitionByGender);
  44. // 分组
  45. Map<Student.Grade, List<Student>> groupByGrade = students.stream().collect(Collectors.groupingBy(Student::getGrade));
  46. MapUtils.verbosePrint(System.out, "按班级分组", groupByGrade);
  47. // 得到所有班级的学生个数
  48. Map<Student.Grade, Long> countByGrade = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
  49. MapUtils.verbosePrint(System.out, "所有班级的学生个数", countByGrade);
  50. }
  51. }

Stream 的运行机制

  1. 所有操作都是链式调用,一个元素只迭代一次
  2. 每一个中间操作,返回一个新的流,流里面有一个属性,sourceStage 都指向同一个地方 Head
  3. Stream 流的结构是:Head -> nextStage -> nextStage -> … -> null
  4. 有状态操作会把无状态操作截断,单独处理
  5. 并行环境下,有状态的中间操作并不一定能并行操作
  6. parallel / sequential 这两个操作也是中间操作(也是返回 stream 流),但是他们不创建流,他们只需改 Head 的并行标志,所以多次调用改函数,都以最后一次为准 ```java public static void main(String[] args) { Random random = new Random(); Stream stream = Stream.generate(random::nextInt)

    1. // 产生 500 个(无限流需要短路操作)
    2. .limit(500)
    3. // 第一个无状态操作
    4. .peek(s -> println("peek1: " + s))
    5. // 第二个无状态操作
    6. .filter(s -> {
    7. println("filter: " + s);
    8. return s > 10000;
    9. })
    10. // 有状态操作
    11. .sorted((i1, i2) -> {
    12. println("sorted: " + i1 + ", " + i2);
    13. return i1.compareTo(i2);
    14. })
    15. // 又一个无状态操作
    16. .peek(s -> println("peek2: " + s))
    17. // 并行环境
    18. .parallel();

    stream.count();

}

public static void println(String str) { System.out.println(Thread.currentThread().getName() + “: “ + str); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } ```

总结

  • 惰性求值
    • 中间操作:有状态 / 无状态操作,通常只有一个参数的,就是无状态,有两个参数的就是无状态,有状态的不能进行并行操作,无状态的可以并行操作
    • 终止操作:短路操作,通常在无限流中使用短路操作
    • parallel / sequential:不创建流,只修改 head 中的并行标志
  • 收集器
    • 分组
    • 统计
  • 运行机制
    • 链式操作,Head -> nextStage -> nextStage -> … -> null
    • 并行,使用同一个线程池,fork / join