- 1. 全量和增量的区别
- 2. apply和process的区别
- reduce和aggregate的区别">3. reduce和aggregate的区别
- 二、AggregateFunction和ProcessWindowFunction结合使用
https://github.com/wangzhiwubigdata/God-Of-BigData
https://blog.csdn.net/winterking3/article/details/115352463
一、窗口函数的分类
大多数Flink应用都是要划分窗口的,如果不划分窗口,那就得计算流中所有的数据的结果(很少有这样的需求)。
本篇的重点是讲窗口函数,即数据划分窗口后可以调用的处理函数。
1. 全量和增量的区别
从上图中看出,窗口函数主要分全量函数和增量函数这2大类。
- 全量函数:窗口先缓存所有元素,等到触发条件后对窗口内的全量元素执行计算。攒一批数据计算
增量函数:窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据。
2. apply和process的区别
apply和process都是处理全量计算,但工作中正常用process。
- process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。
Process方法能够是使用ProcessFunction 结尾的类;
- ProcessFunction:dataStream
- KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理
- CoProcessFunction:用于connect连接的流
- ProcessJoinFunction:用于join流操作
- BroadcastProcessFunction:用于广播
- KeyedBroadcastProcessFunction:keyBy之后的广播
- ProcessWindowFunction:窗口增量聚合
- ProcessAllWindowFunction:全窗口聚合
WindowFunction和ProcessWindowFunction 区别在于你使用apply还是process方法;
ProcessWindowFunction和KeyedProcessFunction的区别在于前者是全量计算,后者是增量计算;
- 可将ProcessWindowFunction与增量聚合函数ReduceFunction、AggregateFunction结合。
- 元素到达窗口时增量聚合,当窗口关闭时对增量聚合的结果用ProcessWindowFunction再进行全量聚合。
- 既可以增量聚合,也可以访问窗口的元数据信息(如开始结束时间、状态等)。
3. reduce和aggregate的区别
- reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个
,压缩 - maxBy、minBy、sum这3个底层都是由reduce实现的
- aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有
二、AggregateFunction和ProcessWindowFunction结合使用
在reduce和aggregate中,都有一个可以把增量函数和全量函数结合使用的方法,就是上面图中标红色五角星的。
对于一个窗口来说,Flink先增量计算,窗口关闭前,将增量计算结果发送给ProcessWindowFunction作为输入再进行处理。
下面通过案例把增量计算和全量计算讲一下。
1. 需求背景
- 有一些城市的气温数据,如下面所示
想每隔30秒,获得该城市的这30秒内的温度最大值、最小值、平均值 :::info 江苏,苏州,1,20.5,2021-01-29 16:00:00
江苏,盐城,1,24.5,2021-01-29 16:00:20
江苏,盐城,1,28.5,2021-01-29 16:00:30
江苏,盐城,1,23.5,2021-01-29 16:00:40
江苏,苏州,1,21.5,2021-01-29 16:00:10
江苏,苏州,1,22.5,2021-01-29 16:00:20
江苏,苏州,1,24.5,2021-01-29 16:00:30
江苏,苏州,1,23.5,2021-01-29 16:00:40
江苏,苏州,1,22.5,2021-01-29 16:00:50
//太多了,省略 :::2. 分析
求温度的最大值、最小值、平均值,这样明显是聚合计算,适合用AggregateFunction
每隔30秒获取数据,就是30秒后窗口关闭时,获取窗口的信息(开始结束时间),并加上AggregateFunction的结果,这个适合用ProcessWindowFunction
所以得用下面AggregateFunction和ProcessWindowFunction结合的aggregate函数 :::info publicSingleOutputStreamOperator aggregate(
AggregateFunctionaggFunction,
ProcessWindowFunctionwindowFunction) {
} :::3. 程序主体
```java /**
- 该程序测试window的reduce功能
- reduce是每输入一个数据,触发一次计算
具体实现求得一个window数据中的max、min、sum、count、avg */ public class Test04_AggregateAndProcessFunction { public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.readTextFile(BaseConstant.TEMP_RECORD);
SingleOutputStreamOperator<TempRecord> dataStream = source
.flatMap(new TempRecordUtils.BeanFlatMap())
.assignTimestampsAndWatermarks(
WatermarkStrategy.<TempRecord>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<TempRecord>() {
@Override
public long extractTimestamp(TempRecord element, long recordTimestamp) {
return element.getTimeEpochMilli();
}
})
);
SingleOutputStreamOperator<TempRecordAggsResult> result = dataStream
.keyBy(TempRecord::getCity)
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.aggregate(new TempRecordUtils.MyAggregateFunction(),//增量计算
new TempRecordUtils.MyProcessWindow());//全量计算
result.print();
env.execute();
} }
/**
- aggregate的增量函数类 *
聚合函数,每来一个数据,就会执行聚合操作 */ public static class MyAggregateFunction implements AggregateFunction<
TempRecord,
TempRecordAggsResult,
TempRecordAggsResult> {
@Override public TempRecordAggsResult createAccumulator() {
return TempRecordAggsResult.getInitResult();
}
/**
- 每进入一个数据就会执行一次
- @param value 当前进入的数据
- @param accumulator 之前计算好的中间结果
@return */ @Override public TempRecordAggsResult add(TempRecord value, TempRecordAggsResult accumulator) { accumulator.setKey(value.getProvince() + “,” + value.getCity()); accumulator.setMax(value.getTemp() > accumulator.getMax() ? value.getTemp() : accumulator.getMax()); accumulator.setMin(value.getTemp() < accumulator.getMin() ? value.getTemp() : accumulator.getMin()); accumulator.setSum(value.getTemp() + accumulator.getSum()); accumulator.setCounts(accumulator.getCounts() + 1); accumulator.setAvg(accumulator.getSum() / accumulator.getCounts()); return accumulator; }
/ 当window的结束时间到达时,触发这个方法,返回结果 / @Override public TempRecordAggsResult getResult(TempRecordAggsResult accumulator) { //System.out.println(“getResult :” + accumulator.toString()); return accumulator; }
/**
- 在session窗口才会用到merge,时间窗口其实用不得
- @param a
- @param b
- @return */ @Override public TempRecordAggsResult merge(TempRecordAggsResult a, TempRecordAggsResult b) { a.setMax(a.getMax() > b.getMax() ? a.getMax() : b.getMax()); a.setMin(a.getMin() < b.getMin() ? a.getMin() : b.getMin()); a.setSum(a.getSum() + b.getSum()); a.setCounts(a.getCounts() + b.getCounts()); a.setAvg(a.getSum() / a.getCounts()); return a; } }
// aggregate 的 全量计算类 public static class MyProcessWindow extends ProcessWindowFunction< TempRecordAggsResult, TempRecordAggsResult, String, TimeWindow> {
@Override
public void process(String s, Context context, Iterable<TempRecordAggsResult> elements, Collector<TempRecordAggsResult> out) throws Exception {
long windowStartTs = context.window().getStart();
long windowEndTs = context.window().getEnd();
if (elements.iterator().hasNext()) {
TempRecordAggsResult result = elements.iterator().next();
System.out.println("result:" + result.toString());
result.setBeginTime(
LocalDateTime.ofInstant(
Instant.ofEpochMilli(windowStartTs), ZoneId.systemDefault()
)
);
result.setEndTime(
LocalDateTime.ofInstant(
Instant.ofEpochMilli(windowEndTs), ZoneId.systemDefault()
)
);
out.collect(result);
}
}
}
```