背景

在flink 1.11之前的版本中,提供了两种生成水印(Watermark)的策略,分别是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,这两个接口都继承自TimestampAssigner接口。
用户想使用不同的水印生成方式,则需要实现不同的接口,但是这样引发了一个问题,对于想给水印添加一些通用的、公共的功能则变得复杂,因为我们需要给这两个接口都同时添加新的功能,这样还造成了代码的重复。
所以为了避免代码的重复,在flink 1.11 中对flink的水印生成接口进行了重构,

新的水印生成接口

当我们构建了一个DataStream之后,使用assignTimestampsAndWatermarks方法来构造水印,新的接口需要传入一个WatermarkStrategy对象。

  1. DataStream#assignTimestampsAndWatermarks(WatermarkStrategy<T>)

WatermarkStrategy 这个接口是做什么的呢?这里面提供了很多静态的方法和带有缺省实现的方法,只有一个方法是非default和没有缺省实现的,就是下面的这个方法。

  1. /**
  2. * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
  3. */
  4. @Override
  5. WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

所以默认情况下,我们只需要实现这个方法就行了,这个方法主要是返回一个
WatermarkGenerator,我们在进入这里边看看。

  1. @Public
  2. public interface WatermarkGenerator<T> {
  3. /**
  4. * Called for every event, allows the watermark generator to examine and remember the
  5. * event timestamps, or to emit a watermark based on the event itself.
  6. */
  7. void onEvent(T event, long eventTimestamp, WatermarkOutput output);
  8. /**
  9. * Called periodically, and might emit a new watermark, or not.
  10. *
  11. * <p>The interval in which this method is called and Watermarks are generated
  12. * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
  13. */
  14. void onPeriodicEmit(WatermarkOutput output);
  15. }

这个方法简单明了,主要是有两个方法:

  • onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游(可选,就是看是否用output来收集水印),我们可以实现这个方法.
  • onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:env.getConfig().setAutoWatermarkInterval(5000L);

我们自己实现一个简单的周期性的发射水印的例子:
在这个onEvent方法里,我们从每个元素里抽取了一个时间字段,但是我们并没有生成水印发射给下游,而是自己保存了在一个变量里,在onPeriodicEmit方法里,使用最大的日志时间减去我们想要的延迟时间作为水印发射给下游。

  1. DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
  2. new WatermarkStrategy<Tuple2<String,Long>>(){
  3. @Override
  4. public WatermarkGenerator<Tuple2<String,Long>> createWatermarkGenerator(
  5. WatermarkGeneratorSupplier.Context context){
  6. return new WatermarkGenerator<Tuple2<String,Long>>(){
  7. private long maxTimestamp;
  8. private long delay = 3000;
  9. @Override
  10. public void onEvent(
  11. Tuple2<String,Long> event,
  12. long eventTimestamp,
  13. WatermarkOutput output){
  14. maxTimestamp = Math.max(maxTimestamp, event.f1);
  15. }
  16. @Override
  17. public void onPeriodicEmit(WatermarkOutput output){
  18. output.emitWatermark(new Watermark(maxTimestamp - delay));
  19. }
  20. };
  21. }
  22. });

内置水印生成策略

为了方便开发,flink提供了一些内置的水印生成方法供我们使用。

固定延迟生成水印

通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间.使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。

  1. WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)

我们实现一个延迟3秒的固定延迟水印,可以这样做:

  1. DataStream dataStream = ...... ;
  2. dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));

他的底层使用的WatermarkGenerator接口的一个实现类BoundedOutOfOrdernessWatermarks。我们看下源码中的这两个方法,是不是和我们上面自己写的很像.

  1. @Override
  2. public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
  3. maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
  4. }
  5. @Override
  6. public void onPeriodicEmit(WatermarkOutput output) {
  7. output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
  8. }

单调递增生成水印

通过静态方法forMonotonousTimestamps来提供.

  1. WatermarkStrategy.forMonotonousTimestamps()

这个也就是相当于上述的延迟策略去掉了延迟时间,以event中的时间戳充当了水印。
在程序中可以这样使用:

  1. DataStream dataStream = ...... ;
  2. dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

它的底层实现是AscendingTimestampsWatermarks,其实它就是BoundedOutOfOrdernessWatermarks类的一个子类,没有了延迟时间,我们来看看具体源码的实现.

  1. @Public
  2. public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
  3. /**
  4. * Creates a new watermark generator with for ascending timestamps.
  5. */
  6. public AscendingTimestampsWatermarks() {
  7. super(Duration.ofMillis(0));
  8. }
  9. }

