如何学习使用

  1. 从 List 的操作开始,先尝试把遍历 List 来筛选数据和转换数据的操作,使用 Stream 的 filter 和 map 实现,这是 Stream 最常用、最基本的两个 API。你可以重点看 看接下来两节的内容来入门
  2. 利用IDea 配置检测规则,将匿名类型 使用 Lambda 替换的检测规则,设置为 Error 级别严 重程度:

image.png

  1. 如果你不知道如何把匿名类转换为 Lambda 表达式,可以借助 IDE 来重构:

image.png

lambda 表达式

简化匿名类的语法, 使 Java 走向函数式编程。对于匿名类,虽然没有类名,但还是要给 出方法定义。

  1. //匿名类
  2. new Thread(new Runnable(){
  3. @Override
  4. public void run(){
  5. System.out.println("hello1");
  6. }
  7. }).start();
  8. //Lambda表达式
  9. new Thread(() -> System.out.println("hello2")).start();

,Lambda 表达式通过 函数式接口 匹配 Java 的类型系统

函数式接口

java.util.function 包中定义了各种函数式接口。
函数式接口是一种只有单一抽象方法的接口,使用 @FunctionalInterface 来描述,可以隐 式地转换成 Lambda 表达式。使用 Lambda 表达式来实现函数式接口,不需要提供类名和 方法定义,通过一行代码提供函数式接口的实例,就可以让函数成为程序中的头等公民,可 以像普通数据一样作为参数传递,而不是作为一个固定的类中的固定方法。

  1. @FunctionalInterface
  2. public interface Supplier<T> {
  3. /**
  4. * Gets a result.
  5. *
  6. * @return a result
  7. */
  8. T get();
  9. }
  10. //以使用 Lambda 表达式或方法引用,来得到 Supplier 接口的实例:
  11. //使用Lambda表达式提供Supplier接口实现,返回OK字符串
  12. Supplier<String> stringSupplier = ()->"OK";
  13. //使用方法引用提供Supplier接口实现,返回空字符串
  14. Supplier<String> supplier = String::new;

Predicate、Function 等函数式接口,还使用 default 关键字实现了几个默认方法。这样一 来,它们既可以满足函数式接口只有一个抽象方法,又能为接口提供额外的功能:

  1. @FunctionalInterface
  2. public interface Function<T, R> {
  3. R apply(T t);
  4. default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
  5. Objects.requireNonNull(before);
  6. return (V v) -> apply(before.apply(v));
  7. }
  8. default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
  9. Objects.requireNonNull(after);
  10. return (T t) -> after.apply(apply(t));
  11. }
  12. static <T> Function<T, T> identity() {
  13. return t -> t;
  14. }
  15. }

方法引用 ::

::调用 已经存在的方法

Stream 简化集合操作

  1. map 方法传入的是一个 Function,可以实现对象转换;
  2. filter 方法传入一个 Predicate,实现对象的布尔判断,只保留返回 true 的数据;
  3. mapToDouble 用于把对象转换为 double;
  4. 通过 average 方法返回一个 OptionalDouble,代表可能包含值也可能不包含值的可空 double。

    Optional 简化判空逻辑

    类似 OptionalDouble、OptionalInt、OptionalLong 等,是服务于基本类型的可 空对象。此外,Java8 还定义了用于引用类型的 Optional 类。使用 Optional,不仅可以 避免使用 Stream 进行级联调用的空指针问题;更重要的是,它提供了一些实用的方法帮我 们避免判空逻辑。
    image.png

    并行流操作

    通过 parallel 方法,一键把 Stream 转换为并行操作提交到线程 池处理。
    ```java

IntStream.rangeClosed(1,100).parallel().forEach(i->{ System.out.println(LocalDateTime.now() + “ : “ + i); try { Thread.sleep(1000); } catch (InterruptedException e) { } });

  1. <a name="JfaQ1"></a>
  2. ### 多线程操作常用的五种实现方式:
  3. 一般而言,使用线程池(第二种)和直接使用并行流(第四种)的方式在业务代码中比较常用。但需要注意的是,我们通常会重用线程池,而不会像 Demo 中那样在业务逻辑中直接声明新的线程池,等操作完成后再关闭。
  4. <a name="yCHt3"></a>
  5. #### 一:使用线程。直接把任务按照线程数均匀分割,分配到不同的线程执行,使用 CountDownLatch 来阻塞主线程,直到所有线程都完成操作
  6. ```java
  7. private int thread(int taskCount, int threadCount) throws InterruptedException {
  8. //总操作次数计数器
  9. AtomicInteger atomicInteger = new AtomicInteger();
  10. //使用CountDownLatch来等待所有线程执行完成
  11. CountDownLatch countDownLatch = new CountDownLatch(threadCount);
  12. //使用IntStream把数字直接转为Thread
  13. IntStream.rangeClosed(1, threadCount).mapToObj(i -> new Thread(() -> {
  14. //手动把taskCount分成taskCount份,每一份有一个线程执行
  15. IntStream.rangeClosed(1, taskCount / threadCount).forEach(j -> increment(atomicInteger));
  16. //每一个线程处理完成自己那部分数据之后,countDown一次
  17. countDownLatch.countDown();
  18. })).forEach(Thread::start);
  19. //等到所有线程执行完成
  20. countDownLatch.await();
  21. //查询计数器当前值
  22. return atomicInteger.get();
  23. }

