算子集合
map
作用:遍历集合里的每个元素,可以在方法内对每个元素进行处理,输出结果会形成一个新的集合
输入类型: DataStream
输出类型: DataStream
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
flatMap
作用:遍历集合里的每个元素,可以在方法内对元素进行分解,分解出0~N个元素,输出结果会形成一个新的集合
输入类型: DataStream
输出类型: DataStream
举个例子:我现在有这么一个数组 ["a b c", "e e a", "a g z"]
, flatMap
会遍历里面的每个元素,我们可以把元素压扁(flat)形成(map,当作)一个或多个元素,然后把这几个元素收集起来(丢到Collector里)。比如我把 "a b c"
分解为 "a"
、 "b"
、 "c"
,然后收集起来;然后再把 "e e a"
分解为 "e"
、 "e"
、 "a"
,然后收集起来,最后输出 ["a","b","c","e","e","a"]
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
filter
作用:遍历集合里的每个元素,把返回值为 true
的元素保留下来,最后形成一个新的集合
输入类型: DataStream
输出类型: DataStream
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
keyBy
点击查看【processon】
作用:按照集合里给定的键划分区域,键相同的元素进入同一个区域
输入类型: DataStream
输出类型: KeyedStream
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
注意,以下两种类型无法执行 keyBy()
- key的元素类型为
POJO
,而该类型没有重新实现hashCode()
而是使用默认的Object.hashCode()
(因为默认的Object.hashCode()
是一个对象一个hash,所以key为对象必须要覆盖) - key的元素类型是一个数组
只能保证一类数据不会出现在另外一边,不能保证一边只有一类数据。比如 A1、B1、A2、C1,B2 如果
keyBy
将它分成了A1
、A2
、C1
;B1
、B2
。也就是A
、C
不会出现在B
那边。
Rolling Aggregation
滚动聚合算子只能用于 KeyedStream
,也就是用了 keyBy()
之后才能用:
sum()
点击查看【processon】
对不同的分组进行某个字段的 sum
。如上图所示, keyBy()
会拿第一个字段作为 key
进行分组;执行 sum()
后,每个分组里的所有元素相加合并为一个元素。
max()
点击查看【processon】
对不同的分组进行某个字段的最小值判断。 keyBy()
会拿第一个字段作为 key
进行分组;执行 max()
后,取每个分组里所有元素的指定字段的最大值。当然这是书面上的意思,实际做起来还是有区别的:
如上图所示,在处理第一个元素时会保存第一个元素 A,1,2
;当处理第二个元素时,会比较 A,1,2
和 A,3,4
的下标为2的值,值更大的替换值更小的。注意,这里真的仅仅是值得替换。比较后,结果是 A,1,4
,中间的值并没有变!举个例子:
a,1,2
a,3,4
a,4,5
a,7,8
a,8,9
c,10,11
max()
之后:
(c,10,11)
(a,1,2)
(a,1,4)
(a,1,5)
(a,1,8)
(a,1,9)
min()
同 max()
,取最小值
maxBy()
点击查看【processon】
作用和 max()
一样,不过它是用更大的一个整个元素替换小的元素。
minBy()
作用和 min()
类似。
reduce()
点击查看【processon】
将数据合并成一个新的数据,返回单个结果值,并且 reduce 操作每处理一个元素总是创建一个新值。
split(deprecated)
该函数用来给流打上标签。相比 keyBy()
, split()
能够自由获取某一满足条件的流(一个元素可以打上多个标签,标签之间是 **or**
的关系,只要一个数据里有一个标签满足,就输出这个数据),而 keyBy()
只能根据已有的 key
来获取。
点击查看【processon】
代码戳这里:👉SplitTransformExample.java
上面代码分别给了两种输出:
输出1,只查高温的数据,即查询标签
high
,这里可以看到筛选出了所有带high
标签的数据:5> (hangzhou,37) 8> (beijing,36) 6> (beijing,37) 7> (hangzhou,38) 1> (hangzhou,37)
输出2,查杭州的、高温的数据,即查询标签
hz
和high
,这里可以看到它把带有hz
标签的数据或带有high
标签的数据都输出了!也就是说两个及以上的标签不是表示AND的意思!8> (beijing,37) 3> (hangzhou,37) 3> (hangzhou,32) 7> (hangzhou,37) 1> (hangzhou,34) 1> (hangzhou,38) 5> (hangzhou,35) 2> (beijing,36)
注意!
SplitStream
已经被标记为@Deprecated
了,所以以后尽量避免使用split()
算子。sideOutput
作用和
split
类似,能够将流划分为多个,只不过split
是通过给数据打标签,而sideOutput
是通过将数据再收集起来。
代码戳这里:👉SideOutputExample.java
两者的区别:
sideOutput
:不限制划分数量,不限制划分类型split
:不限制划分数量,限制划分类型,做数据集合操作方便(因为是基于标签的,可以通过标签数组把某几种数据合并到一起使用)
多流转换算子
connect(SplitStream)
把两条 SplitStream
流合并成一条,仅仅只能是两条哦~不过没有类型一致性要求!如果想合并多条类型相同的流,可以去看 union 算子。使用了connect()
后,会将两条流封装成一个对象——ConnectedStream
,我们可以在这个对象上执行map(CoMapFunction)
、flatMap(CoFlatMapFunction)
等操作,这些操作可以将流的类型进行统一。
点击查看【processon】
map(CoMapFunction)
因为 ConnectStream
可能是由不同类型的Stream组成,那么通过 map(CoMapFunction)
就能将不同类型的Stream转成相同类型的Stream。如下图所示:
点击查看【processon】
多个 SplitStream
可以合并成一个 ConnectStream
,然后 ConnectStream
可以通过 map(CoMapFunction)
转换为统一类型的流输出(如果想要一个流里有多种类型,请使用 Object
作为输出结果)。 CoMapFunction()
如下所示:
// arg1:第一个流的类型 arg2:第二个流的类型 arg3:映射为哪一个类型
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT map1(IN1 value) throws Exception;
OUT map2(IN2 value) throws Exception;
}
我仍然拿上面的 split
例子作为举例,追加下面的代码:
SingleOutputStreamOperator<Object> connectResult = high.connect(split.select("low"))
.map(new CoMapFunction<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>, Object>() {
@Override
public Object map1(Tuple3<String, Integer, Integer> value) throws Exception {
return new Tuple2<>(value.f0, value.f1);
}
@Override
public Object map2(Tuple3<String, Integer, Integer> value) throws Exception {
return value.f1;
}
});
connectResult.print();
最后输出如下所示:
6> 1
1> (a,7)
2> (a,8)
8> 4
7> 3
3> (c,10)
union
可以合并两条及以上的相同类型的DataStream
为一条DataStream
。
算子总结
函数算子
Flink提供了两种粒度不同的接口供用于实现算子:
1. MapFunction
、 FilterFunction
、 FlatMapFunction
等粗粒度的 Funciton
接口
2. RichMapFunction
、 RichFilterFunction
、 RichFlatMapFunction
等细粒度 AbstractRichFunction
的抽象类
普通函数算子
我们前面使用的算子其实都算是 匿名函数算子(本质还是普通对象啦,只是这边这么叫。匿名函数算子不能重复使用,普通函数算子可以重复利用,仅此而已):
不要被我迷惑性的发言误导了,这里主要就是说匿名类只能一处使用,做成类可以多处使用。
Rich函数算子
下面分析 RichFunction
,拿 MapFilterFunction
举例来说:
static class MyRichMapFunction extends RichMapFunction<String, Tuple3<String, Integer, Integer>>{
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 在类中可以获取到流的上下文
RuntimeContext runtimeContext = getRuntimeContext();
// TODO 实现一些前置操作,比如当流打开时从Redis加载数据进来
}
@Override
public void close() throws Exception {
super.close();
// TODO 做一些收尾工作,比如当流结束后关闭Redis客户端
}
// 执行正式的工作
@Override
public Tuple3<String, Integer, Integer> map(String value) throws Exception {
String[] data = value.split(",");
return new Tuple3<>(data[0], Integer.parseInt(data[1]), Integer.parseInt(data[2]));
}
}
Rich函数可以获取运行时环境上下文,运行时环境上下文可以用来获取状态。
open()
,在真正执行功能 (比如调用map()
、filter()
) 前会调用open()
close()
,做一些清理、收尾工作getRuntimeContext()
,获取当前环境的运行时上下文open()
、close()
只会在执行前后各执行一次!不会执行多次!!相当于初始化这个算子,初始化完了就能一直用,直到用完后执行一遍清理算子。 运行时上下文主要用来获取状态,方便后续做状态编程