https://blog.csdn.net/boling_cavalry/article/details/106299167
https://www.modb.pro/db/155268

ProcessFunction

ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
ProcessFunction是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:

  • events:数据流中的元素
  • state:状态,用于容错和一致性,仅用于keyed stream
  • timers:定时器,支持事件时间和处理时间,仅用于keyed stream

Flink提供了8个Process Function:

  • ProcessFunction:dataStream
  • KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理
  • CoProcessFunction:用于connect连接的流
  • ProcessJoinFunction:用于join流操作
  • BroadcastProcessFunction:用于广播
  • KeyedBroadcastProcessFunction:keyBy之后的广播
  • ProcessWindowFunction:窗口增量聚合
  • ProcessAllWindowFunction:全窗口聚合

可以将ProcessFunction看作是一个具有key state和定时器(timer)访问权的FlatMapFunction。对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件。
对于容错状态,ProcessFunction 可以通过 RuntimeContext 访问Flink的keyed state,这与其他有状态函数访问keyed state的方式类似。
定时器可让应用程序对在处理时间和事件时间中的变化进行响应。每次调用 processElement(…)函数时都可以获得一个Context对象,通过该对象可以访问元素的事件时间(event time)时间戳以及 TimerService。可以使用TimerService为将来的事件时间/处理时间实例注册回调。对于事件时间计时器,当当前水印被提升到或超过计时器的时间戳时,将调用onTimer(…)方法,而对于处理时间计时器,当挂钟时间达到指定时间时,将调用onTimer(…)方法。在调用期间,所有状态的范围再次限定为创建定时器所用的key,从而允许定时器操作keyed state。

一. KeyedProcessFunction

这个是相对比较常用的ProcessFunction,根据名字就可以知道是用在keyedStream上的。
KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction还额外提供了两个方法:

  1. processElement(I value, Context ctx, Collector out),流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的 key ,以及TimerService 时间服务。 Context 还可以将结果输出到别的流(side outputs)。
  2. onTimer(long timestamp, OnTimerContext ctx, Collector out),是一个回调函数。当之前注册的定时器触发时调用。参数timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext和processElement的Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

实验目标

