Event Time(事件时间)

Event Time / Processing Time / Ingestion Time (事件时间/处理时间/摄取时间)

Flink supports different notions of time in streaming programs. flink支持不同的 time 流媒体的概念。

  • 处理时间: 处理时间是指正在执行相应操作的机器的系统时间。

    当流式程序在处理时间上运行时,所有基于时间的操作(如时间窗口)都将使用运行相应操作员的机器的系统时钟。每小时处理时间窗口将包括在系统时钟指示整个小时的时间之间到达特定操作员的所有记录。例如,如果应用程序在上午9:15开始运行,则第一个每小时处理时间窗口将包括上午9:15至10:00之间处理的事件,下一个窗口将包括上午10:00至11:00之间处理的事件,依此类推。

    处理时间是最简单的时间概念,并且不需要流和机器之间的协调。它提供了最佳性能和最低延迟。但是,在分布式和异步环境中,处理时间并不提供决定论,因为它容易受到记录到达系统的速度(例如,从消息队列)到系统内部操作员之间记录流的速度,以及停机(预定或其他)。

  • 事件时间: 事件时间是每个事件发生在其生产设备上的时间。此时间通常在记录进入Flink之前嵌入其中,并且可以从每个记录中提取该 event timestamp 。在事件时间,时间的进展取决于数据,而不是任何墙上的时钟。事件时程序必须指定如何生成 事件时间水印,这是一种在事件时间内表示进度的机制。这种水印机制将在后面的一节中描述,下面

    在一个完美的世界中,事件时间处理将产生完全一致和决定性的结果,而不管事件何时到达,或事件的顺序如何。但是,除非已知事件按顺序到达(按时间戳),否则事件时间处理在等待无序事件时会产生一些延迟。由于只能等待一段有限的时间,这就限制了确定性事件时间应用程序的性能。

    假设所有数据已经到达,事件时间操作将按预期运行,即使在与无序或延迟事件一起工作时或在再处理历史数据时也会产生正确且一致的结果。例如,每小时事件时间窗口将包含所有记录,该记录携带一个落入该小时的事件时间戳,而不管它们到达的顺序,还是在它们被处理时。(有关详细信息,请参见[晚期事件](#延迟元件)上的部分。)

    注意,有时当事件时间程序正在实时地处理实时数据时,它们将使用一些 processing time 操作,以保证它们以及时的方式进行。

  • 摄入时间: 摄入时间是事件进入Flink的时间。在源操作符中,每个记录以时间戳的形式获取源的当前时间,而基于时间的操作(如时间窗口)引用该时间戳。

    Ingestion time 概念上位于 event timeprocessing time 之间。与 processing time 相比,它的成本略高,但结果更可预测。由于摄取时间使用稳定的时间戳(在源上分配一次),对记录的不同窗口操作将引用相同的时间戳,而 _processing time 窗口运算符可以将记录分配给不同的窗口(基于本地系统时钟和任何传输延迟)。

    event time 相比,ingestion time 程序无法处理任何无序事件或后期数据,但程序不必指定如何生成 watermarks

    内部,ingestion time 被处理得像 event time,但具有自动时间戳分配和自动水印生成。

Event Time(事件时间) - 图1

Setting a Time Characteristic(设定时间特征)

Flink Datastream程序的第一部分通常设置基本 time characteristic。该设置定义了数据流源的行为方式(例如,它们是否将分配时间戳),以及诸如‘KeyedStream.timeWindow(Time.秒(30)’)之类的窗口操作应该使用什么时间概念。

以下示例显示了在每小时时间窗口中聚合事件的FLink程序。Windows的行为随时间特性而变化。

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  3. // alternatively:
  4. // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  5. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  6. DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
  7. stream
  8. .keyBy( (event) -> event.getUser() )
  9. .timeWindow(Time.hours(1))
  10. .reduce( (a, b) -> a.add(b) )
  11. .addSink(...);
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  3. // alternatively:
  4. // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
  5. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  6. val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
  7. stream
  8. .keyBy( _.getUser )
  9. .timeWindow(Time.hours(1))
  10. .reduce( (a, b) => a.add(b) )
  11. .addSink(...)

请注意,为了在 event time 中运行此示例,程序需要使用直接为数据定义事件时间并自己发出水印的源,或者程序必须在源之后注入一个 Timestamp Assigner & Watermark Generator。这些函数描述了如何访问事件时间戳,以及事件流显示的异常程度。

下面的部分描述了后面的一般机制 timestampswatermarks。有关如何在FLink数据流API中使用时间戳分配和水印生成的指南,请参阅生成时间戳/水印

Event Time and Watermarks(事件时间和水印)

注意:Flink实现了来自Dataflow模型的许多技术。有关事件时间和水印的详细介绍,请查看下面的文章。

支持event time 需要的流处理器需要一种度量事件时间进度的方法。例如,当事件时间超过小时结束时,生成每小时窗口的窗口运算符需要通知,以便操作员可以关闭窗口。v

EventTime 可独立于 processing time (由墙时钟测量)进度。例如,在一个程序中,运算符的当前 event time 运算符可能在 processing time (在接收事件时延迟)稍落后,同时两者都以相同的速度进行。另一方面,另一个流传送程序可能在几周的事件时间内通过仅仅几秒钟的处理来进行,通过在卡夫卡主题(或另一个消息队列)中已经缓冲的一些历史数据的快速转发。


Flink中测量事件时间进度的机制是水印。水印流作为数据流的一部分,并携带时间戳 t。a Watermark(t) 声明事件时间已到达该流中的 t,这意味着流中不应该有更多具有时间戳 t’ <= t (即具有时间戳较早或等于水印的事件)的元素。

下图显示了具有(逻辑)时间戳和内联的水印的事件流。在该示例中,事件是有序的(关于它们的时间戳),这意味着水印仅仅是流中的周期性标记。

具有事件(顺序)和水印的数据流

水印对于 out-of-order stream是至关重要的,如下所示,其中事件不是由它们的时间戳来排序的。通常,水印是由流中那个点声明的声明,直到某个时间戳的所有事件都应该到达。一旦水印到达操作者,操作者可以将其内部 event time clock TO推进到水印的值。

事件(无序)和watermarks的数据流“

注意,事件时间是由新创建的流元素(或元素)从产生它们的事件或从触发创建这些元素的水印中继承的。

Watermarks in Parallel Streams(并行流中的水印)

在源函数之前或之后生成水印。源函数的每个并行子任务通常独立地生成其水印。这些水印定义该特定并行源的事件时间。

当水印流经流媒体程序时,它们会提前到达运营商的事件时间。每当运算符提前其事件时间时,它就会在下游为其后续运算符生成一个新的水印。

某些运算符会消耗多个输入流;例如,一个UNION,或在 keyBy(…) 后面的 partition(…) 函数。这样的运营商的当前事件时间是其输入流“事件时间”的最小值,因为它的输入流更新了它们的事件时间,因此操作人员也是如此。

下图显示了流经并行流的事件和水印的示例,以及跟踪事件时间的操作符。

具有事件和水印的并行数据流和运算符

请注意,Kafka源支持每个分区加水印,您可以更多地阅读此处

Late Elements(后期元素)

某些元素可能会违反水印条件,这意味着即使在 Watermark(T) 发生之后,也会出现更多具有时间戳 t’ <= t 的元素。事实上,在许多现实世界的设置中,某些元素可能被任意延迟,因此无法指定某个事件时间戳的所有元素发生的时间。此外,即使延迟可以被限制,将水印延迟太多也是不可取的,因为它会导致对事件时间窗口的评估出现太多的延迟。

由于这个原因,流程序可能显式地期望某些 late 元素。延迟元素是在系统的事件时间时钟(由水印发出信号)已经超过晚元素时间戳的时间之后到达的元素。有关如何在事件时窗口中使用晚元素的更多信息,请参见已允许的Lateness“。

Idling sources (闲置来源)

目前,使用纯事件时间水印生成器,如果没有需要处理的元素,水印就无法进行处理。这意味着,在输入数据出现间隙时,事件时间不会进展,例如,将不会触发窗口操作符,因此现有窗口将无法生成任何输出数据。v

为了避免这种情况,可以使用周期水印分配器,其不仅基于元素时间戳来分配。示例性解决方案可以是在未观察到新事件一段时间之后切换到使用当前处理时间作为时间基准的分配器。

可以使用SourceFunction.SourceContext#markAsTemporarilyIdle将源标记为空闲。有关详细信息,请参阅本方法的Javadoc以及 StreamStatus

Debugging Watermarks(调试水印)

请参阅调试Windows和事件时间部分,用于在运行时调试水印。

How operators are processing watermarks (运算符如何处理水印)

一般情况下,操作人员在向下游转发水印之前必须完全处理给定的水印。例如,WindowOperator 将首先评估哪些窗口应该被触发,并且只有在生成由水印触发的所有输出之后,水印本身才会被发送到下游。换句话说,由于水印的出现而产生的所有元素都将在水印之前发出。

同样的规则也适用于 TwoInputStreamOperator。但是,在这种情况下,算子的当前水印被定义为其两个输入的最小值。

此行为的详细信息由OneInputStreamOperator#processWatermarkTwoInputStreamOperator#processWatermark1TwoInputStreamOperator#processWatermark2方法的实现来定义。