一、概述

1、介绍

无界和有界数据流进行有状态计算的分布式引擎和框架,并可以使用高层API编写分布式任务,主要包括:
DataSet API(批处理):静态数据抽象为分布式数据集,方便使用操作符进行处理(Python)
DataStream API(流处理):对分布式流数据处理,从而进行各种操作
Table API:将结构化数据抽象为关系表,并使用类SQL的DSL的表进行查询
其他特定领域的库,例如机器学习、图计算

2、分层架构介绍

(1)介绍
分层架构,下层组件提供抽象服务于上层
Flink面试题 - 图1
(2)自下而上各层介绍
Deploy层:Flink的不同部署模式,包括local、Standalone脱机、Cluster、Cloud等
Runtime层:提供Flink计算的核心实现(过程函数ProcessFunction)
API层:面向流(DataStream)处理和批(Batch)处理的API
Libraries层:应用框架,CEP(复杂事件处理)、基于SQL的操作(Table/SQL API)
(3)详解
Runtime层:有状态流通过过程函数(ProcessFunction)嵌入到DataStreamAPI中
API层:DataStream API提供了通用的数据处理构建模块,比如多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。
Libraries层:TableAPI是以表为中心的声明式编程,提供可比较的操作,执行前经过内置优化器进行优化

3、运行组件

(1)组成
作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager),以及分发器(Dispatcher)

(2)各组件功能
作业管理器JobManager:集群管理者Master和协调者、将作业图(JobGraph)转化为数据流图/执行图;请求资源、分发、协调;

资源管理器ResourceManager:(分配slot插槽)将有空闲插槽的TaskManager分配给JobManager,发起会话、中止释放资源;

任务管理器TaskManager:负责执行计算,(包含一定并发量)注册插槽、与同一程序的task M交换数据;

分发器Dispatcher:跨作业运行、为应用提交提供了REST接口。

Client:将Flink程序提交到集群,建立到JobManager的连接,将Flink Job提交给JobManager

4、Flink的部署模式

(1)Standalone模式
(2)Yarn模式-Hadoop>2.2
两种模式:Session-Cluster和Per-Job-Cluster模式
何时向yarn申请资源,创建flink集群
(3)Kubernetes部署
启动Flink的docker组件:JobManager、TaskManager、JobManagerService

5、任务提交流程

(1)常规
Flink面试题 - 图2
(2)yarn模式
Flink面试题 - 图3

6、任务调度相关概念

(1)TaskManger与Slots:JVM进程、Task Slot是静态的概念,是指TaskManager具有的并发执行能力
(2)程序与数据流(DataFlow):Flink程序-Source、Transformation和Sink,转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系
(3)执行图(ExecutionGraph):直接映射成的数据流图是StreamGraph,也被称为逻辑流图,需要转换为物理视图
执行图包括4层:StreamGraph->JobGraph->ExecutionGraph->物理执行图
(4)并行度(Parallelism):特定算子的子任务(subtask)的个数
算子之间传输数据的形式:One-to-one类似于窄依赖,Redistributing类似于宽依赖
(5)任务链(OperatorChains):相同并行度的One-to-one操作算子,形成一个task,减少线程之间的切换和基于缓存区的数据交换

7、Flink的基础编程模型

