https://zhuanlan.zhihu.com/p/151781508
https://zhuanlan.zhihu.com/p/360321364
flink的核心亮点:

  1. 窗口
  2. 时间语义
  3. 精准一次性

但是我们在之前都是以事件为驱动,等于说是来了一条数据,我就处理一次,但是现在遇到的问题是:
我们可以简单的把wordCount的需求比做公司的订单金额,也就是订单金额会随着订单的增加而只增不减,那么如果运营部门提了以下需求:

  1. 每有1000条订单就输出一次这1000条订单的总金额
  2. 每5分钟输出一次刚刚过去这5分钟的订单总金额
  3. 每3秒输出一次最近5分钟内的累计成交额
  4. 连续2条订单的间隔时间超过30秒就按照这个时间分为2组订单,输出前一组订单的总金额

那么面对这个需求,因为时间一直是流动的,大家有什么想法?
基于这些需求,我们来讲一下flink的窗口。

窗口

窗口:无论是hive中的开窗函数,还是Spark中的批次计算中的窗口,还是我们这里讲的窗口,「本质上都是对数据进行划分,然后对划分后的数据进行计算。」
那么Windows是处理无限流的核心。Windows将流分成有限大小的“存储桶”,我们可以在其上应用计算。
在flink中,窗口式Flink程序一般有2类

  1. 键控流

    1. stream
    2. .keyBy(...) <- keyed versus non-keyed windows
    3. .window(...) <- required: "assigner"
    4. [.trigger(...)] <- optional: "trigger" (else default trigger)
    5. [.evictor(...)] <- optional: "evictor" (else no evictor)
    6. [.allowedLateness(...)] <- optional: "lateness" (else zero)
    7. [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
    8. .reduce/aggregate/fold/apply() <- required: "function"
    9. [.getSideOutput(...)] <- optional: "output tag"
  2. 非键控流

    1. stream
    2. .windowAll(...) <- required: "assigner"
    3. [.trigger(...)] <- optional: "trigger" (else default trigger)
    4. [.evictor(...)] <- optional: "evictor" (else no evictor)
    5. [.allowedLateness(...)] <- optional: "lateness" (else zero)
    6. [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
    7. .reduce/aggregate/fold/apply() <- required: "function"
    8. [.getSideOutput(...)] <- optional: "output tag"

    「唯一的区别是:对键控流的keyBy(…)调用window(…),而非键控流则是调用windowAll(…)。」

窗口的生命周期

我们上面说窗口就是对数据进行划分到不同的“桶”中,然后进行计算,那么什么开始有这个桶,什么时候就算是分完了呢? 简而言之,一旦应属于该窗口的第一个元素到达,就会创建一个窗口,当时间超过用户设置的时间戳时,flink将删除这个窗口。
那我们来理解一下窗口的类型:

  1. CountWindow:按照指定的数据条数生成一个Window,与时间无关。
  2. TimeWindow:按照时间生成Window。 1. 滚动窗口 2. 滑动窗口 3. 会话窗口

从文字也不难看出,CountWindow就是按照数据条数生成窗口,样例代码如下:

  1. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  2. import org.apache.flink.streaming.api.scala._
  3. object CountWindowsTest {
  4. def main(args: Array[String]): Unit = {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. val wordDS = env.socketTextStream("master102",3456)
  7. wordDS
  8. .map((_,1))
  9. .keyBy(0)
  10. //累计单个Key中3条数据就进行处理
  11. .countWindow(3)
  12. .sum(1)
  13. .print("测试:")
  14. env.execute()
  15. }
  16. }

可以看出,不同的单词根据keyby进入不同的窗口,然后当窗口中的单个key的数据个数达到3个之后进行输出。
接下来,我们主要来说一下时间窗口,「这些窗口的结束与开始都是根据数据的时间来判断的」,所以这里就引出了我们今天的第二个重点:时间语义