event时间的获取

上述我们讲了flink自带的两种水印生成策略,但是对于我们使用eventtime语义的时候,我们想从我们的自己的数据中抽取eventtime,这个就需要TimestampAssigner了.

  1. @Public
  2. @FunctionalInterface
  3. public interface TimestampAssigner<T> {
  4. ............
  5. long extractTimestamp(T element, long recordTimestamp);
  6. }

使用的时候我们主要就是从我们自己的元素element中提取我们想要的eventtime。
使用flink自带的水印策略和eventtime抽取类,可以这样用:

  1. DataStream dataStream = ...... ;
  2. dataStream.assignTimestampsAndWatermarks(
  3. WatermarkStrategy
  4. .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  5. .withTimestampAssigner((event, timestamp)->event.f1));

处理空闲数据源

在某些情况下,由于数据产生的比较少,导致一段时间内没有数据产生,进而就没有水印的生成,导致下游依赖水印的一些操作就会出现问题,比如某一个算子的上游有多个算子,这种情况下,水印是取其上游两个算子的较小值,如果上游某一个算子因为缺少数据迟迟没有生成水印,就会出现eventtime倾斜问题,导致下游没法触发计算。
所以filnk通过WatermarkStrategy.withIdleness()方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲。这样就意味着下游的数据不需要等待水印的到来。
当下次有水印生成并发射到下游的时候,这个数据流重新变成活跃状态。
通过下面的代码来实现对于空闲数据流的处理

  1. WatermarkStrategy
  2. .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
  3. .withIdleness(Duration.ofMinutes(1));

DEMO

  1. public class OutOfOrderCase {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  5. env.addSource(new SourceFunction<Tuple3<String, Long,Integer>>() {
  6. @Override
  7. public void run(SourceContext<Tuple3<String, Long,Integer>> ctx) throws Exception {
  8. ctx.collect(Tuple3.of("key", 1610355600000L,1));
  9. ctx.collect(Tuple3.of("key", 1610355601000L,1));
  10. ctx.collect(Tuple3.of("key", 1610355608000L,1));
  11. ctx.collect(Tuple3.of("key", 1610355607000L,1));
  12. ctx.collect(Tuple3.of("key", 1610355604000L,1));
  13. ctx.collect(Tuple3.of("key", 1610355606000L,1));
  14. ctx.collect(Tuple3.of("key", 1610355606000L,1));
  15. ctx.collect(Tuple3.of("key", 1610355610000L,1));
  16. ctx.collect(Tuple3.of("key", 1610355608000L,1));
  17. // ctx.collect(Tuple2.of("key", 9000L,1));
  18. }
  19. @Override
  20. public void cancel() {
  21. }
  22. }).assignTimestampsAndWatermarks(
  23. // new AssignerWithPunctuatedWatermarks<Tuple3<String, Long, Integer>>() {
  24. // @Override
  25. // public long extractTimestamp(Tuple3<String, Long, Integer> element, long recordTimestamp) {
  26. // return element.f1;
  27. // }
  28. //
  29. // @Nullable
  30. // @Override
  31. // public Watermark checkAndGetNextWatermark(Tuple3<String, Long, Integer> lastElement, long extractedTimestamp) {
  32. // return new Watermark(lastElement.f1);
  33. // }
  34. // }
  35. // 延迟3秒的固定延迟水印
  36. // WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  37. // .withTimestampAssigner((event, timestamp) -> event.f1)
  38. // 单调递增生成水印
  39. WatermarkStrategy.<Tuple3<String, Long,Integer>>forMonotonousTimestamps()
  40. .withTimestampAssigner((event, timestamp) -> event.f1)
  41. // 在某些情况下,由于数据产生的比较少,导致一段时间内没有数据产生,进而就没有水印的生成,导致下游依赖水印的一些操作就会出现问题,
  42. // 比如某一个算子的上游有多个算子,这种情况下,水印是取其上游两个算子的较小值,如果上游某一个算子因为缺少数据迟迟没有生成水印,就会出现eventtime倾斜问题,导致下游没法触发计算。
  43. //
  44. // 所以filnk通过WatermarkStrategy.withIdleness()方法允许用户在配置的时间内(即超时时间内)没有记录到达时将一个流标记为空闲。这样就意味着下游的数据不需要等待水印的到来。
  45. // WatermarkStrategy
  46. // .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
  47. // .withIdleness(Duration.ofMinutes(1))
  48. )
  49. .keyBy(t -> t.f0)
  50. .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  51. .sum(2).print();
  52. env.execute("watermark");
  53. }
  54. }