如时间戳和水印处理中所述,Flink提供抽象,允许程序员分配他们自己的时间戳并发出他们自己的水印。更具体地说,可以通过实现其中一个AssignerWithPeriodicWatermarks
和AssignerWithPunctuatedWatermarks
接口来实现,具体取决于用例。简而言之,第一个将定期发出水印,而第二个基于传入记录的某些属性,例如,只要在流中遇到特殊数据元。
为了进一步简化此类任务的编程工作,Flink附带了一些预先实现的时间戳分配器。本节提供了它们的列表。除了开箱即用的函数外,它们的实现还可以作为自定义实现的示例。
具有递增时间戳的分发者
定期水印生成的最简单的特殊情况是给定源任务看到的时间戳按升序发生的情况。在这种情况下,当前时间戳始终可以充当水印,因为没有更早的时间戳会到达。
请注意,每个并行数据源任务只需要提升时间戳。例如,如果在特定设置中,一个并行数据源实例读取一个Kafka分区,则只需要在每个Kafka分区中时间戳递增。当并行流被混洗,联合,连接或合并时,Flink的水印合并机制将生成正确的水印。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
允许固定数量的迟到的分配者
定期水印生成的另一个例子是当水印滞后于在流中看到的最大(事件 - 时间)时间戳一段固定的时间。这种情况涵盖了预先知道流中可能遇到的最大延迟的情况,例如,当创建包含时间戳在固定时间段内扩展的数据元的自定义源以进行测试时。对于这些情况,Flink提供了BoundedOutOfOrdernessTimestampExtractor
作为参数的参数maxOutOfOrderness
,即在计算给定窗口的最终结果时,在忽略数据元之前允许数据元延迟的最长时间。延迟对应于结果t - t_w
,其中t
是数据元的(事件 - 时间)时间戳,以及t_w
前一个水印的时间戳。如果lateness > 0
然后,该数据元被认为是迟到的,并且在计算其对应窗口的作业结果时默认被忽略。有关 使用延迟数据元的更多信息,请参阅有关允许延迟的文档。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](docs_1.7-SNAPSHOT_Time.seconds(10))( _.getCreationTime ))