https://github.com/wangzhiwubigdata/God-Of-BigData
https://blog.csdn.net/winterking3/article/details/115352463
一、窗口函数的分类
大多数Flink应用都是要划分窗口的,如果不划分窗口,那就得计算流中所有的数据的结果(很少有这样的需求)。
本篇的重点是讲窗口函数,即数据划分窗口后可以调用的处理函数。
image.png

1. 全量和增量的区别

从上图中看出,窗口函数主要分全量函数和增量函数这2大类。

  • 全量函数:窗口先缓存所有元素,等到触发条件后对窗口内的全量元素执行计算。攒一批数据计算
  • 增量函数:窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据。

    2. apply和process的区别

    image.png

  • apply和process都是处理全量计算,但工作中正常用process。

  • process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。

Process方法能够是使用ProcessFunction 结尾的类;

  • ProcessFunction:dataStream
  • KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理
  • CoProcessFunction:用于connect连接的流
  • ProcessJoinFunction:用于join流操作
  • BroadcastProcessFunction:用于广播
  • KeyedBroadcastProcessFunction:keyBy之后的广播
  • ProcessWindowFunction:窗口增量聚合
  • ProcessAllWindowFunction:全窗口聚合

WindowFunction和ProcessWindowFunction 区别在于你使用apply还是process方法;
ProcessWindowFunction和KeyedProcessFunction的区别在于前者是全量计算,后者是增量计算;

  1. 可将ProcessWindowFunction与增量聚合函数ReduceFunction、AggregateFunction结合。
  2. 元素到达窗口时增量聚合,当窗口关闭时对增量聚合的结果用ProcessWindowFunction再进行全量聚合。
  3. 既可以增量聚合,也可以访问窗口的元数据信息(如开始结束时间、状态等)。

3. reduce和aggregate的区别

image.png

  • reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个 ,压缩
  • maxBy、minBy、sum这3个底层都是由reduce实现的
  • aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有

二、AggregateFunction和ProcessWindowFunction结合使用

在reduce和aggregate中,都有一个可以把增量函数和全量函数结合使用的方法,就是上面图中标红色五角星的。
对于一个窗口来说,Flink先增量计算,窗口关闭前,将增量计算结果发送给ProcessWindowFunction作为输入再进行处理。
下面通过案例把增量计算和全量计算讲一下。