二: 使用 Executors.newFixedThreadPool 来获得固定线程数的线程池,使用 execute 提交所有任务到线程池执行,最后关闭线程池等待所有任务执行完成:

  1. private int threadpool(int taskCount, int threadCount) throws InterruptedException {
  2. //总操作次数计数器
  3. AtomicInteger atomicInteger = new AtomicInteger();
  4. //初始化一个线程数量=threadCount的线程池
  5. ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
  6. //所有任务直接提交到线程池处理
  7. IntStream.rangeClosed(1, taskCount).forEach(i -> executorService.execute(() -> increment(atomicInteger)));
  8. //提交关闭线程池申请,等待之前所有任务执行完成
  9. executorService.shutdown();
  10. executorService.awaitTermination(1, TimeUnit.HOURS);
  11. //查询计数器当前值
  12. return atomicInteger.get();
  13. }

三: 使用 ForkJoinPool 而不是普通线程池执行任务。

ForkJoinPool 和传统的 ThreadPoolExecutor 区别在于,前者对于 n 并行度有 n 个独立队列,后者是共享队列。如果有大量执行耗时比较短的任务,ThreadPoolExecutor 的单队列就可能会成为瓶颈。这时,使用 ForkJoinPool 性能会更好。因此,ForkJoinPool 更适合大任务分割成许多小任务并行执行的场景,而 ThreadPoolExecutor 适合许多独立任务并发执行的场景。

  1. private int forkjoin(int taskCount, int threadCount) throws InterruptedException {
  2. //总操作次数计数器
  3. AtomicInteger atomicInteger = new AtomicInteger();
  4. //自定义一个并行度=threadCount的ForkJoinPool
  5. ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
  6. //所有任务直接提交到线程池处理
  7. forkJoinPool.execute(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(i -> increment(atomicInteger)));
  8. //提交关闭线程池申请,等待之前所有任务执行完成
  9. forkJoinPool.shutdown();
  10. forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
  11. //查询计数器当前值
  12. return atomicInteger.get();
  13. }

四:直接使用并行流,并行流使用公共的 ForkJoinPool,也就是 ForkJoinPool.commonPool()。

公共的 ForkJoinPool 默认的并行度是 CPU 核心数 -1,原因是对于 CPU 绑定的任务分配超过 CPU 个数的线程没有意义。由于并行流还会使用主线程执行任务,也会占用一个 CPU 核心,所以公共 ForkJoinPool 的并行度即使 -1 也能用满所有 CPU 核心。这里,我们通过配置强制指定(增大)了并行数,但因为使用的是公共 ForkJoinPool,所以可能会存在干扰,你可以回顾下第 3 讲有关线程池混用产生的问题:

  1. private int stream(int taskCount, int threadCount) {
  2. //设置公共ForkJoinPool的并行度
  3. System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(threadCount));
  4. //总操作次数计数器
  5. AtomicInteger atomicInteger = new AtomicInteger();
  6. //由于我们设置了公共ForkJoinPool的并行度,直接使用parallel提交任务即可
  7. IntStream.rangeClosed(1, taskCount).parallel().forEach(i -> increment(atomicInteger));
  8. //查询计数器当前值
  9. return atomicInteger.get();
  10. }