监听本机9999端口,获取字符串; 将每个字符串用空格分隔,转成Tuple2实例,f0是分隔后的单词,f1等于1; 上述Tuple2实例用f0字段分区,得到KeyedStream; KeyedSteam转入自定义KeyedProcessFunction处理; 自定义KeyedProcessFunction的作用,是记录每个单词最新一次出现的时间,然后建一个十秒的定时器,十秒后如果发现这个单词没有再次出现,就把这个单词和它出现的总次数发送到下游算子;

  1. package com.bolingcavalry.keyedprocessfunction;
  2. import com.bolingcavalry.Splitter;
  3. import org.apache.flink.api.common.state.ValueState;
  4. import org.apache.flink.api.common.state.ValueStateDescriptor;
  5. import org.apache.flink.api.java.tuple.Tuple;
  6. import org.apache.flink.api.java.tuple.Tuple2;
  7. import org.apache.flink.configuration.Configuration;
  8. import org.apache.flink.streaming.api.TimeCharacteristic;
  9. import org.apache.flink.streaming.api.datastream.DataStream;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
  12. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  13. import org.apache.flink.streaming.api.watermark.Watermark;
  14. import org.apache.flink.util.Collector;
  15. import java.text.SimpleDateFormat;
  16. import java.util.Date;
  17. /**
  18. * @author will
  19. * @email zq2599@gmail.com
  20. * @date 2020-05-17 13:43
  21. * @description 体验KeyedProcessFunction类(时间类型是处理时间)
  22. */
  23. public class ProcessTime {
  24. /**
  25. * KeyedProcessFunction的子类,作用是将每个单词最新出现时间记录到backend,并创建定时器,
  26. * 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子
  27. */
  28. static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {
  29. // 自定义状态
  30. private ValueState<CountWithTimestamp> state;
  31. @Override
  32. public void open(Configuration parameters) throws Exception {
  33. // 初始化状态,name是myState
  34. state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
  35. }
  36. @Override
  37. public void processElement(
  38. Tuple2<String, Integer> value,
  39. Context ctx,
  40. Collector<Tuple2<String, Long>> out) throws Exception {
  41. // 取得当前是哪个单词
  42. Tuple currentKey = ctx.getCurrentKey();
  43. // 从backend取得当前单词的myState状态
  44. CountWithTimestamp current = state.value();
  45. // 如果myState还从未没有赋值过,就在此初始化
  46. if (current == null) {
  47. current = new CountWithTimestamp();
  48. current.key = value.f0;
  49. }
  50. // 单词数量加一
  51. current.count++;
  52. // 取当前元素的时间戳,作为该单词最后一次出现的时间
  53. current.lastModified = ctx.timestamp();
  54. // 重新保存到backend,包括该单词出现的次数,以及最后一次出现的时间
  55. state.update(current);
  56. // 为当前单词创建定时器,十秒后后触发
  57. long timer = current.lastModified + 10000;
  58. ctx.timerService().registerProcessingTimeTimer(timer);
  59. // 打印所有信息,用于核对数据正确性
  60. System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n\n",
  61. currentKey.getField(0),
  62. current.count,
  63. current.lastModified,
  64. time(current.lastModified),
  65. timer,
  66. time(timer)));
  67. }
  68. /**
  69. * 定时器触发后执行的方法
  70. * @param timestamp 这个时间戳代表的是该定时器的触发时间
  71. * @param ctx
  72. * @param out
  73. * @throws Exception
  74. */
  75. @Override
  76. public void onTimer(
  77. long timestamp,
  78. OnTimerContext ctx,
  79. Collector<Tuple2<String, Long>> out) throws Exception {
  80. // 取得当前单词
  81. Tuple currentKey = ctx.getCurrentKey();
  82. // 取得该单词的myState状态
  83. CountWithTimestamp result = state.value();
  84. // 当前元素是否已经连续10秒未出现的标志
  85. boolean isTimeout = false;
  86. // timestamp是定时器触发时间,如果等于最后一次更新时间+10秒,就表示这十秒内已经收到过该单词了,
  87. // 这种连续十秒没有出现的元素,被发送到下游算子
  88. if (timestamp == result.lastModified + 10000) {
  89. // 发送
  90. out.collect(new Tuple2<String, Long>(result.key, result.count));
  91. isTimeout = true;
  92. }
  93. // 打印数据,用于核对是否符合预期
  94. System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",
  95. currentKey.getField(0),
  96. result.count,
  97. result.lastModified,
  98. time(result.lastModified),
  99. timestamp,
  100. time(timestamp),
  101. String.valueOf(isTimeout)));
  102. }
  103. }
  104. public static void main(String[] args) throws Exception {
  105. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  106. // 并行度1
  107. env.setParallelism(1);
  108. // 处理时间
  109. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  110. // 监听本地9999端口,读取字符串
  111. DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);
  112. // 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到
  113. DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
  114. // 对收到的字符串用空格做分割,得到多个单词
  115. .flatMap(new Splitter())
  116. // 设置时间戳分配器,用当前时间作为时间戳
  117. .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {
  118. @Override
  119. public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
  120. // 使用当前系统时间作为时间戳
  121. return System.currentTimeMillis();
  122. }
  123. @Override
  124. public Watermark getCurrentWatermark() {
  125. // 本例不需要watermark,返回null
  126. return null;
  127. }
  128. })
  129. // 将单词作为key分区
  130. .keyBy(0)
  131. // 按单词分区后的数据,交给自定义KeyedProcessFunction处理
  132. .process(new CountWithTimeoutFunction());
  133. // 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来
  134. timeOutWord.print();
  135. env.execute("ProcessFunction demo : KeyedProcessFunction");
  136. }
  137. public static String time(long timeStamp) {
  138. return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
  139. }
  140. }

二. TimerService和定时器(Timers)

Context 和OnTimerContext 所持有的TimerService 对象拥有以下方法:

  1. long currentProcessingTime() 返回当前处理时间
  2. long currentWatermark() 返回当前watermark 的时间戳
  3. void registerProcessingTimeTimer( long timestamp) 会注册当前key的processing time的定时器。当processing time 到达定时时间时,触发timer。
  4. void registerEventTimeTimer(long timestamp) 会注册当前key 的event time 定时器。当Watermark水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
  5. void deleteProcessingTimeTimer(long timestamp) 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
  6. void deleteEventTimeTimer(long timestamp) 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

