1.1 介绍
Flink一般的转换算子是无法访问事件的时间戳信息和WaterMark信息的。例如 MapFunction 这样的 map 转换算子就无法访问时间戳或者当前事件的事件时间。而这在某些应用场景下,这些信息确极为重要。基于此 DataStream API 提供了一系列的 Low-Level 转换算子,可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。
Process Function 是用来构建事件驱动的应用以及实现自定义的业务逻辑 ( 使用之前的window 函数和转换算子无法实现 ) 。例如 Flink SQL 就是使用 Process Function 实现的。所有的 Process Function 都继承自 RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。
1.2 内部构件
ProcessFunction是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:
- events:数据流中的元素
- state:状态,用于容错和一致性,仅用于keyed stream
- timers:定时器,支持事件时间和处理时间,仅用于keyed stream
1.3 分类
Flink 提供了 8 个 Process Function:
- ProcessFunction:用于DataStream流数据处理
- KeyedProcessFunction:用于keyBy之后的KeyedStream流处理
- CoProcessFunction:用于connect连接的流处理
- ProcessJoinFunction:用于join流操作
- BroadcastProcessFunction:用于广播
- KeyedBroadcastProcessFunction:keyBy之后的广播
- ProcessWindowFunction:窗口增量聚合
- ProcessAllWindowFunction:全窗口增量聚合
2. ProcessFunction
2.1 ProcessFunction 介绍
- 可以使用 steam.process(ProcessFunction)的方式使用 ProcessFunction
- 从 ProcessFunction类图可见,它有RichFunction的特性open、close,也有两个重要的方法processElement和onTimer
2.2 ProcessFunction 源码
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
/**
* ProcessFunction处理数据的主要方法,处理输入流中的每个元素
* 此函数可以使用Collector输出零个或多个元素,
* 还可以使用Context参数更新内部状态或设置计时器。
*
* @param value 输入类型.
* @param ctx 上下文
* @param out 使用 collector 输出数据
*/
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* 在使用TimerService设置的计时器触发时调用。
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
* 上下文
*/
public abstract class Context {
/**
* 当前正在处理的元素的时间戳或触发计时器的时间戳
*/
public abstract Long timestamp();
/**
* 用于注册计时器和查询时间的TimerService
*/
public abstract TimerService timerService();
/**
* 向OutputTag标识的侧输出流发出记录
*
* @param outputTag 指定侧输出流
* @param value 发送的记录
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
/**
* Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
*/
public abstract class OnTimerContext extends Context {
/**
* 触发计时器的 TimeDomain
*/
public abstract TimeDomain timeDomain();
}
}
3. KeyedProcessFunction
3.1 介绍
- KeyedProcessFunction 用来处理 KeyedStream 中的数据。
- KeyedProcessFunction[KEY, IN, OUT] 还额外提供了两个方法:
```java
/**
- 流中的每一个元素都会在这个方法中进行处理
- 参数说明
- Collector :将处理完成的数据输出,可能输出 0 到多个结果
- Context:可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。
- : 还可以将结果输出到别的流(side outputs)。
- I :输入数据类型
*/
public abstract void processElement(I value, Context ctx, Collector
out) throws Exception;
/**
- onTimer是一个回调函数。当之前注册的定时器触发时调用。
- 参数说明
- timestamp:为定时器所设定的触发的时间戳。
- Collector:为输出结果的集合。
- OnTimerContext:和processElement的Context参数一样,提供上下文的一些信息,例如定时器触发的时间信息
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector
out) throws Exception {} ``` 4. CoProcessFunction
- DataStream API 提供了 CoProcessFunction 这样的 low-level 操作来对于两条输入流进行处理
- CoProcessFunction 提供了操作每一个输入流的方法: processElement1() 和 processElement2()
- 类似于 ProcessFunction 这两种方法都通过 Context 对象来调用。这个 Context 对象可以访问事件数据,定时器时间戳,TimerService,以及 side outputs。CoProcessFunction 也提供了 onTimer()回调函数。
5. ProcessJoinFunction
scala
leftKeyedStream // intervalJoin目前只支持Event Time .intervalJoin(rightKeyedStream) // 时间间隔,设定下界和上界 .between(Time.minutes(-10),Time.seconds(0)) // 不包含下界 //.lowerBoundExclusive() // 不包含上界 //.upperBoundExclusive() // 自定义ProcessJoinFunction 处理Join到的元素 .process(ProcessJoinFunction)6. BroadcastProcessFunction
参考Idea广播流代码
7. KeyedBroadcastProcessFunction
8. ProcessWindowFunction
ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性,但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。
9. ProcessAllWindowFunction
与 ProcessWindowFunction 功能类似,不过作用在 AllWindowedStream 之上。
10. SideOutput(侧输出流)
大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。processfunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象, X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。