7.1 Flink中的时间语义

image.png

  • Event Time:事件实际发生的时间
  • Ingestion Time:数据进入Flink的时间
  • Processing Time:执行操作算子的本地系统时间,与机器相关

    7.1.1 哪种时间语义更重要

  • 不同的语义有不同的应用场合

  • 我们往往更关心事件时间(Event Time)
  • 事件时间需要作为事件数据的一部分,由数据源自行设置并传递给MQ;摄入时间和处理时间则不需要。

7.1 设置EventTime

不设置的话,默认是ProcessingTime(处理时间)。这意味着默认情况下时间窗口计时是以本地系统时间为依据。比如设置滚动窗口15S,则需要以本地系统时间为准,等待15S,不管具体接收到的事件的发生事件是什么时候。
如果想修改为Event Time(事件时间),首先要确保事件数据中有事件时间字段,然后:

  1. 设置全局时间特性。
  2. 为数据流设置时间字段和watermark。 ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  1. ---
  2. <a name="dIajo"></a>
  3. # 7.3 水位线(watermark)
  4. <a name="QMuyo"></a>
  5. ## 7.3.1 乱序数据的影响
  6. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/388635/1624329812810-fed2413e-b373-46d3-b807-9f41f787fded.png#clientId=u3904a12e-e239-4&from=paste&height=230&id=u4331234c&margin=%5Bobject%20Object%5D&name=image.png&originHeight=714&originWidth=1046&originalType=binary&ratio=2&size=238678&status=done&style=none&taskId=ub1a45cee-5b8e-498b-ad31-78106bfb50c&width=337)
  7. - FlinkEvent Time模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。
  8. - 由于网络、分布式等原因,会导致乱序数据的产生。
  9. <a name="zIlzG"></a>
  10. ## 7.3.2 水位线(watermark)
  11. - 怎样避免乱序数据带来的计算不正确?
  12. - 遇到一个时间戳达到了窗口关闭的时间,不应该like触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。
  13. - watermark是一种衡量Event Time进展的机制,可以设定延迟触发。
  14. - watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
  15. - 数据流中的watermark用于表示timestamp小于该watermark的数据都已经到达了,因此,window的执行也是由watermark触发的。
  16. - watermark用来让程序自己平衡延迟和结果的正确性。watermark越大,延迟越大,结果越正确。watermark越小,延迟越小,结果越不准确。
  17. > watermarkallowedLatenesssideOutputLateDataFlink处理乱序数据的三驾马车。首先,只有在和时间窗口打交道的情况下,才会有乱序的概念。另外,只有当时间语义是“事件时间”时(摄入时间也有可能,但是不常用),才可能乱序。而这三者的功能分别是:
  18. > - watermark:延迟窗口关闭、触发计算的时机。
  19. > - allowedLateness:在窗口已经关闭、计算已触发之后,在一定时间内,允许接收迟到数据到本窗口,并再次触发计算。
  20. > - sideOutputLateData:对于过于延迟的数据,则放入旁路数据流中,单独处理。
  21. >
  22. > 打个比方,一辆火车计划9:00发车,但买了火车票的乘客可能会迟到,不过大部分只迟到5分钟以内,少部分迟到15分钟,极个别可能迟到更长时间。火车会采用以下三种机制来解决问题:
  23. > - watermark:把站台的时钟调慢5分钟。这样火车会延迟发车5分钟,即实际9:05发车,在此之前到来的乘客都能上车。这样大部分迟到乘客可以上车。
  24. > - allowedLateness:发车,但不关车门。火车9:05发车,但是直到9:15都不关车门,此时到来的乘客,站台可以将他们“弹射”到火车上。
  25. > - sideOutputLateData:乘坐其他火车。如果乘客9:15后才来,站台会安排他们做另外一班火车,会有专人在目的地接待这些乘客。
  26. <a name="sxdpX"></a>
  27. ## 7.3.3 watermark的特点
  28. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/388635/1624354640271-9ed9ca3f-fd95-451a-913a-10d6572a2315.png#clientId=u807e90ce-d2d9-4&from=paste&height=115&id=u91fc6581&margin=%5Bobject%20Object%5D&name=image.png&originHeight=344&originWidth=1390&originalType=binary&ratio=2&size=331852&status=done&style=none&taskId=uc97016f6-820c-489f-beea-2a14b17f82b&width=465)
  29. ```java
  30. // Flink中Watermark的定义,省略了不相关方法。
  31. public final class Watermark extends StreamElement {
  32. // 只有一个名为时间戳的字段
  33. private final long timestamp;
  34. }
  • watermark是数据流中的一条特殊的数据记录,它只有一个名为时间戳的字段。
  • watermark与数据的时间戳相关,即watermark基于数据的时间戳而产生。
  • watermark必须单调递增,以确保任务的事件时间始终在向前推进,而不是在后退。
  • watermark是一个代表时间推进的特殊事件,各算子会根据该事件来触发与事件相关的各种操作,典型的就是时间窗口操作。

7.4 watermark的传递、引入和设定

7.4.1 watermark的传递

image.png

  • 一个子任务需要像下游广播自己的watermark,以告知下游自己认为在此之前的数据都到了。
  • 一个子任务可能会不定时地从多个上游接收到watermark,此时会动态地选取已接收到的watermark中的最小值,作为自己的watermark,且在更新时广播给下游。

    7.4.2 watermark的引入

    watermark是在DataStream上调用assignTimestampsAndWatermarks方法来设置的,需要同时设置时间戳提取方式和watermark生成方式。该方法有两种重载,接收不同类型的参数:

  • AssignerWithPeriodicWatermarks接口:周期性地生成watermark,默认200ms一个周期。

  • AssignerWithPunctuatedWatermarks接口:依据数据流中元素特征来触发生成watermark。

可以自行实现这两个接口,来自定义watermark的生成时机和值。不过Flink已经为我们封装好了一些实现类,可以 直接使用。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置时间语义
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // 获取Socket文本流
    DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);

    // 转换成SensorReading
    SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
        String[] fields = line.split(",");
        return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    })
        // 指定时间戳和watermark
        .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.milliseconds(500) /* 指定最大乱序程度,用以计算watermark */) {
            // 指定提取时间戳(ms)的方法
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000;
            }
        }
    );


    SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
        String[] fields = line.split(",");
        return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
    })
        // 如果确定数据不会乱序,则可以只指定时间戳
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
            @Override
            public long extractAscendingTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000;
            }
        });

    env.execute();
}

7.4.3 watermark的设定

  • 在Flink中,watermark由应用程序开发人员生成,这通常需要对相对应的领域有一定了解(数据的量级、乱序程度,应用场景对延迟和正确性的要求)。

7.5 实战

7.5.1 窗口起始点和偏移量

Flink中,窗口是一个个的左闭右开的区间,这些窗口构成一个集合。第一个区间左边界是0,而所有窗口的右边边界都是窗口尺寸的整数倍。假设窗口尺寸是15s,那么默认窗口集合为:

  • [0, 15)
  • [15, 30)
  • [30, 45)
  • [45, 60)
  • ……

窗口集合是在程序启动时就确定了的,运行时只需要判断数据属于哪个窗口。
如果窗口默认对齐0的行为不符合预期,则可以通过offset窗口来调整窗口的偏移量。典型的应用场景:用offset来处理时差问题。比如事件原始事件是UTC时间,而我们想计算按照北京时间计算的每天聚合值,则应该给-8小时的偏移量。