五: 使用 CompletableFuture 来实现。CompletableFuture.runAsync 方法可 以指定一个线程池,一般会在使用 CompletableFuture 的时候用到:

  1. private int completableFuture(int taskCount, int threadCount) throws InterruptedException, ExecutionException {
  2. //总操作次数计数器
  3. AtomicInteger atomicInteger = new AtomicInteger();
  4. //自定义一个并行度=threadCount的ForkJoinPool
  5. ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
  6. //使用CompletableFuture.runAsync通过指定线程池异步执行任务
  7. CompletableFuture.runAsync(() -> IntStream.rangeClosed(1, taskCount).parallel().forEach(i -> increment(atomicInteger)), forkJoinPool).get();
  8. //查询计数器当前值
  9. return atomicInteger.get();
  10. }

Stream 操作

Stream 操作汇总

image.png

创建流

要使用流-需要先创建流,创建流一般有五种方式

  1. 通过 stream 方法把 List 或数组转换为流;
  2. 通过 Stream.of 方法直接传入多个元素构成一个流;
  3. 通过 Stream.iterate 方法使用迭代的方式构造一个无限流,然后使用 limit 限制流元素个数
  4. 通过 Stream.generate 方法从外部传入一个提供元素的 Supplier 来构造无限流,然后使用 limit 限制流元素个数;
  5. 通过 IntStream DoubleStream 构造基本类型的流。 ```java

//通过stream方法把List或数组转换为流 @Test public void stream() { Arrays.asList(“a1”, “a2”, “a3”).stream().forEach(System.out::println); Arrays.stream(new int[]{1, 2, 3}).forEach(System.out::println); }

//通过Stream.of方法直接传入多个元素构成一个流 @Test public void of() { String[] arr = {“a”, “b”, “c”}; Stream.of(arr).forEach(System.out::println); Stream.of(“a”, “b”, “c”).forEach(System.out::println); Stream.of(1, 2, “a”).map(item -> item.getClass().getName()).forEach(System.out::println); }

//通过Stream.iterate方法使用迭代的方式构造一个无限流,然后使用limit限制流元素个数 @Test public void iterate() { Stream.iterate(2, item -> item * 2).limit(10).forEach(System.out::println); Stream.iterate(BigInteger.ZERO, n -> n.add(BigInteger.TEN)).limit(10).forEach(System.out::println); }

//通过Stream.generate方法从外部传入一个提供元素的Supplier来构造无限流,然后使用limit限制流元素个数 @Test public void generate() { Stream.generate(() -> “test”).limit(3).forEach(System.out::println); Stream.generate(Math::random).limit(10).forEach(System.out::println); }

//通过IntStream或DoubleStream构造基本类型的流 @Test public void primitive() { //演示IntStream和DoubleStream IntStream.range(1, 3).forEach(System.out::println); IntStream.range(0, 3).mapToObj(i -> “x”).forEach(System.out::println);

  1. IntStream.rangeClosed(1, 3).forEach(System.out::println);
  2. DoubleStream.of(1.1, 2.2, 3.3).forEach(System.out::println);
  3. //各种转换,后面注释代表了输出结果
  4. System.out.println(IntStream.of(1, 2).toArray().getClass()); //class [I
  5. System.out.println(Stream.of(1, 2).mapToInt(Integer::intValue).toArray().getClass()); //class [I
  6. System.out.println(IntStream.of(1, 2).boxed().toArray().getClass()); //class [Ljava.lang.Object;
  7. System.out.println(IntStream.of(1, 2).asDoubleStream().toArray().getClass()); //class [D
  8. System.out.println(IntStream.of(1, 2).asLongStream().toArray().getClass()); //class [J
  9. //注意基本类型流和装箱后的流的区别
  10. Arrays.asList("a", "b", "c").stream() // Stream<String>
  11. .mapToInt(String::length) // IntStream
  12. .asLongStream() // LongStream
  13. .mapToDouble(x -> x / 10.0) // DoubleStream
  14. .boxed() // Stream<Double>
  15. .mapToLong(x -> 1L) // LongStream
  16. .mapToObj(x -> "") // Stream<String>
  17. .collect(Collectors.toList());

}

  1. <a name="bBSoc"></a>
  2. ### Filter 过滤
  3. filter 方法可以实现过滤操作,类似 SQL 中的 where。
  4. ```java
  5. //最近半年的金额大于40的订单
  6. orders.stream()
  7. .filter(Objects::nonNull) //过滤null值
  8. .filter(order -> order.getPlacedAt().isAfter(LocalDateTime.now().minusMonths(6))) //最近半年的订单
  9. .filter(order -> order.getTotalPrice() > 40) //金额大于40的订单
  10. .forEach(System.out::println);