Flink 程序的基本构建是数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个Sink接收器中结束。
数据流(stream)就是一组永远不会停止的数据记录流,而转换(transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。
执行时,Flink程序映射到 streaming dataflows,由流(streams)和转换操作(transformation operators)组成。
Flink面试题 - 图4

二、基本操作

1、入门案例

(1)批处理wordcount—DataSet

val env = ExecutionEnvironment.getExecutionEnvironment
// 从文件中读取数据
val inputPath = “D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt”
val inputDS: DataSet[String] = env.readTextFile(inputPath)
// 分词之后,对单词进行groupby分组,然后用sum进行聚合
val wordCountDS: AggregateDataSet[(String, Int)] = inputDS.flatMap(.split(“ “)).map((, 1)).groupBy(0).sum(1)
// 打印输出
wordCountDS.print()

(2)流处理wordcount—DataStream

object StreamWordCount {
def main(args: Array[String]): Unit = {
// 从外部命令中获取参数
val params: ParameterTool = ParameterTool.fromArgs(args)
val host: String = params.get(“host”)
val port: Int = params.getInt(“port”)
// 创建流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 接收socket文本流
val textDstream: DataStream[String] = env.socketTextStream(host, port)
// flatMap和Map需要引用的隐式转换
import org.apache.flink.api.scala.
val dataStream: DataStream[(String, Int)] = textDstream.flatMap(
.split(“\\s”)).filter(.nonEmpty).map((, 1)).keyBy(0).sum(1)
dataStream.print().setParallelism(1)
// 启动executor,执行任务
env.execute(“Socket stream word count”)
}
}

2、Environment创建

getExecutionEnvironment,客户端,当前执行程序的上下文
createLocalEnvironment:返回本地执行环境
createRemoteEnvironment:集群执行环境,需要指定ip、端口及jar包

3、Source读取

(1)从集合读取数据:env.fromCollection(List(SensorReading(“sensor_1 “,15477181 99,35.8),SensorReading(“sensor_6”,1547718201,15.4))
(2)从文件读取数据:valstream2=env.readTextFile(“YOUR_FILE_PATH”)
(3)以kafka消息队列的数据作为来源:valstream3=env.addSource(newFlinkKafkaConsumer 011String,properties))
(4)自定义Source:valstream4=env.addSource(newMySensorSource()

4、Transform算子

Map:DataStream → DataStream,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。
flatMap:flatMap(List(1,2,3))(i⇒List(i,i))变成112233自动加逗号
Filter:过滤掉指定条件的数据。
KeyBy:按照指定的key进行分组,流拆分成不相交的分区。
Reduce:合并当前的元素和上次聚合的结果,用来进行结果汇总合并。
Window:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)
滚动聚合算子(RollingAggregation):sum()、min()、max()、minBy()、maxBy()针对每个支流聚合Split和Select:拆分和获取指定的流
Connect(放在同一个流中)和CoMap(组合成一个流)
Union:产生一个包含所有DataStream元素的新DataStream
Connect只能操作两个流,Union可以操作多个。

5、常见的数据类型

env.fromElements(XXXX)
(1)基础数据类型
(2)Java和Scala元组(Tuples)
(3)Scala样例类(caseclasses)
(4)Java简单对象(POJOs)
(5)其它(Arrays,Lists,Maps,Enums,等等)

6、UDF-更细粒度的控制流

函数类(Function Classes):实现MapFunction,FilterFunction,ProcessFunction接口
匿名函数(Lambda Functions)
富函数(Rich Functions):函数类的接口,所有Flink函数类都有其Rich版本,自带一系列生命周期方法(开关、得到上下文),可以实现复杂功能

7、sink操作

(1)使用
没有spark中的forEach方法,需要通过stream.addSink(newMySink(xxxx))完成任务最终输出
(2)举例
kafka:union.addSink(new FlinkKafkaProducer011String))
redis:dataStream.addSink(newRedisSinkSensorReading)
Elasticsearch:dataStream.addSink( esSinkBuilder.build() )
自定义sink:dataStream.addSink(newMyJdbcSink())

三、窗口

1、窗口的介绍

(1)含义
将无限的流式数据切割为有限块处理,以便于聚合等操作
(2)图解
Flink面试题 - 图5

2、窗口的分类

(1)按性质分
Flink 支持三种划分窗口的方式,time、count和会话窗口(Session Windows):session间隔定义了非活跃周期的长度,一段时间没有接收到新数据就会生成新的窗口。如果根据时间划分窗口,那么它就是一个time-window(时间窗口);如果根据数据划分窗口,那么它就是一个count-window(数量窗口)。一段时间没有接收到新数据就会生成新的窗口,则为会话窗口。
(2)按类型分
窗口又可以分为滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
滚动窗口无重叠数据,而滑动窗口有重叠数据。

3、窗口API

窗口包含两个重要属性(size窗口大小和interval间隔-多久统计一次),如果size=interval,那么就会形成tumbling-window(无重叠数据) 如果size>interval,那么就会形成sliding-window(有重叠数据) 如果size< interval,那么窗口将会丢失数据。
例如:每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据
组合后可以形成下列四种窗口
time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))
time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))
count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)
count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)

4、API补充

windowfunction:增量聚合(每次到来都计算)、全窗口函数(全部到来再遍历)
其它可选API:.trigger()——触发器、.evitor()——移除器、.allowedLateness()——允许处理迟到的数据、.getSideOutput() —— 获取侧输出流

四、时间语义与水印

1、时间语义分类

