Java对Stream的定义
A sequence of elements supporting sequential and parallel aggregate operations.
解读一下:
- Stream是元素的集合,这点让Stream看起来用些类似Iterator;
- 可以支持顺序和并行的对原Stream进行汇聚的操作;
大家可以把Stream当成一个高级版本的Iterator。原始版本的Iterator,用户只能一个一个的遍历元素并对其执行某些操作;高级版本的Stream,用户只要给出需要对其包含的元素执行什么操作,比如“过滤掉长度大于10的字符串”、“获取每个字符串的首字母”等,具体这些操作如何应用到每个元素上,就给Stream就好了。
一、创建流的几种方式
java.util.Arrays#stream(T[])java.util.stream.Stream#of(T...)底层调用的也是Arrays.streamjava.util.Collection#streamjava.util.stream.Stream#iteratejava.util.stream.Stream#generate
1-3,是根据具体的数组或者集合对象,创建的流,在创建流之前,这些对象的大小(长度)已经确认,所以这个种方式的流,也被成为有限流。
而4-5中,创建流的方式,是无限大小的流(generate 最大是Long.MAX_VALUE),也被成为无限流,那么我们不可能就这样放任对象被无限创建,直到内存溢出,这样的无限流,也是配合limit使用,指定这个流生成的元素的个数。
二、流的中间操作和终端操作
通过查看Stream接口的抽象方法的定义,我们可以看到,这些方法,可以分成两种类型,一种返回类型为接口本身的Stream
中间操作,是什么操作? 我们先看下字符串操作StringBuilder的append的方法
@Overridepublic StringBuilder append(String str) {super.append(str);return this; //就是这样,返回对象本身;然后我们就可以像操作StringBuilder的append一样,可以连接操作;}StringBuilder sb = new StringBuilder();sb.append("a").append("b").append("c");
终端操作,是指返回最终的结果,例如我们常用的forEach,内部迭代;
| 操作 | 类型 | 返回类型 | 使用的类型/函数式接口 | 函数描述符 |
|---|---|---|---|---|
| filter | 中间 | Stream |
Predicate |
T -> boolean |
| distinct | 中间 |
Stream |
||
| skip | 中间 |
Stream |
long | |
| limit | 中间 |
Stream |
long | |
| map | 中间 | Stream |
Function |
T -> R |
| flatMap | 中间 | Stream |
Function |
T -> Stream |
| sorted | 中间 |
Stream |
Comparator |
(T,T) -> int |
| anyMatch | 终端 | boolean | Predicate |
T -> boolean |
| noneMatch | 终端 | boolean | Predicate |
T -> boolean |
| allMatch | 终端 | boolean | Predicate |
T -> boolean |
| findAny | 终端 | Optional |
||
| findFirst | 终端 | Optional |
||
| forEach | 终端 | void | Comsumer |
T -> void |
| collect | 终端 | R | Collector |
|
| reduce | 终端 |
Optional |
BinaryOperator |
(T,T) -> T |
| count | 终端 | long |
三、stream的操作
filter操作
Stream<T> filter(Predicate<? super T> predicate);这个方法,传入一个Predicate的函数接口,这个接口传入一个泛型参数T,做完操作之后,返回一个boolean值;filter方法的作用,是对这个boolean做判断,返回判断之后的对象String[] dd = { "a", "b", "c" };Stream<String> stream = Arrays.stream(dd);stream.filter(str -> str.equals("a")).forEach(System.out::println);//返回字符串为a的值
map操作
<R> Stream<R> map(Function<? super T, ? extends R> mapper);这个方法传入一个Function的函数式接口,这个接口,接收一个泛型T,返回泛型R,map函数的定义,返回的流,表示的泛型是R对象,这个表示,调用这个函数后,可以改变返回的类型public class TestJava8 {public static void main(String[] args) {Integer[] dd = { 1, 2, 3 };Stream<Integer> stream = Arrays.stream(dd);stream.map(str -> Integer.toString(str)).forEach(str -> {System.out.println(str);// 1 ,2 ,3System.out.println(str.getClass());// class java.lang.String});List<Emp> list = Arrays.asList(new Emp("a"), new Emp("b"), new Emp("c"));list.stream().map(emp -> emp.getName()).forEach(str -> {System.out.println(str);});}@Datapublic static class Emp {private String name;public Emp() {super();}public Emp(String name) {super();this.name = name;}}}
flatMap操作
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);这个接口,跟map一样,接收一个Fucntion的函数式接口,不同的是,Function接收的泛型参数,第二个参数是一个Stream流;方法,返回的也是泛型R,具体的作用是把两个流,变成一个流返回
其他中间操作
//去重复Stream<T> distinct();//排序Stream<T> sorted();//根据属性排序Stream<T> sorted(Comparator<? super T> comparator);//对对象的进行操作Stream<T> peek(Consumer<? super T> action);//截断--取先maxSize个对象Stream<T> limit(long maxSize);//截断--忽略前N个对象Stream<T> skip(long n);
终端操作forEachOrdered和forEach
void forEach(Consumer<? super T> action);void forEachOrdered(Consumer<? super T> action);
这两个函数都是对集合的流,进行遍历操作,是属于内部迭代。
List<String> strs = Arrays.asList("a", "b", "c");strs.stream().forEachOrdered(System.out::print);//abcSystem.out.println();strs.stream().forEach(System.out::print);//abcSystem.out.println();strs.parallelStream().forEachOrdered(System.out::print);//abcSystem.out.println();strs.parallelStream().forEach(System.out::print);//bca
先看第一段输出和第二段输出,使用的是stream的流,这个是一个串行流,也就是程序是串行执行的,所有看到遍历的结果都是按照集合的元素放入的顺序;
看第三段和第四段输出,使用的parallelStream的流,这个流表示一个并行流,也就是在程序内部迭代的时候,会帮你免费的并行处理
第三段代码的forEachOrdered表示严格按照顺序取数据,forEach在并行中,随机排列了;这个也可以看出来,在并行的程序中,如果对处理之后的数据,没有顺序的要求,使用forEach的效率,肯定是要更好的
规约操作reduce
Stream的reduce方法,翻译过来是聚合或者是汇聚成一个的意思,由于Stream本身就代表着一堆数据,那stream.reduce()方法顾名思义就是把一堆数据聚合成一个数据。
流底层核心其实是Spliterator接口的一个实现,而这个Spliterator接口其实本身就是Fork/Join并行框架的一个实现,所以归根结底要明白流的工作方式,就要明白一下Fork/Join框架的基本思想,即:以递归的方式将可以并行的任务拆分成更小的子任务,然后将每个子任务的结果合并起来生成整体的最后结果。
**
理解了方法本身的意思以及流的工作方式,再结合到一起理解一下stream.reduce()方法,即用Fork/Join的方式把一堆数据聚合成一个数据,因此可以画出reduce方法的运行草图
T reduce(T identity, BinaryOperator<T> accumulator);这个函数,接受2个参数,第一个表示初始值,第二个值,传入的是一个函数式接口BinaryOperator,这个接口继承BiFunction;计算的表达式的规则;
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);BinaryOperator是供多线程使用的,如果不在Stream中声明使用多线程,就不会使用子任务,自然也不会调用到该方法。另外多线程下使用BinaryOperator的时候是需要考虑线程安全的问题。另外自问自答下为什么需要BinaryOperator因为这个重载的方法和其他两个不相同,允许改变返回值,所以返回值并不一定是Collection的子类;因此必须显示的声明如何拼接两个子任务产生的结果。
List<Integer> numbers = Stream.iterate(1, x -> x + 1).limit(10).collect(Collectors.toList());Integer aa = 0;for (Integer i : numbers) {aa += i;}Integer dd = numbers.stream().reduce(0, (a, b) -> a + b, (a, b) -> a - b);Optional<Integer> dd1 = numbers.stream().reduce((a, b) -> a + b);System.out.println(aa);System.out.println(dd);System.out.println(dd1.get());
规约操作collect
package com.meinergy.vppd.business.api.test;import java.util.ArrayList;import java.util.Comparator;import java.util.IntSummaryStatistics;import java.util.List;import java.util.Map;import java.util.Optional;import java.util.Set;import java.util.function.Function;import java.util.stream.Collectors;import org.junit.Test;public class TestJava8 {public static List<Emp> list = new ArrayList<>();static {list.add(new Emp("上海", "小名", 17));list.add(new Emp("北京", "小红", 18));list.add(new Emp("深圳", "小蓝", 19));list.add(new Emp("广州", "小灰", 20));list.add(new Emp("杭州", "小黄", 21));list.add(new Emp("贵阳", "小白", 22));}@Testpublic void test1() {// 转listList<String> names = list.stream().map(emp -> emp.getName()).collect(Collectors.toList());// 转setSet<String> address = list.stream().map(emp -> emp.getName()).collect(Collectors.toSet());// 转map,需要指定key和value,Function.identity()表示当前的Emp对象本身Map<String, Emp> map = list.stream().collect(Collectors.toMap(Emp::getName, Function.identity()));// 计算元素中的个数Long count = list.stream().collect(Collectors.counting());// 数据求和 summingInt summingLong,summingDoubleInteger sumAges = list.stream().collect(Collectors.summingInt(Emp::getAge));// 平均值 averagingInt,averagingDouble,averagingLongDouble aveAges = list.stream().collect(Collectors.averagingInt(Emp::getAge));// 综合处理的,求最大值,最小值,平均值,求和操作// summarizingInt,summarizingLong,summarizingDoubleIntSummaryStatistics intSummary = list.stream().collect(Collectors.summarizingInt(Emp::getAge));System.out.println(intSummary.getAverage());// 19.5System.out.println(intSummary.getMax());// 22System.out.println(intSummary.getMin());// 17System.out.println(intSummary.getSum());// 117// 连接字符串,当然也可以使用重载的方法,加上一些前缀,后缀和中间分隔符String strEmp = list.stream().map(emp -> emp.getName()).collect(Collectors.joining());String strEmp1 = list.stream().map(emp -> emp.getName()).collect(Collectors.joining("-中间的分隔符-"));String strEmp2 = list.stream().map(emp -> emp.getName()).collect(Collectors.joining("-中间的分隔符-", "前缀*", "&后缀"));System.out.println(strEmp);// 小名小红小蓝小灰小黄小白// 小名-中间的分隔符-小红-中间的分隔符-小蓝-中间的分隔符-小灰-中间的分隔符-小黄-中间的分隔符-小白System.out.println(strEmp1);// 前缀*小名-中间的分隔符-小红-中间的分隔符-小蓝-中间的分隔符-小灰-中间的分隔符-小黄-中间的分隔符-小白&后缀System.out.println(strEmp2);// maxBy 按照比较器中的比较结果刷选 最大值Optional<Integer> maxAge = list.stream().map(emp -> emp.getAge()).collect(Collectors.maxBy(Comparator.comparing(Function.identity())));// 最小值Optional<Integer> minAge = list.stream().map(emp -> emp.getAge()).collect(Collectors.minBy(Comparator.comparing(Function.identity())));System.out.println("max:" + maxAge);System.out.println("min:" + minAge);// 归约操作list.stream().map(emp -> emp.getAge()).collect(Collectors.reducing((x, y) -> x + y));list.stream().map(emp -> emp.getAge()).collect(Collectors.reducing(0, (x, y) -> x + y));// 分操作 groupingBy 根据地址,把原list进行分组Map<String, List<Emp>> mapGroup = list.stream().collect(Collectors.groupingBy(Emp::getAddress));// partitioningBy 分区操作 需要根据类型指定判断分区Map<Boolean, List<Integer>> partitioningMap = list.stream().map(emp -> emp.getAge()).collect(Collectors.partitioningBy(emp -> emp > 20));}@Datastatic class Emp {private String address;private String name;private Integer age;public Emp() {}public Emp(String address) {this.address = address;}public Emp(String name, Integer age) {this.name = name;this.age = age;}public Emp(String address, String name, Integer age) {super();this.address = address;this.name = name;this.age = age;}}}
四、Optional
很多的stream的终端操作,都返回了一个Optional
