Flink能够根据time的不同概念处理流式数据。

  • Processing time 指的是执行相应操作的机器的系统时间(也称为“挂钟时间”)。
  • Event time 指的是基于附加到每一行的时间戳流式数据处理。时间戳可以在事件发生时进行编码。
  • Ingestion time 是事件进入Flink的时间; 在内部,它与事件时间类似地对待。

有关Flink中时间处理的更多信息,请参阅有关 Event Time and Watermarks的介绍。

本页介绍了如何在Flink的Table API和SQL中为基于时间的操作定义时间属性。

时间属性简介

基于时间的操作,例如 Table APISQL 中的窗口,需要有关时间概念及其来源的信息。因此,表可以提供 logical time attributes 用于指示时间和访问表程序中的相应时间戳。

时间属性可以是每个表模式的一部分。它们是在从 DataStream 创建表时定义的,或者是在使用 TableSource 时预定义的。一旦在开头定义了时间属性,它就可以作为字段引用,并且可以在基于时间的操作中使用。

只要时间属性未被修改并且只是从查询的一部分转发到另一部分,它仍然是有效的时间属性 时间属性的行为类似于常规时间戳,可以访问以进行计算。如果在计算中使用了时间属性,则它将具体化并成为常规时间戳。常规时间戳不与 Flink 的时间和水印系统配合,因此不能再用于基于时间的操作。

表程序要求为流式环境指定相应的时间特性:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
  3. // alternatively:
  4. // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  5. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
  3. // alternatively:
  4. // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

处理时间

处理时间允许表程序根据本地机器的时间产生结果。这是最简单的时间概念,但不提供决定论。它既不需要时间戳提取也不需要水印生成。

有两种方法可以定义处理时间属性。

在 DataStream 到 Table 转换期间

处理时间属性在模式定义期间使用 .proctime 属性定义。time 属性只能通过附加的逻辑字段扩展物理模式。因此,它只能在模式定义的末尾定义。

  1. DataStream<Tuple2<String, String>> stream = ...;
  2. // declare an additional logical field as a processing time attribute
  3. Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
  4. WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  1. val stream: DataStream[(String, String)] = ...
  2. // declare an additional logical field as a processing time attribute val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)
  3. val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

使用 TableSource

处理时间属性由实现 DefinedProctimeAttribute 接口的 TableSource 定义。逻辑时间属性附加到由 TableSource 的返回类型定义的物理模式。

  1. // define a table source with a processing attribute
  2. public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
  3. @Override
  4. public TypeInformation<Row> getReturnType() {
  5. String[] names = new String[] {"Username" , "Data"};
  6. TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
  7. return Types.ROW(names, types);
  8. }
  9. @Override
  10. public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
  11. // create stream
  12. DataStream<Row> stream = ...;
  13. return stream;
  14. }
  15. @Override
  16. public String getProctimeAttribute() {
  17. // field with this name will be appended as a third field
  18. return "UserActionTime";
  19. }
  20. }
  21. // register table source
  22. tEnv.registerTableSource("UserActions", new UserActionSource());
  23. WindowedTable windowedTable = tEnv
  24. .scan("UserActions")
  25. .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  1. // define a table source with a processing attribute class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
  2. override def getReturnType = {
  3. val names = Array[String](e57390c10589707ddf313ff9ae6110f0)
  4. val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
  5. Types.ROW(names, types)
  6. }
  7. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
  8. // create stream val stream = ...
  9. stream
  10. }
  11. override def getProctimeAttribute = {
  12. // field with this name will be appended as a third field "UserActionTime"
  13. }
  14. }
  15. // register table source tEnv.registerTableSource("UserActions", new UserActionSource)
  16. val windowedTable = tEnv
  17. .scan("UserActions")
  18. .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

事件时间

事件时间允许表程序根据每个记录中包含的时间生成结果。即使在无序事件或延迟事件的情况下,这也允许一致的结果。当从持久存储中读取记录时,它还确保表程序的可重放结果。

此外,事件时间允许批处理和流式处理环境中的表程序的统一语法。流式处理环境中的时间属性可以是批处理环境中的记录的常规字段。

为了处理乱序事件并区分流式数据中的准时和迟到事件,Flink 需要从事件中提取时间戳并及时取得某种进展(所谓的watermarks)。

