https://blog.csdn.net/boling_cavalry/article/details/106299167
https://www.modb.pro/db/155268
ProcessFunction
ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
ProcessFunction是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:
- events:数据流中的元素
- state:状态,用于容错和一致性,仅用于keyed stream
- timers:定时器,支持事件时间和处理时间,仅用于keyed stream
Flink提供了8个Process Function:
- ProcessFunction:dataStream
- KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理
- CoProcessFunction:用于connect连接的流
- ProcessJoinFunction:用于join流操作
- BroadcastProcessFunction:用于广播
- KeyedBroadcastProcessFunction:keyBy之后的广播
- ProcessWindowFunction:窗口增量聚合
- ProcessAllWindowFunction:全窗口聚合
可以将ProcessFunction看作是一个具有key state和定时器(timer)访问权的FlatMapFunction。对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件。
对于容错状态,ProcessFunction 可以通过 RuntimeContext 访问Flink的keyed state,这与其他有状态函数访问keyed state的方式类似。
定时器可让应用程序对在处理时间和事件时间中的变化进行响应。每次调用 processElement(…)函数时都可以获得一个Context对象,通过该对象可以访问元素的事件时间(event time)时间戳以及 TimerService。可以使用TimerService为将来的事件时间/处理时间实例注册回调。对于事件时间计时器,当当前水印被提升到或超过计时器的时间戳时,将调用onTimer(…)方法,而对于处理时间计时器,当挂钟时间达到指定时间时,将调用onTimer(…)方法。在调用期间,所有状态的范围再次限定为创建定时器所用的key,从而允许定时器操作keyed state。
一. KeyedProcessFunction
这个是相对比较常用的ProcessFunction,根据名字就可以知道是用在keyedStream上的。
KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction
- processElement(I value, Context ctx, Collector
out),流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的 key ,以及TimerService 时间服务。 Context 还可以将结果输出到别的流(side outputs)。 - onTimer(long timestamp, OnTimerContext ctx, Collector
out),是一个回调函数。当之前注册的定时器触发时调用。参数timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext和processElement的Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。
实验目标:
监听本机9999端口,获取字符串; 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1; 上述Tuple2实例用f0字段分区,得到KeyedStream; KeyedSteam转入自定义KeyedProcessFunction处理; 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器,十秒后如果发现这个单词没有再次出现,就把这个单词和它出现的总次数发送到下游算子;
package com.bolingcavalry.keyedprocessfunction;import com.bolingcavalry.Splitter;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.streaming.api.watermark.Watermark;import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;import java.util.Date;/*** @author will* @email zq2599@gmail.com* @date 2020-05-17 13:43* @description 体验KeyedProcessFunction类(时间类型是处理时间)*/public class ProcessTime {/*** KeyedProcessFunction的子类,作用是将每个单词最新出现时间记录到backend,并创建定时器,* 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子*/static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {// 自定义状态private ValueState<CountWithTimestamp> state;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态,name是myStatestate = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));}@Overridepublic void processElement(Tuple2<String, Integer> value,Context ctx,Collector<Tuple2<String, Long>> out) throws Exception {// 取得当前是哪个单词Tuple currentKey = ctx.getCurrentKey();// 从backend取得当前单词的myState状态CountWithTimestamp current = state.value();// 如果myState还从未没有赋值过,就在此初始化if (current == null) {current = new CountWithTimestamp();current.key = value.f0;}// 单词数量加一current.count++;// 取当前元素的时间戳,作为该单词最后一次出现的时间current.lastModified = ctx.timestamp();// 重新保存到backend,包括该单词出现的次数,以及最后一次出现的时间state.update(current);// 为当前单词创建定时器,十秒后后触发long timer = current.lastModified + 10000;ctx.timerService().registerProcessingTimeTimer(timer);// 打印所有信息,用于核对数据正确性System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n\n",currentKey.getField(0),current.count,current.lastModified,time(current.lastModified),timer,time(timer)));}/*** 定时器触发后执行的方法* @param timestamp 这个时间戳代表的是该定时器的触发时间* @param ctx* @param out* @throws Exception*/@Overridepublic void onTimer(long timestamp,OnTimerContext ctx,Collector<Tuple2<String, Long>> out) throws Exception {// 取得当前单词Tuple currentKey = ctx.getCurrentKey();// 取得该单词的myState状态CountWithTimestamp result = state.value();// 当前元素是否已经连续10秒未出现的标志boolean isTimeout = false;// timestamp是定时器触发时间,如果等于最后一次更新时间+10秒,就表示这十秒内已经收到过该单词了,// 这种连续十秒没有出现的元素,被发送到下游算子if (timestamp == result.lastModified + 10000) {// 发送out.collect(new Tuple2<String, Long>(result.key, result.count));isTimeout = true;}// 打印数据,用于核对是否符合预期System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",currentKey.getField(0),result.count,result.lastModified,time(result.lastModified),timestamp,time(timestamp),String.valueOf(isTimeout)));}}public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口,读取字符串DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);// 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream// 对收到的字符串用空格做分割,得到多个单词.flatMap(new Splitter())// 设置时间戳分配器,用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {@Overridepublic long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}@Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark,返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据,交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutFunction());// 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来timeOutWord.print();env.execute("ProcessFunction demo : KeyedProcessFunction");}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}
二. TimerService和定时器(Timers)
Context 和OnTimerContext 所持有的TimerService 对象拥有以下方法:
- long currentProcessingTime() 返回当前处理时间
- long currentWatermark() 返回当前watermark 的时间戳
- void registerProcessingTimeTimer( long timestamp) 会注册当前key的processing time的定时器。当processing time 到达定时时间时,触发timer。
- void registerEventTimeTimer(long timestamp) 会注册当前key 的event time 定时器。当Watermark水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
- void deleteProcessingTimeTimer(long timestamp) 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
- void deleteEventTimeTimer(long timestamp) 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
当定时器timer 触发时,会执行回调函数onTimer()。注意定时器timer 只能在keyed streams 上面使用。
三. 侧输出流(SideOutput)
一个数据可以被多个window包含,只有其不被任何window包含的时候(包含该数据的所有window都关闭之后),才会被丢到侧输出流。
简言之,如果一个数据被丢到侧输出流,那么所有包含该数据的window都由于已经超过了”允许的迟到时间”而关闭了,进而新来的迟到数据只能被丢到侧输出流!
大部分的DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。
processfunction 的side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。
一个side output 可以定义为OutputTag[X]对象,X 是输出流的数据类型。
processfunction 可以通过Context 对象发射一个事件到一个或者多个side outputs。
四. CoProcessFunction
我们在DataStream API部分曾提到使用connect()将两个数据流的合并,如果想从更细的粒度在两个数据流进行一些操作,可以使用CoProcessFunction或KeyedCoProcessFunction。这两个函数都有processElement1()和processElement2()方法,分别对第一个数据流和第二个数据流的每个元素进行处理。第一个数据流类型、第二个数据流类型和经过函数处理后的输出类型可以互不相同。尽管数据来自两个不同的流,但是他们可以共享同样的状态,所以可以参考下面的逻辑来实现两个数据流上的Join:
- 创建一到多个状态,两个数据流都能访问到这些状态,这里以状态a为例。
- processElement1()方法处理第一个数据流,更新状态a。
- processElement2()方法处理第二个数据流,根据状态a中的数据,生成相应的输出。
我们这次将股票价格结合媒体评价两个数据流一起讨论,假设对于某支股票有一个媒体评价数据流,媒体评价数据流包含了对该支股票的正负评价。两支数据流一起流入KeyedCoProcessFunction,processElement2()方法处理流入的媒体数据,将媒体评价更新到状态mediaState上,processElement1()方法处理流入的股票交易数据,获取mediaState状态,生成到新的数据流。两个方法分别处理两个数据流,共享一个状态,通过状态来通信。
// 读入股票数据流DataStream<StockPrice> stockStream = ...// 读入媒体评价数据流DataStream<Media> mediaStream = ...DataStream<StockPrice> joinStream = stockStream.connect(mediaStream).keyBy("symbol", "symbol")// 调用process函数.process(new JoinStockMediaProcessFunction());
/*** 四个泛型:Key,第一个流类型,第二个流类型,输出。*/public static class JoinStockMediaProcessFunction extends KeyedCoProcessFunction<String, StockPrice, Media, StockPrice> {// mediaStateprivate ValueState<String> mediaState;@Overridepublic void open(Configuration parameters) throws Exception {// 从RuntimeContext中获取状态mediaState = getRuntimeContext().getState(new ValueStateDescriptor<String>("mediaStatusState", Types.STRING));}@Overridepublic void processElement1(StockPrice stock, Context context, Collector<StockPrice> collector) throws Exception {String mediaStatus = mediaState.value();if (null != mediaStatus) {stock.mediaStatus = mediaStatus;collector.collect(stock);}}@Overridepublic void processElement2(Media media, Context context, Collector<StockPrice> collector) throws Exception {// 第二个流更新mediaStatemediaState.update(media.status);}}
这个例子比较简单,没有使用Timer,实际的业务场景中状态一般用到Timer将过期的状态清除。两个数据流的中间数据放在状态中,为避免状态的无限增长,需要使用Timer清除过期数据。
很多互联网APP的机器学习样本拼接都可能依赖这个函数来实现:服务端的机器学习特征是实时生成的,用户在APP上的行为是交互后产生的,两者属于两个不同的数据流,用户行为是机器学习所需要标注的正负样本,因此可以按照这个逻辑来将两个数据流拼接起来,通过拼接更快得到下一轮机器学习的样本数据。使用Event Time时,两个数据流必须都设置好Watermark,只设置一个流的Event Time和Watermark,无法在CoProcessFunction和KeyedCoProcessFunction中使用Timer功能,因为process算子无法确定自己应该以怎样的时间来处理数据。