map 转换

map 操作可以做转换(或者说投影),类似 SQL 中的 select。为了对比,我用两种方式统计订单中所有商品的数量,前一种是通过两次遍历实现,后一种是通过两次 mapToLong+sum 方法实现:

  1. //计算所有订单商品数量
  2. //通过两次遍历实现
  3. LongAdder longAdder = new LongAdder();
  4. orders.stream().forEach(order ->
  5. order.getOrderItemList().forEach(orderItem -> longAdder.add(orderItem.getProductQuantity())));
  6. //使用两次mapToLong+sum方法实现
  7. assertThat(longAdder.longValue(), is(orders.stream().mapToLong(order ->
  8. order.getOrderItemList().stream()
  9. .mapToLong(OrderItem::getProductQuantity).sum()).sum()));

flatMap 扁平化操作

相当于 map+flat,通过 map 把每一个元素替换为一个流,然后展开这个流。

  1. //直接展开订单商品进行价格统计
  2. System.out.println(orders.stream()
  3. .flatMap(order -> order.getOrderItemList().stream())
  4. .mapToDouble(item -> item.getProductQuantity() * item.getProductPrice()).sum());
  5. //另一种方式flatMap+mapToDouble=flatMapToDouble
  6. System.out.println(orders.stream()
  7. .flatMapToDouble(order ->
  8. order.getOrderItemList()
  9. .stream().mapToDouble(item -> item.getProductQuantity() * item.getProductPrice()))
  10. .sum());

sorted 行内排序

sorted 操作可以用于行内排序的场景,类似 SQL 中的 order by。比如,要实现大于 50 元订单的按价格倒序取前 5,可以通过 Order::getTotalPrice 方法引用直接指定需要排序的依据字段,通过 reversed() 实现倒序:

  1. //大于50的订单,按照订单价格倒序前5
  2. orders.stream().filter(order -> order.getTotalPrice() > 50)
  3. .sorted(comparing(Order::getTotalPrice).reversed())
  4. .limit(5)
  5. .forEach(System.out::println);

distinct 去重

distinct 操作的作用是去重,类似 SQL 中的 distinct。比如下面的代码实现:查询去重后的下单用户。使用 map 从订单提取出购买用户,然后使用 distinct 去重。查询购买过的商品名。使用 flatMap+map 提取出订单中所有的商品名,然后使用 distinct 去重。

  1. //去重的下单用户
  2. System.out.println(orders.stream().map(order -> order.getCustomerName()).distinct().collect(joining(",")));
  3. //所有购买过的商品
  4. System.out.println(orders.stream()
  5. .flatMap(order -> order.getOrderItemList().stream())
  6. .map(OrderItem::getProductName)
  7. .distinct().collect(joining(",")));

skip & limit 用于分页

skip 和 limit 操作用于分页,类似 MySQL 中的 limit。其中,skip 实现跳过一定的项,limit 用于限制项总数。

collect 收集操作

  1. collect 是收集操作,对流进行终结(终止)操作,把流导出为我们需要的数据结构。“终结”是指,导出后,无法再串联使用其他中间操作,比如 f**iltermapflatmapsorteddistinctlimitskip**。在 Stream 操作中,collect 是最复杂的终结操作,比较简单的终结操作还有 **forEachtoArrayminmaxcountanyMatch** 等,我就不再展开了,你可以查询JDK 文档,搜索 terminal operation intermediate operation。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/12671612/1653827643719-e20fa726-671a-4ab4-add0-2100e5c05b18.png#clientId=u0ff8bfd8-fc1c-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=2375&id=u506970ca&margin=%5Bobject%20Object%5D&name=image.png&originHeight=2375&originWidth=2923&originalType=binary&ratio=1&rotation=0&showTitle=false&size=556587&status=done&style=none&taskId=u34f6392a-9a33-4e3e-a785-d895ccbfd5a&title=&width=2923)

