时间概念

  • Event time:日志中的业务时间
  • Ingestion time:进入flink source的时间
  • Proessing time:执行每一个flink算子的时间

    event time的使用

  • 需要提供一个时间戳提取器和 Watermark 生成器

    1. // 已废弃 1.12后默认是eventTime
    2. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

watermark

如果数据流中的数据是乱序的,我们无从知晓本window的所有数据是否都被消费了,又不能无期限的等待。所以要有一种触发结束window的机制。

  • 处理乱序数据要用 watermark+window实现
  • watermark就是window该结束的时间

    • 源码中 watermark = (MAXeventTime - 延迟时间)
    • 举个栗子:max event time为8、延迟为3, 那么watermark就是5。这里的5表示以5为结束时间的窗口要关闭,并开始算子计算。
    • 换种说法:MAXeventTime =(watermark + 延迟时间)

      常用-有序、无序数据流的watermark策略

  • 创建一个策略WatermarkStrategy,指定提取数据流中时间戳的逻辑

  • 把策略传进assignTimestampsAndWatermarks

    1. public class MyWatermark {
    2. public static void main(String[] args) throws Exception {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    4. DataStreamSource<Event> stream = env.addSource(new ClickSource());
    5. // ***************乱序数据-watermark策略
    6. WatermarkStrategy<Event> noOrderWatermarkStrategy = WatermarkStrategy
    7. .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    8. .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
    9. // 提取event time时间戳的逻辑
    10. @Override
    11. public long extractTimestamp(Event event, long l) {
    12. return event.timestamp;
    13. }
    14. });
    15. // ***************有序数据-watermark策略
    16. WatermarkStrategy<Event> orderWatermarkStrategy = WatermarkStrategy
    17. .<Event>forMonotonousTimestamps()
    18. .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
    19. // 提取event time时间戳的逻辑
    20. @Override
    21. public long extractTimestamp(Event event, long l) {
    22. return event.timestamp;
    23. }
    24. });
    25. SingleOutputStreamOperator<Event> orderStream = stream.assignTimestampsAndWatermarks(orderWatermarkStrategy);
    26. SingleOutputStreamOperator<Event> noOrderStream = stream.assignTimestampsAndWatermarks(noOrderWatermarkStrategy);
    27. env.execute();
    28. }
    29. }

    自定义watermark策略-WatermarkStrategy

  • WatermarkStrategy 有两个方法:createTimestampAssigner提取时间戳、createWatermarkGenerator自定义周期性watermark

    • WatermarkGenerator接口两个方法:onEvent每条数据处理、onPeriodicEmit周期性处理

      自定义-周期性
      1. // 设置延迟5秒的watermark,每200毫秒生成一个watermark
      2. public class MyCustomWatermark {
      3. public static void main(String[] args) throws Exception {
      4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      5. DataStreamSource<Event> stream = env.addSource(new ClickSource());
      6. stream.assignTimestampsAndWatermarks(new MyWatermarkStrategy())
      7. .print();
      8. env.execute();
      9. }
      10. public static class MyWatermarkStrategy implements WatermarkStrategy<Event>{
      11. @Override
      12. public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
      13. return new MyWatermarkGenerator();
      14. }
      15. @Override
      16. public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
      17. return new SerializableTimestampAssigner<Event>() {
      18. @Override
      19. public long extractTimestamp(Event event, long l) {
      20. return event.timestamp;
      21. }
      22. };
      23. }
      24. }
      25. // 每条数据处理一次 取得最大时间,延迟时间设置为5秒 每200ms才真正发射watermark
      26. public static class MyWatermarkGenerator implements WatermarkGenerator<Event> {
      27. private Long delayTime = 5000L; // 延迟时间
      28. private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
      29. // 每条数据调用
      30. @Override
      31. public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
      32. maxTs = Math.max(event.timestamp, maxTs);
      33. }
      34. // 发射水位线,默认200ms调用一次
      35. @Override
      36. public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
      37. watermarkOutput.emitWatermark(new Watermark(maxTs - delayTime - 1L));
      38. }
      39. }
      40. }

      自定义-断点式
  • 不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线

    1. // 断点式watermark生成:只有遇到特定数据才生成
    2. public static class MyWatermarkGenerator2 implements WatermarkGenerator<Event> {
    3. private Long delayTime = 5000L; // 延迟时间
    4. @Override
    5. public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
    6. if ("Mary".equals(event.user)) {
    7. watermarkOutput.emitWatermark(new Watermark(event.timestamp - delayTime - 1L));
    8. }
    9. }
    10. // 发射水位线,默认200ms调用一次
    11. @Override
    12. public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    13. return;// 什么都不做 因为在event中发射了watermark
    14. }
    15. }

