https://zhuanlan.zhihu.com/p/151781508
https://zhuanlan.zhihu.com/p/360321364
flink的核心亮点:
- 窗口
- 时间语义
- 精准一次性
但是我们在之前都是以事件为驱动,等于说是来了一条数据,我就处理一次,但是现在遇到的问题是:
我们可以简单的把wordCount的需求比做公司的订单金额,也就是订单金额会随着订单的增加而只增不减,那么如果运营部门提了以下需求:
- 每有1000条订单就输出一次这1000条订单的总金额
- 每5分钟输出一次刚刚过去这5分钟的订单总金额
- 每3秒输出一次最近5分钟内的累计成交额
- 连续2条订单的间隔时间超过30秒就按照这个时间分为2组订单,输出前一组订单的总金额
那么面对这个需求,因为时间一直是流动的,大家有什么想法?
基于这些需求,我们来讲一下flink的窗口。
窗口
窗口:无论是hive中的开窗函数,还是Spark中的批次计算中的窗口,还是我们这里讲的窗口,「本质上都是对数据进行划分,然后对划分后的数据进行计算。」
那么Windows是处理无限流的核心。Windows将流分成有限大小的“存储桶”,我们可以在其上应用计算。
在flink中,窗口式Flink程序一般有2类
键控流
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
非键控流
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
「唯一的区别是:对键控流的keyBy(…)调用window(…),而非键控流则是调用windowAll(…)。」
窗口的生命周期
我们上面说窗口就是对数据进行划分到不同的“桶”中,然后进行计算,那么什么开始有这个桶,什么时候就算是分完了呢? 简而言之,一旦应属于该窗口的第一个元素到达,就会创建一个窗口,当时间超过用户设置的时间戳时,flink将删除这个窗口。
那我们来理解一下窗口的类型:
- CountWindow:按照指定的数据条数生成一个Window,与时间无关。
- TimeWindow:按照时间生成Window。 1. 滚动窗口 2. 滑动窗口 3. 会话窗口
从文字也不难看出,CountWindow就是按照数据条数生成窗口,样例代码如下:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object CountWindowsTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val wordDS = env.socketTextStream("master102",3456)
wordDS
.map((_,1))
.keyBy(0)
//累计单个Key中3条数据就进行处理
.countWindow(3)
.sum(1)
.print("测试:")
env.execute()
}
}
可以看出,不同的单词根据keyby进入不同的窗口,然后当窗口中的单个key的数据个数达到3个之后进行输出。
接下来,我们主要来说一下时间窗口,「这些窗口的结束与开始都是根据数据的时间来判断的」,所以这里就引出了我们今天的第二个重点:时间语义