1.1 介绍

Flink一般的转换算子是无法访问事件的时间戳信息和WaterMark信息的。例如 MapFunction 这样的 map 转换算子就无法访问时间戳或者当前事件的事件时间。而这在某些应用场景下,这些信息确极为重要。基于此 DataStream API 提供了一系列的 Low-Level 转换算子,可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。
Process Function 是用来构建事件驱动的应用以及实现自定义的业务逻辑 ( 使用之前的window 函数和转换算子无法实现 ) 。例如 Flink SQL 就是使用 Process Function 实现的。所有的 Process Function 都继承自 RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。

1.2 内部构件

ProcessFunction是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:

  • events:数据流中的元素
  • state:状态,用于容错和一致性,仅用于keyed stream
  • timers:定时器,支持事件时间和处理时间,仅用于keyed stream

    1.3 分类

    Flink 提供了 8 个 Process Function:
  1. ProcessFunction:用于DataStream流数据处理
  2. KeyedProcessFunction:用于keyBy之后的KeyedStream流处理
  3. CoProcessFunction:用于connect连接的流处理
  4. ProcessJoinFunction:用于join流操作
  5. BroadcastProcessFunction:用于广播
  6. KeyedBroadcastProcessFunction:keyBy之后的广播
  7. ProcessWindowFunction:窗口增量聚合
  8. ProcessAllWindowFunction:全窗口增量聚合

    2. ProcessFunction

    2.1 ProcessFunction 介绍

  • 可以使用 steam.process(ProcessFunction)的方式使用 ProcessFunction
  • 从 ProcessFunction类图可见,它有RichFunction的特性open、close,也有两个重要的方法processElement和onTimer

ProcessFunction 介绍 - 图1
ProcessFunction 继承图

2.2 ProcessFunction 源码

  1. public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
  2. private static final long serialVersionUID = 1L;
  3. /**
  4. * ProcessFunction处理数据的主要方法,处理输入流中的每个元素
  5. * 此函数可以使用Collector输出零个或多个元素,
  6. * 还可以使用Context参数更新内部状态或设置计时器。
  7. *
  8. * @param value 输入类型.
  9. * @param ctx 上下文
  10. * @param out 使用 collector 输出数据
  11. */
  12. public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
  13. /**
  14. * 在使用TimerService设置的计时器触发时调用。
  15. */
  16. public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
  17. /**
  18. * 上下文
  19. */
  20. public abstract class Context {
  21. /**
  22. * 当前正在处理的元素的时间戳或触发计时器的时间戳
  23. */
  24. public abstract Long timestamp();
  25. /**
  26. * 用于注册计时器和查询时间的TimerService
  27. */
  28. public abstract TimerService timerService();
  29. /**
  30. * 向OutputTag标识的侧输出流发出记录
  31. *
  32. * @param outputTag 指定侧输出流
  33. * @param value 发送的记录
  34. */
  35. public abstract <X> void output(OutputTag<X> outputTag, X value);
  36. }
  37. /**
  38. * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
  39. */
  40. public abstract class OnTimerContext extends Context {
  41. /**
  42. * 触发计时器的 TimeDomain
  43. */
  44. public abstract TimeDomain timeDomain();
  45. }
  46. }

3. KeyedProcessFunction

3.1 介绍

  • KeyedProcessFunction 用来处理 KeyedStream 中的数据。
  • KeyedProcessFunction[KEY, IN, OUT] 还额外提供了两个方法: ```java /**
    • 流中的每一个元素都会在这个方法中进行处理
    • 参数说明
    • Collector :将处理完成的数据输出,可能输出 0 到多个结果
    • Context:可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。
    • : 还可以将结果输出到别的流(side outputs)。
    • I :输入数据类型 */ public abstract void processElement(I value, Context ctx, Collector out) throws Exception;

/**

  • onTimer是一个回调函数。当之前注册的定时器触发时调用。
  • 参数说明
  • timestamp:为定时器所设定的触发的时间戳。
  • Collector:为输出结果的集合。
  • OnTimerContext:和processElement的Context参数一样,提供上下文的一些信息,例如定时器触发的时间信息 */ public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {} ```

    4. CoProcessFunction

  • DataStream API 提供了 CoProcessFunction 这样的 low-level 操作来对于两条输入流进行处理
  • CoProcessFunction 提供了操作每一个输入流的方法: processElement1()processElement2()
  • 类似于 ProcessFunction 这两种方法都通过 Context 对象来调用。这个 Context 对象可以访问事件数据,定时器时间戳,TimerService,以及 side outputs。CoProcessFunction 也提供了 onTimer()回调函数。

    5. ProcessJoinFunction

    scala
    leftKeyedStream // intervalJoin目前只支持Event Time .intervalJoin(rightKeyedStream) // 时间间隔,设定下界和上界 .between(Time.minutes(-10),Time.seconds(0)) // 不包含下界 //.lowerBoundExclusive() // 不包含上界 //.upperBoundExclusive() // 自定义ProcessJoinFunction 处理Join到的元素 .process(ProcessJoinFunction)

    6. BroadcastProcessFunction

    参考Idea广播流代码

7. KeyedBroadcastProcessFunction

8. ProcessWindowFunction

ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性,但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。

9. ProcessAllWindowFunction

与 ProcessWindowFunction 功能类似,不过作用在 AllWindowedStream 之上。

10. SideOutput(侧输出流)

大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。processfunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象, X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。