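Time concepts
- Event time: the business time carried by the record itself (e.g. the timestamp written in the log)
- Ingestion time: the time at which the record enters the Flink source
- Processing time: the system clock time of the machine at the moment each Flink operator processes the record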
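Using event time
You need to provide a timestamp extractor (TimestampAssigner) and a watermark generator (WatermarkGenerator).
```java
// Deprecated: since Flink 1.12 the default time characteristic is already event time,
// so this call is no longer needed
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```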
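Watermark
If the records in a stream arrive out of order, we cannot know whether every record belonging to a window has already been consumed. We also cannot wait forever, so there has to be a mechanism that triggers the end of a window.
- Out-of-order data is handled by combining watermarks with windows.

A watermark measures the progress of event time. Watermark(t) declares that no more records with a timestamp <= t are expected. An event-time window [start, end) is triggered once the watermark reaches end - 1, so the watermark effectively decides when a window ends.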
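The firing rule can be made concrete with a small plain-Java simulation. This is not Flink code, and the class and method names are hypothetical. It assumes tumbling windows with offset 0 and non-negative millisecond timestamps. It assigns a timestamp to its window and checks whether a given watermark closes a window [start, end). The check uses the same condition as Flink's event-time trigger: end - 1 <= watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (not Flink code) of when an event-time tumbling window fires.
// Assumptions: window offset 0, non-negative millisecond timestamps.
public class WindowFiringSketch {

    // Start of the tumbling window [start, start + size) that contains ts
    public static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // A window [start, end) is complete once the watermark reaches end - 1,
    // i.e. no record with timestamp <= end - 1 is expected any more
    public static boolean fires(long windowEnd, long watermark) {
        return windowEnd - 1 <= watermark;
    }

    // Ends of all windows (the first one starting at 0) that the given watermark closes
    public static List<Long> closedWindowEnds(long watermark, long size) {
        List<Long> ends = new ArrayList<>();
        for (long end = size; fires(end, watermark); end += size) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows
        System.out.println(windowStart(7_000L, size));       // 0
        System.out.println(windowStart(12_500L, size));      // 10000
        System.out.println(fires(10_000L, 9_998L));          // false
        System.out.println(fires(10_000L, 9_999L));          // true
        System.out.println(closedWindowEnds(25_000L, size)); // [10000, 20000]
    }
}
```

A record with timestamp 7000 belongs to window [0, 10000). That window fires as soon as the watermark reaches 9999, regardless of how many records are still in flight.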
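Create a WatermarkStrategy that specifies how to extract the timestamp from each record in the stream.
Pass the strategy to assignTimestampsAndWatermarks.
```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Event and ClickSource are user-defined classes (not shown here)
public class MyWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // Out-of-order data: watermark strategy that tolerates 5 seconds of disorder
        WatermarkStrategy<Event> noOrderWatermarkStrategy = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    // logic for extracting the event-time timestamp
                    @Override
                    public long extractTimestamp(Event event, long recordTimestamp) {
                        return event.timestamp;
                    }
                });

        // In-order (ascending) data: watermark strategy
        WatermarkStrategy<Event> orderWatermarkStrategy = WatermarkStrategy
                .<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long recordTimestamp) {
                        return event.timestamp;
                    }
                });

        SingleOutputStreamOperator<Event> orderStream = stream.assignTimestampsAndWatermarks(orderWatermarkStrategy);
        SingleOutputStreamOperator<Event> noOrderStream = stream.assignTimestampsAndWatermarks(noOrderWatermarkStrategy);

        // a sink is required, otherwise execute() fails because no operators are defined
        orderStream.print("order");
        noOrderStream.print("noOrder");
        env.execute();
    }
}
```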
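The two built-in strategies differ only in how far the watermark trails the largest timestamp seen so far. The sketch below is a plain-Java re-implementation for illustration only. The class name is hypothetical, and it is not the Flink source. It assumes Flink's formula watermark = maxTimestamp - outOfOrderness - 1, and treats forMonotonousTimestamps() as an out-of-orderness of 0.

```java
// Illustration only (not Flink code): forBoundedOutOfOrderness(d) yields
// watermark = maxTimestamp - d - 1; forMonotonousTimestamps() behaves like d = 0.
public class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    public BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // chosen so that the first watermark is Long.MIN_VALUE instead of underflowing
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // per record: only remember the largest timestamp
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch outOfOrder = new BoundedOutOfOrdernessSketch(5_000L);
        BoundedOutOfOrdernessSketch monotonous = new BoundedOutOfOrdernessSketch(0L);
        long[] timestamps = {1_000L, 8_000L, 3_000L, 12_000L};
        for (long ts : timestamps) {
            outOfOrder.onEvent(ts);
            monotonous.onEvent(ts);
            // prints: 1000 -> -4001 / 999, 8000 -> 2999 / 7999, 3000 -> 2999 / 7999, 12000 -> 6999 / 11999
            System.out.println(ts + " -> " + outOfOrder.currentWatermark() + " / " + monotonous.currentWatermark());
        }
    }
}
```

Consider the record 3000, which arrives after 8000. With the 5-second bound, the watermark is 2999, which is below 3000, so the record is not late. Under the monotonous strategy the watermark is already 7999, so the same record would count as late.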
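Custom watermark strategy: WatermarkStrategy
WatermarkStrategy has two methods:
- createTimestampAssigner extracts the timestamp.
- createWatermarkGenerator creates the WatermarkGenerator that produces the watermarks, either periodic or punctuated.

The WatermarkGenerator interface has two methods:
- onEvent is called for every record.
- onPeriodicEmit is called periodically (every 200 ms by default).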
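The split between the two callbacks can be illustrated with a simulated driver. This is a hypothetical stand-in, not the Flink API: SimpleWatermarkGenerator and a LongConsumer output replace WatermarkGenerator and WatermarkOutput. Processing time is simulated as a millisecond counter, with a 200 ms emit interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical simplification of the WatermarkGenerator contract (not the Flink API):
// the "framework" calls onEvent per record and onPeriodicEmit on a timer.
public class GeneratorDriverSketch {

    public interface SimpleWatermarkGenerator {
        void onEvent(long eventTimestamp, LongConsumer output);
        void onPeriodicEmit(LongConsumer output);
    }

    // Periodic generator (watermark = max timestamp - 1) that counts its invocations
    public static class CountingGenerator implements SimpleWatermarkGenerator {
        public int eventCalls = 0;
        public int periodicCalls = 0;
        private long maxTs = Long.MIN_VALUE + 1; // keeps maxTs - 1 from underflowing

        @Override
        public void onEvent(long eventTimestamp, LongConsumer output) {
            eventCalls++;
            maxTs = Math.max(maxTs, eventTimestamp); // only track, do not emit
        }

        @Override
        public void onPeriodicEmit(LongConsumer output) {
            periodicCalls++;
            output.accept(maxTs - 1);
        }
    }

    // Simulates processing time in 1 ms steps from 0 to endMillis: record i arrives at
    // arrivalMillis[i] (ascending), and onPeriodicEmit fires every intervalMillis
    public static List<Long> run(SimpleWatermarkGenerator gen, long[] arrivalMillis,
                                 long[] eventTimestamps, long intervalMillis, long endMillis) {
        List<Long> emitted = new ArrayList<>();
        LongConsumer output = wm -> emitted.add(wm);
        int next = 0;
        for (long now = 0; now <= endMillis; now++) {
            while (next < arrivalMillis.length && arrivalMillis[next] == now) {
                gen.onEvent(eventTimestamps[next], output);
                next++;
            }
            if (now > 0 && now % intervalMillis == 0) {
                gen.onPeriodicEmit(output);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        CountingGenerator gen = new CountingGenerator();
        List<Long> emitted = run(gen, new long[] {50, 120, 450},
                new long[] {1_000L, 3_000L, 2_000L}, 200L, 1_000L);
        System.out.println(gen.eventCalls + " onEvent calls, " + gen.periodicCalls + " onPeriodicEmit calls");
        System.out.println(emitted); // [2999, 2999, 2999, 2999, 2999]
    }
}
```

Three records produce three onEvent calls. Over 1000 ms of simulated time, onPeriodicEmit runs five times and keeps re-emitting 2999 even when no new record arrives.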
Custom: periodic
```java
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Watermark delayed by 5 seconds, generated every 200 ms
// Event and ClickSource are user-defined classes (not shown here)
public class MyCustomWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> stream = env.addSource(new ClickSource());
        stream.assignTimestampsAndWatermarks(new MyWatermarkStrategy()).print();
        env.execute();
    }

    public static class MyWatermarkStrategy implements WatermarkStrategy<Event> {
        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new MyWatermarkGenerator();
        }

        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event event, long recordTimestamp) {
                    return event.timestamp;
                }
            };
        }
    }

    // onEvent runs for every record and tracks the max timestamp; the watermark
    // (delayed by 5 seconds) is only actually emitted every 200 ms in onPeriodicEmit
    public static class MyWatermarkGenerator implements WatermarkGenerator<Event> {
        private final long delayTime = 5000L; // delay
        private long maxTs = Long.MIN_VALUE + delayTime + 1L; // largest timestamp observed

        // called for every record
        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            maxTs = Math.max(event.timestamp, maxTs);
        }

        // emits the watermark; called every 200 ms by default
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}
```
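The initial value of maxTs in MyWatermarkGenerator is Long.MIN_VALUE + delayTime + 1 rather than Long.MIN_VALUE. The plain-Java check below (hypothetical class, no Flink needed) applies the same formula as onPeriodicEmit to both start values to show why.

```java
// Why MyWatermarkGenerator starts maxTs at Long.MIN_VALUE + delayTime + 1 (plain Java, no Flink)
public class WatermarkUnderflowSketch {
    static final long DELAY = 5_000L;

    // Same formula as onPeriodicEmit: maxTs - delayTime - 1
    public static long watermark(long maxTs) {
        return maxTs - DELAY - 1L;
    }

    public static void main(String[] args) {
        // Naive start value: the subtraction wraps around to a huge positive number
        System.out.println(watermark(Long.MIN_VALUE)); // 9223372036854770807
        // Start value used above: the first watermark is exactly Long.MIN_VALUE
        System.out.println(watermark(Long.MIN_VALUE + DELAY + 1L) == Long.MIN_VALUE); // true
    }
}
```

With the naive start value, the first periodic emit, before any record arrives, would push event time to almost Long.MAX_VALUE. Every later record would then be treated as late.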
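Custom: punctuated
The generator keeps inspecting events in onEvent(). As soon as it sees a special event that carries watermark information, it emits a watermark immediately.
```java
// Punctuated watermark generation: a watermark is generated only when a specific record arrives
public static class MyWatermarkGenerator2 implements WatermarkGenerator<Event> {
    private final long delayTime = 5000L; // delay

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // only records from user "Mary" trigger a watermark
        if ("Mary".equals(event.user)) {
            output.emitWatermark(new Watermark(event.timestamp - delayTime - 1L));
        }
    }

    // called every 200 ms by default
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // intentionally empty: watermarks are already emitted in onEvent
    }
}
```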
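The effect of the punctuated generator can be traced with a plain-Java simulation. The class is hypothetical, and Event here is a minimal stand-in with only user and timestamp. It applies the same rule as MyWatermarkGenerator2: only a record from "Mary" produces a watermark, computed as timestamp - 5000 - 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java trace of the punctuated rule (not Flink code): only records from "Mary"
// produce a watermark, computed as timestamp - 5000 - 1.
public class PunctuatedSketch {
    static final long DELAY = 5_000L;

    // Minimal stand-in for the Event class used above (only the fields needed here)
    public static final class Event {
        final String user;
        final long timestamp;

        public Event(String user, long timestamp) {
            this.user = user;
            this.timestamp = timestamp;
        }
    }

    // Returns the watermarks emitted while the events are processed in order
    public static List<Long> emitted(List<Event> events) {
        List<Long> out = new ArrayList<>();
        for (Event e : events) {
            if ("Mary".equals(e.user)) { // marker record: emit immediately
                out.add(e.timestamp - DELAY - 1L);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("Bob", 1_000L),
                new Event("Mary", 7_000L),
                new Event("Alice", 9_000L),
                new Event("Mary", 12_000L));
        System.out.println(emitted(events)); // [1999, 6999]
    }
}
```

Only marker records advance the watermark. If no "Mary" record ever arrives, event time stalls and no event-time window fires.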
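Deprecated APIs
#### AssignerWithPeriodicWatermarks
Generates watermarks periodically, every 200 ms by default. The interval can be changed manually: env.getConfig().setAutoWatermarkInterval(5000);
getCurrentWatermark is invoked periodically at that interval.
```java
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Implement the interface and override its two methods
SingleOutputStreamOperator<SensorReading> watermarks = inputStream.assignTimestampsAndWatermarks(
        new AssignerWithPeriodicWatermarks<SensorReading>() {
            private final long bound = 60 * 1000L; // 1-minute delay
            // current max timestamp; starting at Long.MIN_VALUE + bound keeps maxTs - bound from underflowing
            private long maxTs = Long.MIN_VALUE + bound;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(maxTs - bound); // watermark with a 1-minute tolerance
            }

            @Override
            public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
                maxTs = Math.max(element.getTimestamp(), maxTs); // track the max timestamp
                return element.getTimestamp(); // the record's event time
            }
        });
```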
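This interface has two implementation classes, which were used more often in practice:
- AscendingTimestampExtractor: for data that is not out of order
- BoundedOutOfOrdernessTimestampExtractor: for out-of-order data
```java
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Ascending data: set the event time and watermark
inputStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
    @Override
    public long extractAscendingTimestamp(SensorReading element) {
        return element.getTimestamp() * 1000L; // seconds -> milliseconds
    }
});

// Out-of-order data: set timestamps and watermarks with a 2-second delay
inputStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
    @Override
    public long extractTimestamp(SensorReading element) {
        return element.getTimestamp() * 1000L;
    }
});
```

#### AssignerWithPunctuatedWatermarks
- Generates watermarks for specific records: every record is checked to decide whether to emit a watermark. This is more flexible. The drawback is that watermarks may be generated very frequently, so avoid it for high-volume streams because performance suffers.
- checkAndGetNextWatermark is called for every record.
```java
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Insert watermarks only for sensor_1
SingleOutputStreamOperator<SensorReading> watermarks1 = inputStream.assignTimestampsAndWatermarks(
        new AssignerWithPunctuatedWatermarks<SensorReading>() {
            private final long bound = 60 * 1000L; // 1-minute delay

            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {
                // compare strings with equals(), not ==
                if ("sensor_1".equals(lastElement.getId())) {
                    return new Watermark(extractedTimestamp - bound);
                }
                return null; // null: no watermark for this record
            }

            @Override
            public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
                return element.getTimestamp(); // the record's event time
            }
        });
```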
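Watermark propagation between upstream and downstream partitions: each task broadcasts its watermark to all of its downstream partitions (checkpoint barriers are broadcast the same way). A task that receives watermarks from several upstream partitions uses the minimum of them as its own event-time clock.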
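A task with several inputs has to combine their watermarks before broadcasting its own. The minimum rule it uses can be sketched in plain Java. This is a simplified model, not Flink's implementation, and it ignores idle inputs. The hypothetical MinWatermarkSketch keeps the latest watermark per input channel and uses the smallest one as the task's watermark, never letting it move backwards.

```java
import java.util.Arrays;

// Simplified model (not Flink code) of a task with several input channels:
// its watermark is the minimum of the latest watermark received from each channel.
public class MinWatermarkSketch {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    public MinWatermarkSketch(int numInputs) {
        channelWatermarks = new long[numInputs];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Called when `channel` delivers a watermark; returns the task's watermark,
    // which is the value that would then be broadcast to every downstream channel
    public long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min); // never move backwards
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkSketch task = new MinWatermarkSketch(3);
        System.out.println(task.onWatermark(0, 5_000L) == Long.MIN_VALUE); // true: channels 1 and 2 silent
        System.out.println(task.onWatermark(1, 3_000L) == Long.MIN_VALUE); // true: channel 2 still silent
        System.out.println(task.onWatermark(2, 4_000L)); // 3000
        System.out.println(task.onWatermark(1, 9_000L)); // 4000
    }
}
```

A single slow or silent input therefore holds back the whole task. For sources that may go idle, Flink provides WatermarkStrategy#withIdleness.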

