1. 时间操作

1.1 设置时间属性

如果我们想要在分布式流处理应用程序中定义有关时间的操作,彻底理解时间的语义是非常重要的。当我们指定了一个窗口去收集某1分钟内的数据时,这个长度为1分钟的桶中,到底应该包含哪些数据?在DataStream API中,我们将使用时间属性来告诉Flink:当我们创建窗口时,我们如何定义时间。时间属性是StreamExecutionEnvironment的一个属性,有以下值:

ProcessingTime

机器时间在分布式系统中又叫做“墙上时钟”。
当操作符执行时,此操作符看到的时间是操作符所在机器的机器时间。Processing-time window的触发取决于机器时间,窗口包含的元素也是那个机器时间段内到达的元素。通常情况下,窗口操作符使用processing time会导致不确定的结果,因为基于机器时间的窗口中收集的元素取决于元素到达的速度快慢。使用processing time会为程序提供极低的延迟,因为无需等待水位线的到达。
如果要追求极限的低延迟,请使用processing time。

EventTime

当操作符执行时,操作符看的当前时间是由流中元素所携带的信息决定的。流中的每一个元素都必须包含时间戳信息。而系统的逻辑时钟由水位线(Watermark)定义。时间戳要么在事件进入流处理程序之前已经存在,要么就需要在程序的数据源(source)处进行分配。当水位线宣布特定时间段的数据都已经到达,事件时间窗口将会被触发计算。即使数据到达的顺序是乱序的,事件时间窗口的计算结果也将是确定性的。窗口的计算结果并不取决于元素到达的快与慢。
当水位线超过事件时间窗口的结束时间时,窗口将会闭合,不再接收数据,并触发计算。

IngestionTime

当事件进入source操作符时,source操作符所在机器的机器时间,就是此事件的“摄入时间”(IngestionTime),并同时产生水位线。IngestionTime相当于EventTime和ProcessingTime的混合体。一个事件的IngestionTime其实就是它进入流处理器中的时间。
IngestionTime没什么价值,既有EventTime的执行效率(比较低),有没有EventTime计算结果的准确性。

下面的例子展示了如何设置事件时间。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  3. DataStream sensorData = env.addSource(...);
  4. 如果要使用processing time,将TimeCharacteristic.EventTime替换为TimeCharacteristic.ProcessingTIme就可以了。

1.2 指定时间戳和产生水位线

如果使用事件时间,那么流中的事件必须包含这个事件真正发生的时间。使用了事件时间的流必须携带水位线。
时间戳和水位线的单位是毫秒,记时从1970-01-01T00:00:00Z开始。到达某个操作符的水位线就会告知这个操作符:小于等于水位线中携带的时间戳的事件都已经到达这个操作符了。时间戳和水位线可以由SourceFunction产生,或者由用户自定义的时间戳分配器和水位线产生器来生成。
Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。一般来说,时间戳分配器需要在source操作符后马上进行调用。
因为时间戳分配器看到的元素的顺序应该和source操作符产生数据的顺序是一样的,否则就乱了。这就是为什么我们经常将source操作符的并行度设置为1的原因。也就是说,任何分区操作都会将元素的顺序打乱,例如:并行度改变,keyBy()操作等等。
所以最佳实践是:在尽量接近数据源source操作符的地方分配时间戳和产生水位线,甚至最好在SourceFunction中分配时间戳和产生水位线。当然在分配时间戳和产生水位线之前可以对流进行map和filter操作是没问题的,也就是说必须是窄依赖。

以下这种写法是可以的。

  1. DataStream stream = env
  2. .addSource(...)
  3. .map(...)
  4. .filter(...)
  5. .assignTimestampsAndWatermarks(...)

1.3 周期性的生成水位线

周期性的生成水位线:系统会周期性的将水位线插入到流中(水位线也是一种特殊的事件!)。默认周期是200毫秒,也就是说,系统会每隔200毫秒就往流中插入一次水位线。
这里的200毫秒是机器时间!
可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。

  1. env.getConfig().setAutoWatermarkInterval(5000);

上面的例子产生水位线的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks中的getCurrentWatermark()方法。如果方法返回的时间戳大于之前水位线的时间戳,新的水位线会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位线的时间戳,则不会产生新的水位线。

例子,自定义一个周期性的时间戳抽取

  1. //周期性 发射watermark
  2. SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = dataStream.assignTimestampsAndWatermarks(
  3. // 最大延迟时间 3s
  4. WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  5. .withTimestampAssigner((event, timestamp) -> event.f1));

如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序。我们可以使用WatermarkStrategy.forMonotonousTimestamps();,方法会直接使用数据的时间戳生成水位线。

  1. dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

思考题一:实时程序,要求实时性非常高,并且结果并不一定要求非常准确,那么应该怎么办?

回答:直接使用处理时间。

思考题二:如果要进行时间旅行,也就是要还原以前的数据集当时的流的状态,应该怎么办?

回答:使用事件时间。