算子集合

map

作用:遍历集合里的每个元素,可以在方法内对每个元素进行处理,输出结果会形成一个新的集合
输入类型: DataStream
输出类型: DataStream

  1. DataStream<Integer> dataStream = //...
  2. dataStream.map(new MapFunction<Integer, Integer>() {
  3. @Override
  4. public Integer map(Integer value) throws Exception {
  5. return 2 * value;
  6. }
  7. });

flatMap

作用:遍历集合里的每个元素,可以在方法内对元素进行分解,分解出0~N个元素,输出结果会形成一个新的集合
输入类型: DataStream
输出类型: DataStream
举个例子:我现在有这么一个数组 ["a b c", "e e a", "a g z"]flatMap 会遍历里面的每个元素,我们可以把元素压扁(flat)形成(map,当作)一个或多个元素,然后把这几个元素收集起来(丢到Collector里)。比如我把 "a b c" 分解为 "a""b""c" ,然后收集起来;然后再把 "e e a" 分解为 "e""e""a" ,然后收集起来,最后输出 ["a","b","c","e","e","a"]

  1. dataStream.flatMap(new FlatMapFunction<String, String>() {
  2. @Override
  3. public void flatMap(String value, Collector<String> out)
  4. throws Exception {
  5. for(String word: value.split(" ")){
  6. out.collect(word);
  7. }
  8. }
  9. });

filter

作用:遍历集合里的每个元素,把返回值为 true 的元素保留下来,最后形成一个新的集合
输入类型: DataStream
输出类型: DataStream

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

keyBy

点击查看【processon】
作用:按照集合里给定的键划分区域,键相同的元素进入同一个区域
输入类型: DataStream
输出类型: KeyedStream

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

注意,以下两种类型无法执行 keyBy()

  • key的元素类型为 POJO ,而该类型没有重新实现 hashCode() 而是使用默认的 Object.hashCode() (因为默认的 Object.hashCode() 是一个对象一个hash,所以key为对象必须要覆盖)
  • key的元素类型是一个数组

    只能保证一类数据不会出现在另外一边,不能保证一边只有一类数据。比如 A1、B1、A2、C1,B2 如果keyBy 将它分成了 A1A2C1B1B2 。也就是 AC 不会出现在 B 那边。

Rolling Aggregation

滚动聚合算子只能用于 KeyedStream ,也就是用了 keyBy() 之后才能用:

sum()

点击查看【processon】
对不同的分组进行某个字段的 sum 。如上图所示, keyBy() 会拿第一个字段作为 key 进行分组;执行 sum() 后,每个分组里的所有元素相加合并为一个元素。

max()

点击查看【processon】
对不同的分组进行某个字段的最小值判断。 keyBy() 会拿第一个字段作为 key 进行分组;执行 max() 后,取每个分组里所有元素的指定字段的最大值。当然这是书面上的意思,实际做起来还是有区别的:
如上图所示,在处理第一个元素时会保存第一个元素 A,1,2 ;当处理第二个元素时,会比较 A,1,2A,3,4 的下标为2的值,值更大的替换值更小的。注意,这里真的仅仅是值得替换。比较后,结果是 A,1,4 ,中间的值并没有变!举个例子:

a,1,2
a,3,4
a,4,5
a,7,8
a,8,9
c,10,11

max() 之后:

(c,10,11)
(a,1,2)
(a,1,4)
(a,1,5)
(a,1,8)
(a,1,9)

min()

max() ,取最小值

maxBy()

点击查看【processon】
作用和 max() 一样,不过它是用更大的一个整个元素替换小的元素

minBy()

作用和 min() 类似。

reduce()

点击查看【processon】
将数据合并成一个新的数据,返回单个结果值,并且 reduce 操作每处理一个元素总是创建一个新值。

split(deprecated)

该函数用来给流打上标签。相比 keyBy()split() 能够自由获取某一满足条件的流(一个元素可以打上多个标签标签之间是 **or** 的关系,只要一个数据里有一个标签满足,就输出这个数据),而 keyBy() 只能根据已有的 key 来获取。
点击查看【processon】
代码戳这里:👉SplitTransformExample.java
上面代码分别给了两种输出:

  1. 输出1,只查高温的数据,即查询标签 high ,这里可以看到筛选出了所有带 high 标签的数据:

    5> (hangzhou,37)
    8> (beijing,36)
    6> (beijing,37)
    7> (hangzhou,38)
    1> (hangzhou,37)
    
  2. 输出2,查杭州的、高温的数据,即查询标签 hzhigh ,这里可以看到它把带有 hz 标签的数据带有 high 标签的数据都输出了!也就是说两个及以上的标签不是表示AND的意思!

    8> (beijing,37)
    3> (hangzhou,37)
    3> (hangzhou,32)
    7> (hangzhou,37)
    1> (hangzhou,34)
    1> (hangzhou,38)
    5> (hangzhou,35)
    2> (beijing,36)
    

    注意! SplitStream 已经被标记为 @Deprecated 了,所以以后尽量避免使用 split() 算子。

    sideOutput

    作用和 split 类似,能够将流划分为多个,只不过 split 是通过给数据打标签,而 sideOutput 是通过将数据再收集起来。
    代码戳这里:👉SideOutputExample.java
    两者的区别:

  • sideOutput :不限制划分数量,不限制划分类型
  • split :不限制划分数量,限制划分类型,做数据集合操作方便(因为是基于标签的,可以通过标签数组把某几种数据合并到一起使用)