流式数据处理的时间可以分为事件时间,进入时间和处理时间三种
Event Time:事件的创建时间,消息本身携带
Ingestion Time:进入时间,以client客户端时间为准
Processing Time:处理时间(默认的时间属性),以服务端时间为准
通常根据日志的生成时间(Event Time)进行统计
引入时间语义:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2、Watermark水印

(1)含义
Flink 为了处理 EventTime 窗口延迟计算提出的一种机制,本质是一种时间戳。每条消息都有一个事件时间和一个水印时间(计算得出,例如maxEventTime-t)。
通常与Window一起处理乱序事件。由于网络延迟等原因,不能无限期的等下去,保证特定时间后,必须触发窗口进行计算。
(2)实现原理
添加水印后,窗口会等5秒,再执行计算。若超过5秒,则舍弃。
窗口执行计算时间由水印时间来触发(而非窗口结束时间),当接收到消息的watermark >= endtime时,触发窗口的计算
(3)具体操作
实现TimestampAssigner接口,实现根据事件时间计算水印时间

五、ProcessFunctionAPI(底层API)

1、含义

是flink的底层转换算子,通过这些底层转换算子可以访问数据的时间戳、watermark以及注册定时事件等,还可以输出特定的一些事件,例如超时事件等。

2、组成

ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction

3、调用方式

inputData.flatMap(**new MySpliter())
.process(**new KeyedProcessFunction<String, Sensor, String>() {})

所有的Process Function都实现了RichFunction接口,都有open()、close()和getRuntimeContext()等方法

4、KeyedProcessFunction

按照key对元素进行处理,额外提供下列两个方法
processElement(Sensor sensor, Context context, Collector collector), 流中的每个元素都会调用此方法,调用结果将会放在Collector数据类型中输出;context包含上下文信息,包括当前数据的时间戳、key以及时间服务,还可以将数据放到侧输出流

onTimer(long timestamp, OnTimerContext ctx, Collector out)是一个回调函数。当定时器触发时调用。timestamp是定时器的触发时间戳,ctx上下文信息,out收集输出信息

5、TimerService和定时器对象

(1)含义及组成
TimerService是Context上下文和OnTimerContext 的对象,均包含下列方法:

//返回当前处理时间
long currentProcessingTime();
//返回当前水位线
long currentWatermark();
//注册当前key的process time定时器,process time到达定时时间时触发timer
void registerProcessingTimeTimer(long var1);
//注册当前key的event time定时器,当watermark大于等于定时时间时触发timer
void registerEventTimeTimer(long var1);
//删除指定时间戳的process time定时器
void deleteProcessingTimeTimer(long var1);
//删除指定时间戳的event time定时器
void deleteEventTimeTimer(long var1);

(3)举例:温度传感器
传感器温度在1秒内持续升高则发出报警信息。