groupBy

groupBy 是分组统计操作,类似 SQL 中的 group by 子句。它和后面介绍的 partitioningBy 都是特殊的收集器,同样也是终结操作。分组操作比较复杂,

  1. //按照用户名分组,统计下单数量
  2. System.out.println(orders.stream().collect(groupingBy(Order::getCustomerName, counting()))
  3. .entrySet().stream().sorted(Map.Entry.<String, Long>comparingByValue().reversed()).collect(toList()));
  4. //按照用户名分组,统计订单总金额
  5. System.out.println(orders.stream().collect(groupingBy(Order::getCustomerName, summingDouble(Order::getTotalPrice)))
  6. .entrySet().stream().sorted(Map.Entry.<String, Double>comparingByValue().reversed()).collect(toList()));
  7. //按照用户名分组,统计商品采购数量
  8. System.out.println(orders.stream().collect(groupingBy(Order::getCustomerName,
  9. summingInt(order -> order.getOrderItemList().stream()
  10. .collect(summingInt(OrderItem::getProductQuantity)))))
  11. .entrySet().stream().sorted(Map.Entry.<String, Integer>comparingByValue().reversed()).collect(toList()));
  12. //统计最受欢迎的商品,倒序后取第一个
  13. orders.stream()
  14. .flatMap(order -> order.getOrderItemList().stream())
  15. .collect(groupingBy(OrderItem::getProductName, summingInt(OrderItem::getProductQuantity)))
  16. .entrySet().stream()
  17. .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
  18. .map(Map.Entry::getKey)
  19. .findFirst()
  20. .ifPresent(System.out::println);
  21. //统计最受欢迎的商品的另一种方式,直接利用maxBy
  22. orders.stream()
  23. .flatMap(order -> order.getOrderItemList().stream())
  24. .collect(groupingBy(OrderItem::getProductName, summingInt(OrderItem::getProductQuantity)))
  25. .entrySet().stream()
  26. .collect(maxBy(Map.Entry.comparingByValue()))
  27. .map(Map.Entry::getKey)
  28. .ifPresent(System.out::println);
  29. //按照用户名分组,选用户下的总金额最大的订单
  30. orders.stream().collect(groupingBy(Order::getCustomerName, collectingAndThen(maxBy(comparingDouble(Order::getTotalPrice)), Optional::get)))
  31. .forEach((k, v) -> System.out.println(k + "#" + v.getTotalPrice() + "@" + v.getPlacedAt()));
  32. //根据下单年月分组,统计订单ID列表
  33. System.out.println(orders.stream().collect
  34. (groupingBy(order -> order.getPlacedAt().format(DateTimeFormatter.ofPattern("yyyyMM")),
  35. mapping(order -> order.getId(), toList()))));
  36. //根据下单年月+用户名两次分组,统计订单ID列表
  37. System.out.println(orders.stream().collect
  38. (groupingBy(order -> order.getPlacedAt().format(DateTimeFormatter.ofPattern("yyyyMM")),
  39. groupingBy(order -> order.getCustomerName(),
  40. mapping(order -> order.getId(), toList())))));

partitionBy

partitioningBy 用于分区,分区是特殊的分组,只有 true 和 false 两组。比如,我们把用户按照是否下单进行分区,给 partitioningBy 方法传入一个 Predicate 作为数据分区的区分,输出是 Map>:

  1. public static <T>
  2. Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
  3. return partitioningBy(predicate, toList());
  4. }

测试一下,partitioningBy 配合 anyMatch,可以把用户分为下过订单和没下过订单两组

  1. //根据是否有下单记录进行分区
  2. System.out.println(Customer.getData().stream().collect(
  3. partitioningBy(customer -> orders.stream().mapToLong(Order::getCustomerId)
  4. .anyMatch(id -> id == customer.getId()))));

流式编程如何测试

debug 工具窗口如何打开 ⌘ 5
此功能仅对项目文件可用。Java Stream调试器不能使用库或反编译代码。

  1. 在函数式编程中打断点

image.png

  1. 在调试工具窗口中单击“跟踪当前流链”按钮。
  2. 使用流跟踪对话框来分析流内部的操作。顶部的选项卡允许您在特定操作之间切换,并查看值如何随每个操作进行转换。

image.png