多流转换算子

connect(SplitStream)

把两条 SplitStream 流合并成一条,仅仅只能是两条哦~不过没有类型一致性要求!如果想合并多条类型相同的流,可以去看 union 算子。使用了connect()后,会将两条流封装成一个对象——ConnectedStream,我们可以在这个对象上执行map(CoMapFunction)flatMap(CoFlatMapFunction)等操作,这些操作可以将流的类型进行统一。
点击查看【processon】

map(CoMapFunction)

因为 ConnectStream 可能是由不同类型的Stream组成,那么通过 map(CoMapFunction) 就能将不同类型的Stream转成相同类型的Stream。如下图所示:
点击查看【processon】
多个 SplitStream 可以合并成一个 ConnectStream ,然后 ConnectStream 可以通过 map(CoMapFunction) 转换为统一类型的流输出(如果想要一个流里有多种类型,请使用 Object 作为输出结果)。 CoMapFunction() 如下所示:

// arg1:第一个流的类型   arg2:第二个流的类型    arg3:映射为哪一个类型
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
    OUT map1(IN1 value) throws Exception;
    OUT map2(IN2 value) throws Exception;
}

我仍然拿上面的 split 例子作为举例,追加下面的代码:

SingleOutputStreamOperator<Object> connectResult = high.connect(split.select("low"))
    .map(new CoMapFunction<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>, Object>() {
    @Override
    public Object map1(Tuple3<String, Integer, Integer> value) throws Exception {
        return new Tuple2<>(value.f0, value.f1);
    }
    @Override
    public Object map2(Tuple3<String, Integer, Integer> value) throws Exception {
        return value.f1;
    }
});
connectResult.print();

最后输出如下所示:

6> 1
1> (a,7)
2> (a,8)
8> 4
7> 3
3> (c,10)

union

可以合并两条及以上相同类型DataStream为一条DataStream

算子总结

点击查看【processon】

函数算子

Flink提供了两种粒度不同的接口供用于实现算子:
1. MapFunctionFilterFunctionFlatMapFunction 等粗粒度的 Funciton 接口
2. RichMapFunctionRichFilterFunctionRichFlatMapFunction 等细粒度 AbstractRichFunction 的抽象类

普通函数算子

我们前面使用的算子其实都算是 匿名函数算子(本质还是普通对象啦,只是这边这么叫。匿名函数算子不能重复使用,普通函数算子可以重复利用,仅此而已)
image.png

不要被我迷惑性的发言误导了,这里主要就是说匿名类只能一处使用,做成类可以多处使用。

Rich函数算子

下面分析 RichFunction ,拿 MapFilterFunction 举例来说:

static class MyRichMapFunction extends RichMapFunction<String, Tuple3<String, Integer, Integer>>{
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 在类中可以获取到流的上下文
        RuntimeContext runtimeContext = getRuntimeContext();
        // TODO 实现一些前置操作,比如当流打开时从Redis加载数据进来
    }
    @Override
    public void close() throws Exception {
        super.close();
        // TODO 做一些收尾工作,比如当流结束后关闭Redis客户端
    }
    // 执行正式的工作
    @Override
    public Tuple3<String, Integer, Integer> map(String value) throws Exception {
        String[] data = value.split(",");
        return new Tuple3<>(data[0], Integer.parseInt(data[1]), Integer.parseInt(data[2]));
    }
}

Rich函数可以获取运行时环境上下文,运行时环境上下文可以用来获取状态。

  • open(),在真正执行功能 (比如调用map()filter()) 前会调用 open()
  • close(),做一些清理、收尾工作
  • getRuntimeContext(),获取当前环境的运行时上下文

    open()close()只会在执行前后各执行一次!不会执行多次!!相当于初始化这个算子,初始化完了就能一直用,直到用完后执行一遍清理算子。 运行时上下文主要用来获取状态,方便后续做状态编程