.process(**new KeyedProcessFunction<String, Sensor, String>()
实现processElement
()方法负责根据值注册和清除定时器
实现onTimer
()**方法用于发出报警【回调函数】

6、侧输出流(SideOutput)

(1)含义
默认算子单一输出,用侧输出流可以产生多条不同数据类型的流
每个输出流可以定义为定义为OutputTag[X]对象,并通过Context对象发射到指定事件或对象
(2)使用方式

val monitoredReadings: DataStream[SensorReading] = readings
.process(**new FreezingMonitor)
monitoredReadings
.getSideOutput(**new OutputTag[String](“freezing-alarms”))
.print() //输出指定的流信息
readings.print() //输出全部的流信息

(3)FreezingMonitor函数具体实现

class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {
// 定义一个侧输出标签
lazy val freezingAlarmOutput: OutputTag[String] =
new OutputTag[String](“freezing-alarms”)

override def processElement(r: SensorReading,
ctx: ProcessFunction[SensorReading, SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// 温度在32F以下时,输出警告信息
if (r.temperature < 32.0) {
ctx.output(freezingAlarmOutput, s”Freezing Alarm for ${r.id}”)
}
// 所有数据直接常规输出到主流
out.collect(r)
}
}

7、CoProcessFunction

(1)含义
使用CoProcessFunction可以合并两条流,根据id将两个流中的数据组合
(2)实现
提供了操作每一个输入流的方法:processElement1()和processElement2()
第一个流ValueState的值不为空,则在第二个流中合并

// 流2的处理逻辑与流1的处理逻辑类似
@Override
public void processElement2(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
String value1 = state1.value();
if (value1 != null**) {
out.collect(Tuple2.of(value1, value**));
state1.clear();
ctx.timerService().deleteEventTimeTimer(timeState.value());
timeState.clear();
} else {
state2.update(value);
long time = 1111L + 60000;
timeState.update(time);
ctx.timerService().registerEventTimeTimer(time);
}
}
//在定时器中将value不为空的tag进行输出

六、状态编程与容错机制

1、状态介绍

(1)分类
流式计算分为无状态和有状态
无状态流针对每个独立事件输出结果,有状态流需要维护一个状态,并基于多个事件输出结果(当前事件+当前状态值)
Flink面试题 - 图6
(2)有状态计算举例
窗口
复杂事件处理:一分钟出现两次
流与other的关联操作

2、有状态的算子

数据源source,数据存储sink都是有状态的
状态与算子相关联,有两种类型的状态:算子状态和键控状态
(1)算子状态(operator state)
为算子状态提供三种基本数据结构:列表状态(List state)、联合列表状态(Union list state)、广播状态(Broadcast state)
(2)键控状态(keyed state)
根据输入数据流中的键(key)来维护和访问,相同的key访问相同的状态
State支持的数据类型:单个值、列表值、map值、AggregatingState、ReducingState

2、状态一致性

(1)含义
成功处理故障并恢复之后得到的结果,与不产生故障的结果是否一致
(2)流处理器内部的一致性级别
主要包括at-most-once(计数结果可能丢失,无正确性保障)、at-least-once(计数结果可能大于但不会小于,可能会重复)、exactly-once(计数结果准确-依赖于检查点)
第一代流处理器Storm只保证at-least-once
Flink既保证了exactly-once,也具有低延迟和高吞吐的处理能力。
(3)流处理器之间的端到端状态一致性
结果的正确性贯穿了整个流处理应用的始终,各组件具有自身一致性;系统级别取决于所有组件中一致性最弱的组件
整个应用的一致性级别可以划分为:
内部保证(检查点)
source(可以重设数据读取位置)
sink(故障恢复时,数据不会重复写入,实现方式包含幂等写入、事务写入)
幂等写入:可执行多次,重复执行不起作用(例如转账500元)
事务写入:构建事务进行写入,checkpoint完成,才会将结果写入,事务性写入,具体又有两种实现方式:预写日志(WAL,GenericWriteAheadSink模板类)和两阶段提交(2PC,TwoPhaseCommitSinkFunction接口)

3、检查点(checkpoint)

(1)含义
重新计数的参考点
使用检查点保证exactly-once(计数结果准确),出现故障时将系统重置回正确状态
(2)实现
记录(字符串,数值),分别表示分组字符串(状态)和位置/偏移量
遇到检查点分界线(barrier)时,将输入流的位置异步持久化存储,从而可以从此位置重启
检查点失败,则会丢弃检查点并继续执行

4、数据管道实现精确语义

(1)组成
Flink + Kafka的数据管道系统(Kafka进、Kafka出)
Flink面试题 - 图7
(2)实现
内部:利用checkpoint机制,把状态存盘
source:kafka consumer作为source,可以将偏移量保存下来,故障恢复可以重置偏移量
sink:采用两阶段提交 sink,需要实现TwoPhaseCommitSinkFunction(预提交,jobmanager ack后正式提交)

5、状态后端选择

(1)分类
MemoryStateBackend:将键控状态作为内存中的对象进行管理,状态存在JVM堆上,检查点存在jobmanager内存中
FsStateBackend:检查点存在存到远程的持久化文件系统(FileSystem)上
RocksDBStateBackend:所有状态序列化后,存入本地的RocksDB中存储
RocksDB需要引入对应的依赖
(2)使用不同的状态后端

val env = StreamExecutionEnvironment.getExecutionEnvironment
val checkpointPath: String = ???
val backend = new RocksDBStateBackend**(<strong>checkpointPath</strong>)

env
.setStateBackend(backend)
env
.setStateBackend(**new FsStateBackend(“file:///tmp/checkpoints”))
env.enableCheckpointing(1000)
// 配置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)))

七、TableAPI与SQL

1、介绍

(1)介绍
Table API是流处理和批处理通用的关系型API,实现了流处理和批处理的统一
Table API和SQL是Flink中封装程度最高的API,但并不支持所有算子
(2)实现功能
内部目录catalog中注册表
注册外部catalog
执行SQL查询
注册用户定义(标量,表或聚合)函数
将DataStream或DataSet转换为表
持有对ExecutionEnvironment或StreamExecutionEnvironment的引用

2、Table API过程

(1)引入pom依赖
flink-table_2.11
(2)构造表环境

def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment **= StreamExecutionEnvironment.getExecutionEnvironment
val myKafkaConsumer
: FlinkKafkaConsumer011**[String] = MyKafkaUtil.getConsumer(“GMALL_STARTUP”)
val dstream: DataStream[String] = env**.addSource(myKafkaConsumer)
val tableEnv
: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
val startupLogDstream
: DataStream[StartupLog] = dstream.map{ jsonString =>JSON.parseObject(jsonString,classOf[StartupLog]) }
val startupLogTable
: Table **= tableEnv.**fromDataStream**(startupLogDstream)
val table: Table = startupLogTable.select(“mid,ch”).filter(“ch =’appstore’”)
val midchDataStream: DataStream**[(String, String)] = table.toAppendStream[(String,String)]
midchDataStream
.print()
env
.execute()
}**

(3)动态表
根据样例类生成table:tableEnv.fromDataStream(startupLogDstream)
对流根据字段命名:tableEnv.fromDataStream(startupLogDstream,’mid,’uid …….) -单引号标识
动态表按流输出:table.toAppendStream[(String,String)]

3、窗口聚合操作

(1)例子:统计每10秒中每个传感器温度值的个数

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> inputData = env.readTextFile(“F:\\workspace\\flinkjavademo\\src\\main\\resources\\Sensor.txt”);
DataStream<Sensor> dataStream = inputData.flatMap(**new MySpliter())
.assignTimestampsAndWatermarks(**new WatermarkStrategy<Sensor>() {
@Override
public WatermarkGenerator<Sensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Sensor>() {
private long maxTimestamp;
private long delay = 3000;
//以“timeStamp”字段为event-time时间戳开启了一个时间跨度为10秒,水位线为3秒的滚动窗口
@Override
public void onEvent(Sensor sensor, long l, WatermarkOutput watermarkOutput) {
maxTimestamp = Math.max(sensor.getTimeStamp(),l);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(**new Watermark(maxTimestamp-delay));
}
};
}
});
EnvironmentSettings envSetting
= EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv
= StreamTableEnvironment.create(env, envSetting);
Table table
= tableEnv.fromDataStream(dataStream,$(“id”),$(“timeStamp”).rowtime(),$(“temperature”));
Table filterTable
= table.window(Tumble.over(lit(10).seconds()).on($(“timeStamp”)).as(“tw”))
.groupBy($(“id”),$(“tw”))
.select($(“id”),$(“id”).count().as(“count”));
DataStream
<Tuple2<Boolean, Row>> sensorDataStream = tableEnv.toRetractStream(filterTable, Row.class);
sensorDataStream
.print();
env
.execute(“table test”);**

(2)group by
Group table转换为流:table.toRetractStream[(String,Long)]
api包括时间窗口,窗口的字段必须出现在groupBy中。

val resultTable: Table = dataTable
.window( Tumble over 10.seconds on ‘ts as ‘tw )
.groupBy(‘id, ‘tw)
.select(‘id, ‘id.count)

(3)时间窗口
提前声明时间字段,如果是processTime直接在创建动态表时进行追加

val dataTable: Table = tableEnv.fromDataStream(dataStream, ‘id, ‘temperature, ‘ps.proctime)

使用Tumble over 10000.millis on 来表示滚动窗口
使用Tumble over 10.seconds on ‘ts as ‘tw表示滑动窗口

4、SQL编写

EnvironmentSettings envSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);
//使用流创建表
Table table = tableEnv.fromDataStream(dataStream,$(“id”),$(“ts”).rowtime(),$(“temperature”));
//编写SQL完成开窗统计
Table filterTable = tableEnv.sqlQuery(“select id,count(id) num from “ + table + “ group by id,tumble(ts, interval ‘10’ second)”);
DataStream<Tuple2<Boolean, Row>> sensorDataStream = tableEnv.toRetractStream(filterTable, Row.class);
sensorDataStream.print();
env.execute(“table test”);

八、Flink CEP

1、介绍

Complex Event Processing,复杂事件处理
一个或多个简单事件构成的复杂事件流,得到满足规则的复杂事件。

2、Flink CEP library

Flink有专门的library支持
Flink面试题 - 图8

3、使用

在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警
例子:登录检测