第四章 引入流

  • 什么是流

流是Java API的新成员,它允许以声明性方式处理数据集合(就相当于没有mybatis和hibernate类似框架之前,只能手写sql去数据查询数据,流允许通过查询语句来表达,而不是临时编写一种实现);此外,流还提供了API可对集合进行并行处理。
示例:筛选集合中大于10的数字,并对筛选后的数字进行排序,最后生成一个新的集合

  1. List<Integer> intLists = Arrays.asList(1, 35, 21, 531, 32);
  2. List<Integer> newLists = intLists
  3. .stream()
  4. .filter(i -> i > 10) // 筛选
  5. .sorted() // 排序
  6. .collect(Collectors.toList()); // 收集
  • 集合与流
    • 类比SQL,集合是全量查询,而流则是分页操作
    • 流只能遍历一次
    • 遍历方式的区别:集合属于外部迭代,流则是内部迭代
  • 中间操作和终端操作
    • 中间操作:中间操作会返回另一个流,中间操作不会立即执行,只有调用终端操作时才会一次性全部处理
    • 终端操作:终端操作会从流的流水线生成结果,其结果是任何不是流的值
  • 按需计算:流中的元素是按需计算的(类比SQL分页)

    第五章 使用流

  • 筛选: Stream filter(Predicate<? super T> predicate)

筛选出符合条件的元素

  1. List<Integer> intLists = Arrays.asList(1, 35, 21, 531, 32);
  2. List<Integer> newLists = intLists
  3. .stream()
  4. .filter(i -> i > 10)
  5. .collect(Collectors.toList());
  • 截断、跳过:Stream limit(long n); Stream skip(long m);

截取前3个元素,跳过前2个元素

  1. List<Integer> newLists = intLists
  2. .stream()
  3. .filter(i -> i > 10)
  4. .skip(2)
  5. .limit(3)
  6. .collect(Collectors.toList());
  • 映射 Stream map(Function<? super T, ? extends R> mapper);

对流中的每一个元素应用函数
示例:将String类型的集合映射(转换)为Integer类型

  1. List<String> lists = Arrays.asList("123", "312", "523", "123");
  2. List<Integer> intLists = lists.stream()
  3. .map(Integer::valueOf)
  4. .collect(Collectors.toList());
  • 流的扁平化 Stream flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

把一个流中的每个值都换成另外一个流,然后把所有的流连接起来成为一个流

  1. List<String> lists = Arrays.asList("hello", "world", "lambda");
  2. List<String> collect = lists.stream()
  3. .map(w -> w.split(""))
  4. .flatMap(Arrays::stream)
  5. .collect(Collectors.toList());
  • 查找和匹配
    • allMatch:是否至少匹配一个元素
    • anyMatch:是否匹配所有元素
    • noneMatch:是否不匹配所有元素
    • findFirst:查找第一个元素
    • findAny:返回流中的任意元素,常与filter配合使用
  • 规约
    • 元素求和
    • 最大值、最小值

reduce接受两个参数,一个是初始值,一个BinaryOperator来将两个元素结合起来产生一个新值
T reduce(T identity, BinaryOperator accumulator);
重载方法
Optional reduce(BinaryOperator accumulator);

  1. List<Integer> numbers = Arrays.asList(1, 42, 54, 52, 421);
  2. // for循环求和
  3. int sum = 0;
  4. for (int n : numbers) {
  5. sum +=n;
  6. }
  7. System.out.println(sum);
  8. // reduce求和
  9. int reduceSum = numbers.stream().reduce(0, Integer::sum);
  10. System.out.println(reduceSum);
  11. // reduce求最大值
  12. Optional<Integer> max = numbers.stream().reduce(Integer::max);
  13. // reduce求最小值
  14. Optional<Integer> min = numbers.stream().reduce(Integer::min);
  • 总结

Stream支持两种类型的操作:中间操作和终端操作。中间操作可以连接起来,将一个流转换为另外一个流,这些操作不会消耗流,其目的是建立一条流水线。终端操作则会消耗流,以产生一个结果。

第六章 用流收集数据

  • 收集器