当定时器timer 触发时,会执行回调函数onTimer()。注意定时器timer 只能在keyed streams 上面使用。

三. 侧输出流(SideOutput)

一个数据可以被多个window包含,只有其不被任何window包含的时候(包含该数据的所有window都关闭之后),才会被丢到侧输出流。
简言之,如果一个数据被丢到侧输出流,那么所有包含该数据的window都由于已经超过了”允许的迟到时间”而关闭了,进而新来的迟到数据只能被丢到侧输出流!
大部分的DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。
processfunction 的side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。
一个side output 可以定义为OutputTag[X]对象,X 是输出流的数据类型。
processfunction 可以通过Context 对象发射一个事件到一个或者多个side outputs。

四. CoProcessFunction

我们在DataStream API部分曾提到使用connect()将两个数据流的合并,如果想从更细的粒度在两个数据流进行一些操作,可以使用CoProcessFunction或KeyedCoProcessFunction。这两个函数都有processElement1()和processElement2()方法,分别对第一个数据流和第二个数据流的每个元素进行处理。第一个数据流类型、第二个数据流类型和经过函数处理后的输出类型可以互不相同。尽管数据来自两个不同的流,但是他们可以共享同样的状态,所以可以参考下面的逻辑来实现两个数据流上的Join:

  • 创建一到多个状态,两个数据流都能访问到这些状态,这里以状态a为例。
  • processElement1()方法处理第一个数据流,更新状态a。
  • processElement2()方法处理第二个数据流,根据状态a中的数据,生成相应的输出。

我们这次将股票价格结合媒体评价两个数据流一起讨论,假设对于某支股票有一个媒体评价数据流,媒体评价数据流包含了对该支股票的正负评价。两支数据流一起流入KeyedCoProcessFunction,processElement2()方法处理流入的媒体数据,将媒体评价更新到状态mediaState上,processElement1()方法处理流入的股票交易数据,获取mediaState状态,生成到新的数据流。两个方法分别处理两个数据流,共享一个状态,通过状态来通信。

  1. // 读入股票数据流
  2. DataStream<StockPrice> stockStream = ...
  3. // 读入媒体评价数据流
  4. DataStream<Media> mediaStream = ...
  5. DataStream<StockPrice> joinStream = stockStream.connect(mediaStream)
  6. .keyBy("symbol", "symbol")
  7. // 调用process函数
  8. .process(new JoinStockMediaProcessFunction());
  1. /**
  2. * 四个泛型:Key,第一个流类型,第二个流类型,输出。
  3. */
  4. public static class JoinStockMediaProcessFunction extends KeyedCoProcessFunction<String, StockPrice, Media, StockPrice> {
  5. // mediaState
  6. private ValueState<String> mediaState;
  7. @Override
  8. public void open(Configuration parameters) throws Exception {
  9. // 从RuntimeContext中获取状态
  10. mediaState = getRuntimeContext().getState(
  11. new ValueStateDescriptor<String>("mediaStatusState", Types.STRING));
  12. }
  13. @Override
  14. public void processElement1(StockPrice stock, Context context, Collector<StockPrice> collector) throws Exception {
  15. String mediaStatus = mediaState.value();
  16. if (null != mediaStatus) {
  17. stock.mediaStatus = mediaStatus;
  18. collector.collect(stock);
  19. }
  20. }
  21. @Override
  22. public void processElement2(Media media, Context context, Collector<StockPrice> collector) throws Exception {
  23. // 第二个流更新mediaState
  24. mediaState.update(media.status);
  25. }
  26. }

这个例子比较简单,没有使用Timer,实际的业务场景中状态一般用到Timer将过期的状态清除。两个数据流的中间数据放在状态中,为避免状态的无限增长,需要使用Timer清除过期数据。
很多互联网APP的机器学习样本拼接都可能依赖这个函数来实现:服务端的机器学习特征是实时生成的,用户在APP上的行为是交互后产生的,两者属于两个不同的数据流,用户行为是机器学习所需要标注的正负样本,因此可以按照这个逻辑来将两个数据流拼接起来,通过拼接更快得到下一轮机器学习的样本数据。
使用Event Time时,两个数据流必须都设置好Watermark,只设置一个流的Event Time和Watermark,无法在CoProcessFunction和KeyedCoProcessFunction中使用Timer功能,因为process算子无法确定自己应该以怎样的时间来处理数据。