我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如:MapFunction 这样的 map 转换算子就无法访问时间戳或当前事件的事件时间;
基于此,DataStreamAPI提供了一系列的底层API转换算子,可以访问时间戳、Watermark以及注册定时事件。还可以输出特定的一些事件,例如:超时事件等。
ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑;Flink提供了8个ProcessFunction:

  • ProcessFunction :常用
  • KeyedProcessFunction :常用
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

一、KeyedProcessFunction

用来操作 KeyedStream。KeyedProcessFunction 会处理流的每一个元素,输出为0个、1个或者多个元素。所有的ProcessFunction 都继承自 RichFunction接口,所以都有 open()、close() 和 getRuntimeContext() 等方法。而 KeyedProcessFunction 还提供了额外两个方法:

  • processElement(v: IN, ctx: Context, out: Collector[OUT]),流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳、元素的key 以及TimeService 时间服务。Context 还可以将结果输出到别的流(side outputs)
  • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) 是一个回调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。 Collector 为输出的结果集合。OnTimerContext 和 processElement 的Context参数一样,提供了上下文的一些信息;

二、TimerService和定时器(Timer)

实例:

  1. package com.wells.flink.demo.process;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. import org.apache.flink.api.common.state.ValueState;
  4. import org.apache.flink.api.common.state.ValueStateDescriptor;
  5. import org.apache.flink.api.java.tuple.Tuple;
  6. import org.apache.flink.api.java.tuple.Tuple2;
  7. import org.apache.flink.configuration.Configuration;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  12. import org.apache.flink.util.Collector;
  13. /**
  14. * Description ProcessAPI
  15. * Created by wells on 2020-05-24 17:45:41
  16. */
  17. public class ProcessAPITest {
  18. public static void main(String[] args) throws Exception {
  19. String host = "localhost";
  20. int port = 9999;
  21. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  22. env.setParallelism(1);
  23. DataStreamSource<String> dataStreamSource = env.socketTextStream(host, port);
  24. dataStreamSource.print("socketSource");
  25. SingleOutputStreamOperator<String> processStreamSource = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
  26. @Override
  27. public Tuple2<String, Integer> map(String line) throws Exception {
  28. String[] split = line.split(" ", -1);
  29. return new Tuple2<String, Integer>(split[0], Integer.parseInt(split[1]));
  30. }
  31. }).keyBy(0).process(new MyKeyedProcessFunction());
  32. processStreamSource.print("processSource");
  33. env.execute();
  34. }
  35. }
  36. class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, String> {
  37. // 定义一个状态,用来保存上一行记录的数字
  38. private transient ValueState<Integer> lastNumber;
  39. // 记录增加的定时器的时间,方便清除
  40. private transient ValueState<Long> currentTimerTs;
  41. @Override
  42. public void open(Configuration parameters) throws Exception {
  43. lastNumber = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastNumber", Integer.class, 0));
  44. currentTimerTs = getRuntimeContext().getState(new ValueStateDescriptor<Long>("currentTimerTs", Long.class, 0L));
  45. }
  46. // 处理流中的每一个元素
  47. @Override
  48. public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
  49. // 得到上一次的状态值
  50. Integer preNumber = lastNumber.value();
  51. // 更新上一次的状态值
  52. lastNumber.update(value.f1);
  53. // 当前定时器的时间
  54. Long currTimerTs = currentTimerTs.value();
  55. // 如果数字减小或者第一次设置定时器,则触发定时器
  56. if (value.f1 < preNumber && currTimerTs == 0) {
  57. // 触发定时器时间: 在当前时间基础上加10s 即 当前时间过10s后数字下降,触发定时器
  58. long timerTs = ctx.timerService().currentProcessingTime() + 10000L;
  59. ctx.timerService().registerProcessingTimeTimer(timerTs);
  60. currentTimerTs.update(timerTs);
  61. } else if (value.f1 > preNumber || preNumber == 0) {
  62. // 数字变大 或者 第一次进入 则删除定时器
  63. ctx.timerService().deleteProcessingTimeTimer(currTimerTs);
  64. currentTimerTs.clear();
  65. }
  66. }
  67. // 定时器要做的事情
  68. @Override
  69. public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
  70. out.collect("number more than last number");
  71. currentTimerTs.clear();
  72. }
  73. }

三、侧输出流

通过在processFunction处理将流分开;也可以使用 split 算子

  1. package com.wells.flink.demo.process;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. import org.apache.flink.api.common.typeinfo.TypeInformation;
  4. import org.apache.flink.configuration.Configuration;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.ProcessFunction;
  9. import org.apache.flink.util.Collector;
  10. import org.apache.flink.util.OutputTag;
  11. /**
  12. * Description 侧输出流,通过
  13. * Created by wells on 2020-05-24 19:06:57
  14. */
  15. public class SideOutputTest {
  16. public static void main(String[] args) throws Exception {
  17. String host = "localhost";
  18. int port = 9999;
  19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20. env.setParallelism(1);
  21. DataStreamSource<String> dataStreamSource = env.socketTextStream(host, port);
  22. dataStreamSource.print("socketSource");
  23. SingleOutputStreamOperator<String> processStreamSource = dataStreamSource.map(new MapFunction<String, Integer>() {
  24. @Override
  25. public Integer map(String line) throws Exception {
  26. return Integer.parseInt(line);
  27. }
  28. }).process(new MyProcessFunction());
  29. processStreamSource.print("processSource");
  30. processStreamSource.getSideOutput(new OutputTag<String>("numberDecrease", TypeInformation.of(String.class)))
  31. .print("numberDecrease");
  32. env.execute();
  33. }
  34. }
  35. class MyProcessFunction extends ProcessFunction<Integer, String> {
  36. private transient OutputTag<String> outputTag;
  37. @Override
  38. public void open(Configuration parameters) throws Exception {
  39. outputTag = new OutputTag<String>("numberDecrease", TypeInformation.of(String.class));
  40. }
  41. // 处理流中的每一个元素
  42. @Override
  43. public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
  44. // 如果数字减小或者第一次设置定时器,则触发定时器
  45. if (value < 0) {
  46. ctx.output(outputTag, String.valueOf(value));
  47. } else {
  48. out.collect(String.valueOf(value));
  49. }
  50. }
  51. }