收集器会对元素应用一个转换函数,并将结果累积在一个数据结果中

  • 预定义收集器:Collectors类提供的工程方法创建的收集器,主要提供来三大功能
    • 将流元素归约和汇总为一个值
    • 元素分组
    • 元素分区
      • 归约汇总
  1. @Data
  2. class User{
  3. private String name;
  4. private int age;
  5. private int weight;
  6. }
  7. @Test
  8. public void maxTest(){
  9. List<User> users = new ArrayList<>();
  10. for(int i =0;i<20;i++){
  11. User user = new User();
  12. user.age = i+10;
  13. user.name = "userName"+i;
  14. user.weight = i+50;
  15. users.add(user);
  16. }
  17. // 查找最大值和最小值
  18. Comparator<User> userComparator = Comparator.comparingInt(User::getAge);
  19. Optional<User> maxAgeUser = users.stream().collect(maxBy(userComparator));
  20. Optional<User> minAgeUser = users.stream().collect(minBy(userComparator));
  21. // 汇总
  22. // 计算平均年龄
  23. Double aveAge = users.stream().collect(averagingInt(User::getAge));
  24. // 统计体重维度
  25. IntSummaryStatistics statistics = users.stream().collect(summarizingInt(User::getWeight));
  26. System.out.println(statistics);
  27. // 输出:IntSummaryStatistics{count=20, sum=1190, min=50, average=59.500000, max=69}
  28. // 分别对应总数、总体重、体重最轻的、体重最大的、平均体重
  29. // 连接字符串
  30. String userNames = users.stream().map(User::getName).collect(joining(","));
  31. System.out.println(userNames);
  32. }
  • 广义的规约汇总

上述的所有收集器,都可以用reducing工厂方法定义

  1. public static <T, U>
  2. Collector<T, ?, U> reducing(U identity,
  3. Function<? super T, ? extends U> mapper,
  4. BinaryOperator<U> op)

它需要三个参数

  • 第一个是规约操作的初始值,也是流中没有元素时的返回值
  • 第二个参数是希望规约的取值函数
  • 第三个参数是一个BinaryOperator,将两个项目累积成一个同类型的值

示例:

  1. // 计算所有人的总重量
  2. Integer totalWeight = users.stream().collect(reducing(0, User::getWeight, (a, b) -> a + b));
  3. // 找出年龄最小User
  4. Optional<User> minUser = users.stream().collect(reducing((u1, u2) -> u1.getAge() < u2.getAge() ? u1 : u2));
  • 分组:

    1. List<Integer> lists = Arrays.asList(123, 23, 4, 23, 2, 3, 1, 234, 24);
    2. // 对int数组进行分组,大于10一组,小于10的一组
    3. Map<Boolean, List<Integer>> results = lists
    4. .stream()
    5. .collect(Collectors.groupingBy(a -> a > 10));
    6. System.out.println(results);
    7. // 输出:{false=[4, 2, 3, 1], true=[123, 23, 23, 234, 24]}
    • 多级分组

      1. // 对int数组进行分组,大于10一组,小于10的一组,并且每组在进行奇偶分组
      2. Map<Boolean, Map<Boolean, List<Integer>>> results = lists.stream().collect(Collectors.groupingBy(a -> a > 10,
      3. Collectors.groupingBy(a -> a % 2 == 0)));
      4. System.out.println(results);
      5. // 输出:{false={false=[3, 1], true=[4, 2]}, true={false=[123, 23, 23], true=[234, 24]}}
    • 分组统计

  1. List<Integer> lists = Arrays.asList(123, 23, 4, 23, 2, 3, 1, 234, 24);
  2. // 分组统计大于10和小于10的元素个数
  3. Map<Boolean, Long> results = lists.stream().collect(Collectors.groupingBy(a -> a > 10, Collectors.counting()));
  4. System.out.println(results);
  5. // 输入:{false=4, true=5}
  • Collector接口分析
  1. public interface Collector<T, A, R> {
  2. // 创建并返回新的可变结果容器
  3. Supplier<A> supplier();
  4. // 将元素添加到结果容器中
  5. BiConsumer<A, T> accumulator();
  6. // 合并两个结果容器
  7. BinaryOperator<A> combiner();
  8. // 对结果容器应用最终转换
  9. Function<A, R> finisher();
  10. // 返回一个不可变的Characteristics集合,定义了收集器的行为
  11. Set<Characteristics> characteristics();
  12. }
  • T是流中要收集的项目的类型
  • A是累加器的类型,累加器是在收集过程中用户累积部分结果的对象
  • R是收集操作得到的对象的类型
  • Characteristics
    • CONCURRENT: accumulator返回的函数可以从多个线程同时调用,且该收集器可以并行规约流
    • UNORDERED: 规约结果不受流中项目的遍历和累积顺序的影响
    • IDENTITY_FINISH:finisher返回的函数是一个恒等函数,可以忽略

      第七章 并行数据处理与性能

  • 并行流使用

    1. Stream.iterate(1L,i->i+1)
    2. .limit(100)
    3. .parallel() // 开启并行流
    4. .reduce(0L,Long::sum);

    并行流内部使用了默认的ForkJoinPool,默认的线程数量为处理器的数量,可通过如下代码改变线程池的大小

    1. System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","10")

    这是一个全局设置,因此它将影响代码中所有的并行流。

  • 正确的使用并行流:注意共享状态变量引发的线程安全问题

  • 并行流核心
    • 分之/合并框架(ForkJoinTask)

分之/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来并生成整体结果

  • 可分迭代器(Spliterator)

定义了并行流如何拆分它要遍历的数据