可以在 DataStream 到 Table 转换期间或使用 TableSource 定义事件时间属性。

DataStream 到 Table 转换期间

在模式定义期间使用 .rowtime 属性定义事件时间属性。Timestamps and watermarks 必须在已转换的 DataStream 中分配。

DataStream 转换为 Table 时有两种定义时间属性的方法。根据指定的 .rowtime 字段名称是否存在于 DataStream 的模式中,时间戳字段要么是

  • 作为新字段附加到模式或
  • 替换现有字段。

在任何一种情况下,事件时间时间戳字段都将保存 DataStream 事件时间戳的值。

  1. // Option 1:
  2. // extract timestamp and assign watermarks based on knowledge of the stream
  3. DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
  4. // declare an additional logical field as an event time attribute
  5. Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
  6. // Option 2:
  7. // extract timestamp from first field, and assign watermarks based on knowledge of the stream
  8. DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
  9. // the first field has been used for timestamp extraction, and is no longer necessary
  10. // replace first field with a logical event time attribute
  11. Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");
  12. // Usage:
  13. WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  1. // Option 1:
  2. // extract timestamp and assign watermarks based on knowledge of the stream val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
  3. // declare an additional logical field as an event time attribute val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
  4. // Option 2:
  5. // extract timestamp from first field, and assign watermarks based on knowledge of the stream val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
  6. // the first field has been used for timestamp extraction, and is no longer necessary
  7. // replace first field with a logical event time attribute val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)
  8. // Usage:
  9. val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

使用 TableSource

事件时间属性由实现 DefinedRowtimeAttributes 接口的 TableSource 定义。getRowtimeAttributeDescriptors() 方法返回一个用于描述时间属性的最终名称 RowtimeAttributeDescriptor 列表,一个用于派生属性值的时间戳提取器,以及与该属性相关的 watermark 策略。

请确保 getDataStream() 方法返回的 DataStream 与定义的 time 属性对齐。只有在定义了 StreamRecordTimestamp 时间戳提取器时,才会考虑 DataStream 的时间戳(由 TimestampAssigner 分配的时间戳)。只有在定义了 PreserveWatermarks 水印策略时,才会保留 DataStream 的水印。 否则,只有 TableSource 的 rowtime 属性的值是相关的。

  1. // define a table source with a rowtime attribute
  2. public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
  3. @Override
  4. public TypeInformation<Row> getReturnType() {
  5. String[] names = new String[] {"Username", "Data", "UserActionTime"};
  6. TypeInformation[] types =
  7. new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
  8. return Types.ROW(names, types);
  9. }
  10. @Override
  11. public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
  12. // create stream
  13. // ...
  14. // assign watermarks based on the "UserActionTime" attribute
  15. DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
  16. return stream;
  17. }
  18. @Override
  19. public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
  20. // Mark the "UserActionTime" attribute as event-time attribute.
  21. // We create one attribute descriptor of "UserActionTime".
  22. RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
  23. "UserActionTime",
  24. new ExistingField("UserActionTime"),
  25. new AscendingTimestamps());
  26. List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
  27. return listRowtimeAttrDescr;
  28. }
  29. }
  30. // register the table source
  31. tEnv.registerTableSource("UserActions", new UserActionSource());
  32. WindowedTable windowedTable = tEnv
  33. .scan("UserActions")
  34. .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  1. // define a table source with a rowtime attribute class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
  2. override def getReturnType = {
  3. val names = Array[String](a89bac3786b8823ec6827018da21a0cb)
  4. val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
  5. Types.ROW(names, types)
  6. }
  7. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
  8. // create stream // ... // assign watermarks based on the "UserActionTime" attribute val stream = inputStream.assignTimestampsAndWatermarks(...)
  9. stream
  10. }
  11. override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
  12. // Mark the "UserActionTime" attribute as event-time attribute. // We create one attribute descriptor of "UserActionTime". val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
  13. "UserActionTime",
  14. new ExistingField("UserActionTime"),
  15. new AscendingTimestamps)
  16. val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
  17. listRowtimeAttrDescr
  18. }
  19. }
  20. // register the table source tEnv.registerTableSource("UserActions", new UserActionSource)
  21. val windowedTable = tEnv
  22. .scan("UserActions")
  23. .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)