算子集合
map
作用:遍历集合里的每个元素,可以在方法内对每个元素进行处理,输出结果会形成一个新的集合
输入类型: DataStream
输出类型: DataStream
DataStream<Integer> dataStream = //...dataStream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return 2 * value;}});
flatMap
作用:遍历集合里的每个元素,可以在方法内对元素进行分解,分解出0~N个元素,输出结果会形成一个新的集合
输入类型: DataStream
输出类型: DataStream
举个例子:我现在有这么一个数组 ["a b c", "e e a", "a g z"] , flatMap 会遍历里面的每个元素,我们可以把元素压扁(flat)形成(map,当作)一个或多个元素,然后把这几个元素收集起来(丢到Collector里)。比如我把 "a b c" 分解为 "a" 、 "b" 、 "c" ,然后收集起来;然后再把 "e e a" 分解为 "e" 、 "e" 、 "a" ,然后收集起来,最后输出 ["a","b","c","e","e","a"]
dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out)throws Exception {for(String word: value.split(" ")){out.collect(word);}}});
filter
作用:遍历集合里的每个元素,把返回值为 true 的元素保留下来,最后形成一个新的集合
输入类型: DataStream
输出类型: DataStream
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
keyBy
点击查看【processon】
作用:按照集合里给定的键划分区域,键相同的元素进入同一个区域
输入类型: DataStream
输出类型: KeyedStream
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
注意,以下两种类型无法执行 keyBy()
- key的元素类型为
POJO,而该类型没有重新实现hashCode()而是使用默认的Object.hashCode()(因为默认的Object.hashCode()是一个对象一个hash,所以key为对象必须要覆盖) - key的元素类型是一个数组
只能保证一类数据不会出现在另外一边,不能保证一边只有一类数据。比如 A1、B1、A2、C1,B2 如果
keyBy将它分成了A1、A2、C1;B1、B2。也就是A、C不会出现在B那边。
Rolling Aggregation
滚动聚合算子只能用于 KeyedStream ,也就是用了 keyBy() 之后才能用:
sum()
点击查看【processon】
对不同的分组进行某个字段的 sum 。如上图所示, keyBy() 会拿第一个字段作为 key 进行分组;执行 sum() 后,每个分组里的所有元素相加合并为一个元素。
max()
点击查看【processon】
对不同的分组进行某个字段的最小值判断。 keyBy() 会拿第一个字段作为 key 进行分组;执行 max() 后,取每个分组里所有元素的指定字段的最大值。当然这是书面上的意思,实际做起来还是有区别的:
如上图所示,在处理第一个元素时会保存第一个元素 A,1,2 ;当处理第二个元素时,会比较 A,1,2 和 A,3,4 的下标为2的值,值更大的替换值更小的。注意,这里真的仅仅是值得替换。比较后,结果是 A,1,4 ,中间的值并没有变!举个例子:
a,1,2
a,3,4
a,4,5
a,7,8
a,8,9
c,10,11
max() 之后:
(c,10,11)
(a,1,2)
(a,1,4)
(a,1,5)
(a,1,8)
(a,1,9)
min()
同 max() ,取最小值
maxBy()
点击查看【processon】
作用和 max() 一样,不过它是用更大的一个整个元素替换小的元素。
minBy()
作用和 min() 类似。
reduce()
点击查看【processon】
将数据合并成一个新的数据,返回单个结果值,并且 reduce 操作每处理一个元素总是创建一个新值。
split(deprecated)
该函数用来给流打上标签。相比 keyBy() , split() 能够自由获取某一满足条件的流(一个元素可以打上多个标签,标签之间是 **or** 的关系,只要一个数据里有一个标签满足,就输出这个数据),而 keyBy() 只能根据已有的 key 来获取。
点击查看【processon】
代码戳这里:👉SplitTransformExample.java
上面代码分别给了两种输出:
输出1,只查高温的数据,即查询标签
high,这里可以看到筛选出了所有带high标签的数据:5> (hangzhou,37) 8> (beijing,36) 6> (beijing,37) 7> (hangzhou,38) 1> (hangzhou,37)输出2,查杭州的、高温的数据,即查询标签
hz和high,这里可以看到它把带有hz标签的数据或带有high标签的数据都输出了!也就是说两个及以上的标签不是表示AND的意思!8> (beijing,37) 3> (hangzhou,37) 3> (hangzhou,32) 7> (hangzhou,37) 1> (hangzhou,34) 1> (hangzhou,38) 5> (hangzhou,35) 2> (beijing,36)注意!
SplitStream已经被标记为@Deprecated了,所以以后尽量避免使用split()算子。sideOutput
作用和
split类似,能够将流划分为多个,只不过split是通过给数据打标签,而sideOutput是通过将数据再收集起来。
代码戳这里:👉SideOutputExample.java
两者的区别:
sideOutput:不限制划分数量,不限制划分类型split:不限制划分数量,限制划分类型,做数据集合操作方便(因为是基于标签的,可以通过标签数组把某几种数据合并到一起使用)
多流转换算子
connect(SplitStream)
把两条 SplitStream 流合并成一条,仅仅只能是两条哦~不过没有类型一致性要求!如果想合并多条类型相同的流,可以去看 union 算子。使用了connect()后,会将两条流封装成一个对象——ConnectedStream,我们可以在这个对象上执行map(CoMapFunction)、flatMap(CoFlatMapFunction)等操作,这些操作可以将流的类型进行统一。
点击查看【processon】
map(CoMapFunction)
因为 ConnectStream 可能是由不同类型的Stream组成,那么通过 map(CoMapFunction) 就能将不同类型的Stream转成相同类型的Stream。如下图所示:
点击查看【processon】
多个 SplitStream 可以合并成一个 ConnectStream ,然后 ConnectStream 可以通过 map(CoMapFunction) 转换为统一类型的流输出(如果想要一个流里有多种类型,请使用 Object 作为输出结果)。 CoMapFunction() 如下所示:
// arg1:第一个流的类型 arg2:第二个流的类型 arg3:映射为哪一个类型
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT map1(IN1 value) throws Exception;
OUT map2(IN2 value) throws Exception;
}
我仍然拿上面的 split 例子作为举例,追加下面的代码:
SingleOutputStreamOperator<Object> connectResult = high.connect(split.select("low"))
.map(new CoMapFunction<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>, Object>() {
@Override
public Object map1(Tuple3<String, Integer, Integer> value) throws Exception {
return new Tuple2<>(value.f0, value.f1);
}
@Override
public Object map2(Tuple3<String, Integer, Integer> value) throws Exception {
return value.f1;
}
});
connectResult.print();
最后输出如下所示:
6> 1
1> (a,7)
2> (a,8)
8> 4
7> 3
3> (c,10)
union
可以合并两条及以上的相同类型的DataStream为一条DataStream。
算子总结
函数算子
Flink提供了两种粒度不同的接口供用于实现算子:
1. MapFunction 、 FilterFunction 、 FlatMapFunction 等粗粒度的 Funciton 接口
2. RichMapFunction 、 RichFilterFunction 、 RichFlatMapFunction 等细粒度 AbstractRichFunction 的抽象类
普通函数算子
我们前面使用的算子其实都算是 匿名函数算子(本质还是普通对象啦,只是这边这么叫。匿名函数算子不能重复使用,普通函数算子可以重复利用,仅此而已):
不要被我迷惑性的发言误导了,这里主要就是说匿名类只能一处使用,做成类可以多处使用。
Rich函数算子
下面分析 RichFunction ,拿 MapFilterFunction 举例来说:
static class MyRichMapFunction extends RichMapFunction<String, Tuple3<String, Integer, Integer>>{
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 在类中可以获取到流的上下文
RuntimeContext runtimeContext = getRuntimeContext();
// TODO 实现一些前置操作,比如当流打开时从Redis加载数据进来
}
@Override
public void close() throws Exception {
super.close();
// TODO 做一些收尾工作,比如当流结束后关闭Redis客户端
}
// 执行正式的工作
@Override
public Tuple3<String, Integer, Integer> map(String value) throws Exception {
String[] data = value.split(",");
return new Tuple3<>(data[0], Integer.parseInt(data[1]), Integer.parseInt(data[2]));
}
}
Rich函数可以获取运行时环境上下文,运行时环境上下文可以用来获取状态。
open(),在真正执行功能 (比如调用map()、filter()) 前会调用open()close(),做一些清理、收尾工作getRuntimeContext(),获取当前环境的运行时上下文open()、close()只会在执行前后各执行一次!不会执行多次!!相当于初始化这个算子,初始化完了就能一直用,直到用完后执行一遍清理算子。 运行时上下文主要用来获取状态,方便后续做状态编程
