Java对Stream的定义
A sequence of elements supporting sequential and parallel aggregate operations.
解读一下:
- Stream是元素的集合,这点让Stream看起来用些类似Iterator;
- 可以支持顺序和并行的对原Stream进行汇聚的操作;
大家可以把Stream当成一个高级版本的Iterator。原始版本的Iterator,用户只能一个一个的遍历元素并对其执行某些操作;高级版本的Stream,用户只要给出需要对其包含的元素执行什么操作,比如“过滤掉长度大于10的字符串”、“获取每个字符串的首字母”等,具体这些操作如何应用到每个元素上,就给Stream就好了。
一、创建流的几种方式
java.util.Arrays#stream(T[])
java.util.stream.Stream#of(T...)
底层调用的也是Arrays.stream
java.util.Collection#stream
java.util.stream.Stream#iterate
java.util.stream.Stream#generate
1-3,是根据具体的数组或者集合对象,创建的流,在创建流之前,这些对象的大小(长度)已经确认,所以这个种方式的流,也被成为有限流。
而4-5中,创建流的方式,是无限大小的流(generate 最大是Long.MAX_VALUE),也被成为无限流,那么我们不可能就这样放任对象被无限创建,直到内存溢出,这样的无限流,也是配合limit使用,指定这个流生成的元素的个数。
二、流的中间操作和终端操作
通过查看Stream
接口的抽象方法的定义,我们可以看到,这些方法,可以分成两种类型,一种返回类型为接口本身的Stream
中间操作,是什么操作? 我们先看下字符串操作StringBuilder的append的方法
@Override
public StringBuilder append(String str) {
super.append(str);
return this; //就是这样,返回对象本身;然后我们就可以像操作StringBuilder的append一样,可以连接操作;
}
StringBuilder sb = new StringBuilder();
sb.append("a").append("b").append("c");
终端操作,是指返回最终的结果,例如我们常用的forEach,内部迭代;
操作 | 类型 | 返回类型 | 使用的类型/函数式接口 | 函数描述符 |
---|---|---|---|---|
filter | 中间 | Stream |
Predicate |
T -> boolean |
distinct | 中间 |
Stream |
||
skip | 中间 |
Stream |
long | |
limit | 中间 |
Stream |
long | |
map | 中间 | Stream |
Function |
T -> R |
flatMap | 中间 | Stream |
Function |
T -> Stream |
sorted | 中间 |
Stream |
Comparator |
(T,T) -> int |
anyMatch | 终端 | boolean | Predicate |
T -> boolean |
noneMatch | 终端 | boolean | Predicate |
T -> boolean |
allMatch | 终端 | boolean | Predicate |
T -> boolean |
findAny | 终端 | Optional |
||
findFirst | 终端 | Optional |
||
forEach | 终端 | void | Comsumer |
T -> void |
collect | 终端 | R | Collector |
|
reduce | 终端 |
Optional |
BinaryOperator |
(T,T) -> T |
count | 终端 | long |
三、stream的操作
filter操作
Stream<T> filter(Predicate<? super T> predicate);
这个方法,传入一个Predicate的函数接口,这个接口传入一个泛型参数T,做完操作之后,返回一个boolean值;
filter方法的作用,是对这个boolean做判断,返回判断之后的对象
String[] dd = { "a", "b", "c" };
Stream<String> stream = Arrays.stream(dd);
stream.filter(str -> str.equals("a")).forEach(System.out::println);//返回字符串为a的值
map操作
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
这个方法传入一个Function的函数式接口,这个接口,接收一个泛型T,返回泛型R,
map函数的定义,返回的流,表示的泛型是R对象,这个表示,调用这个函数后,可以改变返回的类型
public class TestJava8 {
public static void main(String[] args) {
Integer[] dd = { 1, 2, 3 };
Stream<Integer> stream = Arrays.stream(dd);
stream.map(str -> Integer.toString(str)).forEach(str -> {
System.out.println(str);// 1 ,2 ,3
System.out.println(str.getClass());// class java.lang.String
});
List<Emp> list = Arrays.asList(new Emp("a"), new Emp("b"), new Emp("c"));
list.stream().map(emp -> emp.getName()).forEach(str -> {
System.out.println(str);
});
}
@Data
public static class Emp {
private String name;
public Emp() {
super();
}
public Emp(String name) {
super();
this.name = name;
}
}
}
flatMap操作
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
这个接口,跟map一样,接收一个Fucntion的函数式接口,不同的是,Function接收的泛型参数,第二个参数是一个Stream流;
方法,返回的也是泛型R,具体的作用是把两个流,变成一个流返回
其他中间操作
//去重复
Stream<T> distinct();
//排序
Stream<T> sorted();
//根据属性排序
Stream<T> sorted(Comparator<? super T> comparator);
//对对象的进行操作
Stream<T> peek(Consumer<? super T> action);
//截断--取先maxSize个对象
Stream<T> limit(long maxSize);
//截断--忽略前N个对象
Stream<T> skip(long n);
终端操作forEachOrdered和forEach
void forEach(Consumer<? super T> action);
void forEachOrdered(Consumer<? super T> action);
这两个函数都是对集合的流,进行遍历操作,是属于内部迭代。
List<String> strs = Arrays.asList("a", "b", "c");
strs.stream().forEachOrdered(System.out::print);//abc
System.out.println();
strs.stream().forEach(System.out::print);//abc
System.out.println();
strs.parallelStream().forEachOrdered(System.out::print);//abc
System.out.println();
strs.parallelStream().forEach(System.out::print);//bca
先看第一段输出和第二段输出,使用的是stream的流,这个是一个串行流,也就是程序是串行执行的,所有看到遍历的结果都是按照集合的元素放入的顺序;
看第三段和第四段输出,使用的parallelStream的流,这个流表示一个并行流,也就是在程序内部迭代的时候,会帮你免费的并行处理
第三段代码的forEachOrdered表示严格按照顺序取数据,forEach在并行中,随机排列了;这个也可以看出来,在并行的程序中,如果对处理之后的数据,没有顺序的要求,使用forEach的效率,肯定是要更好的
规约操作reduce
Stream
的reduce
方法,翻译过来是聚合或者是汇聚成一个的意思,由于Stream
本身就代表着一堆数据,那stream.reduce()
方法顾名思义就是把一堆数据聚合成一个数据。
流底层核心其实是Spliterator
接口的一个实现,而这个Spliterator
接口其实本身就是Fork/Join并行框架的一个实现,所以归根结底要明白流的工作方式,就要明白一下Fork/Join框架的基本思想,即:以递归的方式将可以并行的任务拆分成更小的子任务,然后将每个子任务的结果合并起来生成整体的最后结果。
**
理解了方法本身的意思以及流的工作方式,再结合到一起理解一下stream.reduce()
方法,即用Fork/Join的方式把一堆数据聚合成一个数据,因此可以画出reduce
方法的运行草图
T reduce(T identity, BinaryOperator<T> accumulator);
这个函数,接受2个参数,第一个表示初始值,第二个值,传入的是一个函数式接口BinaryOperator,这个接口继承BiFunction;计算的表达式的规则;
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
BinaryOperator是供多线程使用的,如果不在Stream中声明使用多线程,就不会使用子任务,自然也不会调用到该方法。另外多线程下使用BinaryOperator的时候是需要考虑线程安全的问题。
另外自问自答下为什么需要BinaryOperator
因为这个重载的方法和其他两个不相同,允许改变返回值,所以返回值并不一定是Collection的子类;因此必须显示的声明如何拼接两个子任务产生的结果。
List<Integer> numbers = Stream.iterate(1, x -> x + 1).limit(10).collect(Collectors.toList());
Integer aa = 0;
for (Integer i : numbers) {
aa += i;
}
Integer dd = numbers.stream().reduce(0, (a, b) -> a + b, (a, b) -> a - b);
Optional<Integer> dd1 = numbers.stream().reduce((a, b) -> a + b);
System.out.println(aa);
System.out.println(dd);
System.out.println(dd1.get());
规约操作collect
package com.meinergy.vppd.business.api.test;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Test;
public class TestJava8 {
public static List<Emp> list = new ArrayList<>();
static {
list.add(new Emp("上海", "小名", 17));
list.add(new Emp("北京", "小红", 18));
list.add(new Emp("深圳", "小蓝", 19));
list.add(new Emp("广州", "小灰", 20));
list.add(new Emp("杭州", "小黄", 21));
list.add(new Emp("贵阳", "小白", 22));
}
@Test
public void test1() {
// 转list
List<String> names = list.stream().map(emp -> emp.getName()).collect(Collectors.toList());
// 转set
Set<String> address = list.stream().map(emp -> emp.getName()).collect(Collectors.toSet());
// 转map,需要指定key和value,Function.identity()表示当前的Emp对象本身
Map<String, Emp> map = list.stream().collect(Collectors.toMap(Emp::getName, Function.identity()));
// 计算元素中的个数
Long count = list.stream().collect(Collectors.counting());
// 数据求和 summingInt summingLong,summingDouble
Integer sumAges = list.stream().collect(Collectors.summingInt(Emp::getAge));
// 平均值 averagingInt,averagingDouble,averagingLong
Double aveAges = list.stream().collect(Collectors.averagingInt(Emp::getAge));
// 综合处理的,求最大值,最小值,平均值,求和操作
// summarizingInt,summarizingLong,summarizingDouble
IntSummaryStatistics intSummary = list.stream().collect(Collectors.summarizingInt(Emp::getAge));
System.out.println(intSummary.getAverage());// 19.5
System.out.println(intSummary.getMax());// 22
System.out.println(intSummary.getMin());// 17
System.out.println(intSummary.getSum());// 117
// 连接字符串,当然也可以使用重载的方法,加上一些前缀,后缀和中间分隔符
String strEmp = list.stream().map(emp -> emp.getName()).collect(Collectors.joining());
String strEmp1 = list.stream().map(emp -> emp.getName()).collect(Collectors.joining("-中间的分隔符-"));
String strEmp2 = list.stream().map(emp -> emp.getName()).collect(Collectors.joining("-中间的分隔符-", "前缀*", "&后缀"));
System.out.println(strEmp);// 小名小红小蓝小灰小黄小白
// 小名-中间的分隔符-小红-中间的分隔符-小蓝-中间的分隔符-小灰-中间的分隔符-小黄-中间的分隔符-小白
System.out.println(strEmp1);
// 前缀*小名-中间的分隔符-小红-中间的分隔符-小蓝-中间的分隔符-小灰-中间的分隔符-小黄-中间的分隔符-小白&后缀
System.out.println(strEmp2);
// maxBy 按照比较器中的比较结果刷选 最大值
Optional<Integer> maxAge = list.stream().map(emp -> emp.getAge())
.collect(Collectors.maxBy(Comparator.comparing(Function.identity())));
// 最小值
Optional<Integer> minAge = list.stream().map(emp -> emp.getAge())
.collect(Collectors.minBy(Comparator.comparing(Function.identity())));
System.out.println("max:" + maxAge);
System.out.println("min:" + minAge);
// 归约操作
list.stream().map(emp -> emp.getAge()).collect(Collectors.reducing((x, y) -> x + y));
list.stream().map(emp -> emp.getAge()).collect(Collectors.reducing(0, (x, y) -> x + y));
// 分操作 groupingBy 根据地址,把原list进行分组
Map<String, List<Emp>> mapGroup = list.stream().collect(Collectors.groupingBy(Emp::getAddress));
// partitioningBy 分区操作 需要根据类型指定判断分区
Map<Boolean, List<Integer>> partitioningMap = list.stream().map(emp -> emp.getAge())
.collect(Collectors.partitioningBy(emp -> emp > 20));
}
@Data
static class Emp {
private String address;
private String name;
private Integer age;
public Emp() {
}
public Emp(String address) {
this.address = address;
}
public Emp(String name, Integer age) {
this.name = name;
this.age = age;
}
public Emp(String address, String name, Integer age) {
super();
this.address = address;
this.name = name;
this.age = age;
}
}
}
四、Optional
很多的stream的终端操作,都返回了一个Optional