废弃的方法

AssignerWithPeriodicWatermarks

  • 周期性的生成watermark,默认200毫秒,也可手动更改env.getConfig.setAutoWatermarkInterval(5000);

    • 周期性的执行getCurrentWatermark方法

      1. //实现接口、重写两个方法
      2. SingleOutputStreamOperator<Sensorreading> watermarks = inputStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Sensorreading>() {
      3. private Long bound = 60 * 1000L; // 延迟一分钟
      4. private Long maxTs = Long.MIN_VALUE; // 当前最大时间戳
      5. @Nullable
      6. @Override
      7. public Watermark getCurrentWatermark() {
      8. return new Watermark(maxTs - bound);// 生成具有1分钟容忍度的水位线
      9. }
      10. @Override
      11. public long extractTimestamp(Sensorreading element, long previousElementTimestamp) {
      12. maxTs = Math.max(element.getTimestamp(), maxTs);//获取最大时间戳
      13. return element.getTimestamp() ;//返回数据的 eventTime
      14. }
      15. });
  • 这个接口有两个实现类 日常使用的更多

    • AscendingTimestampExtractor:未乱序的数据可以使用
    • BoundedOutOfOrdernessTimestampExtractor: 乱序数据使用 ```java // 升序数据设置事件时间和watermark inputStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() { @Override public long extractAscendingTimestamp(SensorReading element) { return element.getTimestamp() * 1000L; } })

// 乱序数据设置时间戳和watermark 2秒的延迟 inputStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(2)) { @Override public long extractTimestamp(SensorReading element) { return element.getTimestamp() * 1000L;//要求毫秒 } })

  1. <a name="nf5Va"></a>
  2. #### AssignerWithPunctuatedWatermarks
  3. - 特定化的生成watermark,每一个数据都判断是否生成watermark。灵活性更强。缺点:watermark生成的频繁,数据量大的情况下不要用 性能不佳
  4. - 每条数据都执行checkAndGetNextWatermark方法
  5. ```java
  6. //只给sensor_1插入watermark
  7. SingleOutputStreamOperator<Sensorreading> watermarks1 = inputStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Sensorreading>() {
  8. private Long bound = 60 * 1000L; // 延迟一分钟
  9. @Nullable
  10. @Override
  11. public Watermark checkAndGetNextWatermark(Sensorreading lastElement, long extractedTimestamp) {
  12. if (lastElement.getId() == "sensor_1") {
  13. return new Watermark(extractedTimestamp - bound);
  14. } else {
  15. return null;
  16. }
  17. }
  18. @Override
  19. public long extractTimestamp(Sensorreading element, long previousElementTimestamp) {
  20. return element.getTimestamp();//返回数据的 eventTime
  21. }
  22. });

watermark在上下游分区间的传递:广播到下游(barrier也一样)

image.png

  • 下游分区在收集到 上游分区的watermark之后,会选择最小的那个作为本分区的watermark

    处理空数据分区

    如果某一分区在一段时间内未发送数据,则不会生成新的watermark。在上下游传递watermark时取小的情况下 则会取到空数据分区的watermark。 解决方法:
    // 检测空闲输入 标记为空闲状态
    WatermarkStrategy
          .<E>forBoundedOutOfOrderness(Duration.ofSeconds(20))
          .withIdleness(Duration.ofMinutes(1));