1. 需求背景

  • 有一些城市的气温数据,如下面所示
  • 想每隔30秒,获得该城市的这30秒内的温度最大值、最小值、平均值 :::info 江苏,苏州,1,20.5,2021-01-29 16:00:00
    江苏,盐城,1,24.5,2021-01-29 16:00:20
    江苏,盐城,1,28.5,2021-01-29 16:00:30
    江苏,盐城,1,23.5,2021-01-29 16:00:40
    江苏,苏州,1,21.5,2021-01-29 16:00:10
    江苏,苏州,1,22.5,2021-01-29 16:00:20
    江苏,苏州,1,24.5,2021-01-29 16:00:30
    江苏,苏州,1,23.5,2021-01-29 16:00:40
    江苏,苏州,1,22.5,2021-01-29 16:00:50
    //太多了,省略 :::

    2. 分析

    求温度的最大值、最小值、平均值,这样明显是聚合计算,适合用AggregateFunction
    每隔30秒获取数据,就是30秒后窗口关闭时,获取窗口的信息(开始结束时间),并加上AggregateFunction的结果,这个适合用ProcessWindowFunction
    所以得用下面AggregateFunction和ProcessWindowFunction结合的aggregate函数 :::info public SingleOutputStreamOperator aggregate(
    AggregateFunction aggFunction,
    ProcessWindowFunction windowFunction) {
    } :::

    3. 程序主体

    ```java /**

    • 该程序测试window的reduce功能
    • reduce是每输入一个数据,触发一次计算
    • 具体实现求得一个window数据中的max、min、sum、count、avg */ public class Test04_AggregateAndProcessFunction { public static void main(String[] args) throws Exception {

      1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. env.setParallelism(1);
      3. DataStreamSource<String> source = env.readTextFile(BaseConstant.TEMP_RECORD);
      4. SingleOutputStreamOperator<TempRecord> dataStream = source
      5. .flatMap(new TempRecordUtils.BeanFlatMap())
      6. .assignTimestampsAndWatermarks(
      7. WatermarkStrategy.<TempRecord>forBoundedOutOfOrderness(Duration.ofSeconds(0))
      8. .withTimestampAssigner(new SerializableTimestampAssigner<TempRecord>() {
      9. @Override
      10. public long extractTimestamp(TempRecord element, long recordTimestamp) {
      11. return element.getTimeEpochMilli();
      12. }
      13. })
      14. );
      15. SingleOutputStreamOperator<TempRecordAggsResult> result = dataStream
      16. .keyBy(TempRecord::getCity)
      17. .window(TumblingEventTimeWindows.of(Time.seconds(30)))
      18. .aggregate(new TempRecordUtils.MyAggregateFunction(),//增量计算
      19. new TempRecordUtils.MyProcessWindow());//全量计算
      20. result.print();
      21. env.execute();

      } }

/**

  • aggregate的增量函数类 *
  • 聚合函数,每来一个数据,就会执行聚合操作 */ public static class MyAggregateFunction implements AggregateFunction<

    1. TempRecord,
    2. TempRecordAggsResult,
    3. TempRecordAggsResult> {

    @Override public TempRecordAggsResult createAccumulator() {

    1. return TempRecordAggsResult.getInitResult();

    }

    /**

    • 每进入一个数据就会执行一次
    • @param value 当前进入的数据
    • @param accumulator 之前计算好的中间结果
    • @return */ @Override public TempRecordAggsResult add(TempRecord value, TempRecordAggsResult accumulator) { accumulator.setKey(value.getProvince() + “,” + value.getCity()); accumulator.setMax(value.getTemp() > accumulator.getMax() ? value.getTemp() : accumulator.getMax()); accumulator.setMin(value.getTemp() < accumulator.getMin() ? value.getTemp() : accumulator.getMin()); accumulator.setSum(value.getTemp() + accumulator.getSum()); accumulator.setCounts(accumulator.getCounts() + 1); accumulator.setAvg(accumulator.getSum() / accumulator.getCounts()); return accumulator; }

      / 当window的结束时间到达时,触发这个方法,返回结果 / @Override public TempRecordAggsResult getResult(TempRecordAggsResult accumulator) { //System.out.println(“getResult :” + accumulator.toString()); return accumulator; }

      /**

    • 在session窗口才会用到merge,时间窗口其实用不得
    • @param a
    • @param b
    • @return */ @Override public TempRecordAggsResult merge(TempRecordAggsResult a, TempRecordAggsResult b) { a.setMax(a.getMax() > b.getMax() ? a.getMax() : b.getMax()); a.setMin(a.getMin() < b.getMin() ? a.getMin() : b.getMin()); a.setSum(a.getSum() + b.getSum()); a.setCounts(a.getCounts() + b.getCounts()); a.setAvg(a.getSum() / a.getCounts()); return a; } }

// aggregate 的 全量计算类 public static class MyProcessWindow extends ProcessWindowFunction< TempRecordAggsResult, TempRecordAggsResult, String, TimeWindow> {

  1. @Override
  2. public void process(String s, Context context, Iterable<TempRecordAggsResult> elements, Collector<TempRecordAggsResult> out) throws Exception {
  3. long windowStartTs = context.window().getStart();
  4. long windowEndTs = context.window().getEnd();
  5. if (elements.iterator().hasNext()) {
  6. TempRecordAggsResult result = elements.iterator().next();
  7. System.out.println("result:" + result.toString());
  8. result.setBeginTime(
  9. LocalDateTime.ofInstant(
  10. Instant.ofEpochMilli(windowStartTs), ZoneId.systemDefault()
  11. )
  12. );
  13. result.setEndTime(
  14. LocalDateTime.ofInstant(
  15. Instant.ofEpochMilli(windowEndTs), ZoneId.systemDefault()
  16. )
  17. );
  18. out.collect(result);
  19. }
  20. }

}

```