Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的 日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事 件时间戳。
Ingestion Time:是数据进入 Flink 的时间。
Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器 相关,默认的时间属性就是 Processing Time。
往往关注事件时间Event Time,Event Time可以从日志数据的时间戳(timestamp)中提取
代码中设置Event Time
val env = StreamExecutionEnvironment.getExecutionEnvironment//从调用时刻开始给env创建的每一个stream追加事件特征env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
watermark基本概念
https://www.cnblogs.com/rossiXYZ/p/12286407.html
翻译成水位线/水印都可

watermark是用于处理乱序事件或延迟数据的,这通常用watermark机制结合window来实现(Watermarks用来触发window窗口计算)。水位线/(watermark)本质就是一个时间戳,时间戳反映的是事件发生时间,而不是事件处理时间。
可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水位线。可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。
- watermark必须单调递增(可以不增,但不能减少)
- watermark与数据的时间戳有关
上图的watermark理解:当watermark为2时,当前的event time是2,即2秒之前的数据已经到齐;当watermark为5时,当前的event time是5,即5秒之前的数据已经到齐

如何定义watermark?(针对乱序数据)
简单的方式:取之前当前到达数据里面时间戳的最大值(这就保证了watermark是单调递增的,起码不会减少)减去一个允许延迟的时间
即 watermark时间 = 当前到达数据里面时间戳的最大值 - 允许延迟的时间
如何设置允许延迟的时间?
找乱序程度最大数据的时间戳的差值。如上图中乱序数据的时间戳为:1、4、5、2、3、6,(正常数据为:1、2、3、4、5、6)发现时间戳为2的数据是乱序数据,应该发生在时间戳为5的数据之前,他两之间的差值为(5-2=3),3也是乱序数据的时间戳,与5之间的差值为(5-3=2),那么设置延迟时间为3秒
例子讲解:
上面谈到,数据乱序问题的处理机制是watermark+window,假设当前有两个时间窗口window:0-5秒,5-10秒(时间窗口左闭右开,即[0,5)),上面计算出延迟时间为3秒
Window可以将数据流分为固定大小的”桶(buckets)”(即通过按照固定时间或长度将数据流切分成不同的窗口)

- 当时间戳为1的数据到达,按时间戳判断,将时间戳为1的数据丢入桶1中,此时watermark时间=1-3=-2(负数先不考虑)
- 时间戳为4的数据到达时,丢入桶1中,此时当前到达数据里面时间戳的最大值为4,此时watermark=4-3=1,表示事件时间为1,即时间戳为1之前的数据都到齐了(如果有1秒中的窗口,可以关闭了,该例子无1秒1的窗口,不需要做操作)

- 时间戳为5的数据到达时,丢入桶2中,此时当前到达数据里面时间戳的最大值为5,此时watermark=5-3=2,表示2秒之前的数据都到齐了

- 时间戳为2的数据到达时,丢入桶1中,此时当前到达数据里面时间戳的最大值为仍为5,watermark不变,仍为2

- 时间戳为3的数据到达后,丢入桶1中,此时watermark仍不变
- 时间戳为6的数据到达后,丢入桶2中,此时当前到达数据里面时间戳的最大值为6,watermark=6-3=3,此时所定义的5秒窗口仍未关闭(watermark最大值为3,没有达到5秒的窗口)

- 若后续有一个时间戳为8的数据到达,此时当前到达数据里面时间戳的最大值为8,watermark=8-3=5,表示时间戳为5秒前的数据都到齐了,此时需要关闭5秒的窗口(即桶1),桶1中的所有数据统计输出

- 若后续还有个时间戳为4的数据到达,因为桶1已经关闭,此时时间戳为4的数据丢掉(因为4与8是后来加的数据,导致最大乱序程度不一样,进而导致watermark不一样)
watermark的传递
窗口起始点的确定
源码中
/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
offset默认为0,timestamp为当前数据的时间戳,加一个windowSize然后对windowSize取余,对整体没有影响(例如:windowsize为5时,(4+5)%5=4,仍然是原来的4),(timestamp - offset + windowSize) % windowSize计算结果为timestamp的余数,timestamp - (timestamp - offset + windowSize) % windowSize,意思是timestamp减掉自身的余数,那么计算为timestamp的整数倍。
例如:timestamp默认是毫秒时间戳(如1547718199000),windowsize此时为15秒(为15000)。(timestamp - offset + windowSize) % windowSize = (1547718199000-0+15000)%1547718199000 = 4000,表示4秒(13位时间戳,最后三位表示秒)。那么时间戳为1547718199000的数据,第一个窗口长度为[195,210),(1547718199000-0000000004000=1547718195000,这里为了方便显示秒数为195,那么终点为195+15=210,15为windowsize设置的15秒),至此时间窗口的起始点确认
