一、概述
1、介绍
对无界和有界数据流进行有状态计算的分布式引擎和框架,并可以使用高层API编写分布式任务,主要包括:
DataSet API(批处理):静态数据抽象为分布式数据集,方便使用操作符进行处理(Python)
DataStream API(流处理):对分布式流数据处理,从而进行各种操作
Table API:将结构化数据抽象为关系表,并使用类SQL的DSL的表进行查询
其他特定领域的库,例如机器学习、图计算
2、分层架构介绍
(1)介绍
分层架构,下层组件提供抽象服务于上层
(2)自下而上各层介绍
Deploy层:Flink的不同部署模式,包括local、Standalone脱机、Cluster、Cloud等
Runtime层:提供Flink计算的核心实现(过程函数ProcessFunction)
API层:面向流(DataStream)处理和批(Batch)处理的API
Libraries层:应用框架,CEP(复杂事件处理)、基于SQL的操作(Table/SQL API)
(3)详解
Runtime层:有状态流通过过程函数(ProcessFunction)嵌入到DataStreamAPI中
API层:DataStream API提供了通用的数据处理构建模块,比如多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。
Libraries层:TableAPI是以表为中心的声明式编程,提供可比较的操作,执行前经过内置优化器进行优化
3、运行组件
(1)组成
作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager),以及分发器(Dispatcher)
(2)各组件功能
作业管理器JobManager:集群管理者Master和协调者、将作业图(JobGraph)转化为数据流图/执行图;请求资源、分发、协调;
资源管理器ResourceManager:(分配slot插槽)将有空闲插槽的TaskManager分配给JobManager,发起会话、中止释放资源;
任务管理器TaskManager:负责执行计算,(包含一定并发量)注册插槽、与同一程序的task M交换数据;
分发器Dispatcher:跨作业运行、为应用提交提供了REST接口。
Client:将Flink程序提交到集群,建立到JobManager的连接,将Flink Job提交给JobManager
4、Flink的部署模式
(1)Standalone模式
(2)Yarn模式-Hadoop>2.2
两种模式:Session-Cluster和Per-Job-Cluster模式
何时向yarn申请资源,创建flink集群
(3)Kubernetes部署
启动Flink的docker组件:JobManager、TaskManager、JobManagerService
5、任务提交流程
(1)常规
(2)yarn模式
6、任务调度相关概念
(1)TaskManger与Slots:JVM进程、Task Slot是静态的概念,是指TaskManager具有的并发执行能力
(2)程序与数据流(DataFlow):Flink程序-Source、Transformation和Sink,转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系
(3)执行图(ExecutionGraph):直接映射成的数据流图是StreamGraph,也被称为逻辑流图,需要转换为物理视图
执行图包括4层:StreamGraph->JobGraph->ExecutionGraph->物理执行图
(4)并行度(Parallelism):特定算子的子任务(subtask)的个数
算子之间传输数据的形式:One-to-one类似于窄依赖,Redistributing类似于宽依赖
(5)任务链(OperatorChains):相同并行度的One-to-one操作算子,形成一个task,减少线程之间的切换和基于缓存区的数据交换
7、Flink的基础编程模型
Flink 程序的基本构建是数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个Sink接收器中结束。
数据流(stream)就是一组永远不会停止的数据记录流,而转换(transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。
执行时,Flink程序映射到 streaming dataflows,由流(streams)和转换操作(transformation operators)组成。
二、基本操作
1、入门案例
(1)批处理wordcount—DataSet
val env = ExecutionEnvironment.getExecutionEnvironment // 从文件中读取数据 val inputPath = “D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt” val inputDS: DataSet[String] = env.readTextFile(inputPath) // 分词之后,对单词进行groupby分组,然后用sum进行聚合 val wordCountDS: AggregateDataSet[(String, Int)] = inputDS.flatMap(.split(“ “)).map((, 1)).groupBy(0).sum(1) // 打印输出 wordCountDS.print() |
---|
(2)流处理wordcount—DataStream
object StreamWordCount { def main(args: Array[String]): Unit = { // 从外部命令中获取参数 val params: ParameterTool = ParameterTool.fromArgs(args) val host: String = params.get(“host”) val port: Int = params.getInt(“port”) // 创建流处理环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 接收socket文本流 val textDstream: DataStream[String] = env.socketTextStream(host, port) // flatMap和Map需要引用的隐式转换 import org.apache.flink.api.scala. val dataStream: DataStream[(String, Int)] = textDstream.flatMap(.split(“\\s”)).filter(.nonEmpty).map((, 1)).keyBy(0).sum(1) dataStream.print().setParallelism(1) // 启动executor,执行任务 env.execute(“Socket stream word count”) } } |
---|
2、Environment创建
getExecutionEnvironment,客户端,当前执行程序的上下文
createLocalEnvironment:返回本地执行环境
createRemoteEnvironment:集群执行环境,需要指定ip、端口及jar包
3、Source读取
(1)从集合读取数据:env.fromCollection(List(SensorReading(“sensor_1 “,15477181 99,35.8),SensorReading(“sensor_6”,1547718201,15.4))
(2)从文件读取数据:valstream2=env.readTextFile(“YOUR_FILE_PATH”)
(3)以kafka消息队列的数据作为来源:valstream3=env.addSource(newFlinkKafkaConsumer 011String,properties))
(4)自定义Source:valstream4=env.addSource(newMySensorSource()
4、Transform算子
Map:DataStream → DataStream,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。
flatMap:flatMap(List(1,2,3))(i⇒List(i,i))变成112233自动加逗号
Filter:过滤掉指定条件的数据。
KeyBy:按照指定的key进行分组,流拆分成不相交的分区。
Reduce:合并当前的元素和上次聚合的结果,用来进行结果汇总合并。
Window:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)
滚动聚合算子(RollingAggregation):sum()、min()、max()、minBy()、maxBy()针对每个支流聚合Split和Select:拆分和获取指定的流
Connect(放在同一个流中)和CoMap(组合成一个流)
Union:产生一个包含所有DataStream元素的新DataStream
Connect只能操作两个流,Union可以操作多个。
5、常见的数据类型
env.fromElements(XXXX)
(1)基础数据类型
(2)Java和Scala元组(Tuples)
(3)Scala样例类(caseclasses)
(4)Java简单对象(POJOs)
(5)其它(Arrays,Lists,Maps,Enums,等等)
6、UDF-更细粒度的控制流
函数类(Function Classes):实现MapFunction,FilterFunction,ProcessFunction接口
匿名函数(Lambda Functions)
富函数(Rich Functions):函数类的接口,所有Flink函数类都有其Rich版本,自带一系列生命周期方法(开关、得到上下文),可以实现复杂功能
7、sink操作
(1)使用
没有spark中的forEach方法,需要通过stream.addSink(newMySink(xxxx))完成任务最终输出
(2)举例
kafka:union.addSink(new FlinkKafkaProducer011String))
redis:dataStream.addSink(newRedisSinkSensorReading)
Elasticsearch:dataStream.addSink( esSinkBuilder.build() )
自定义sink:dataStream.addSink(newMyJdbcSink())
三、窗口
1、窗口的介绍
(1)含义
将无限的流式数据切割为有限块处理,以便于聚合等操作
(2)图解
2、窗口的分类
(1)按性质分
Flink 支持三种划分窗口的方式,time、count和会话窗口(Session Windows):session间隔定义了非活跃周期的长度,一段时间没有接收到新数据就会生成新的窗口。如果根据时间划分窗口,那么它就是一个time-window(时间窗口);如果根据数据划分窗口,那么它就是一个count-window(数量窗口)。一段时间没有接收到新数据就会生成新的窗口,则为会话窗口。
(2)按类型分
窗口又可以分为滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
滚动窗口无重叠数据,而滑动窗口有重叠数据。
3、窗口API
窗口包含两个重要属性(size窗口大小和interval间隔-多久统计一次),如果size=interval,那么就会形成tumbling-window(无重叠数据) 如果size>interval,那么就会形成sliding-window(有重叠数据) 如果size< interval,那么窗口将会丢失数据。
例如:每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据
组合后可以形成下列四种窗口
time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))
time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))
count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)
count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)
4、API补充
windowfunction:增量聚合(每次到来都计算)、全窗口函数(全部到来再遍历)
其它可选API:.trigger()——触发器、.evitor()——移除器、.allowedLateness()——允许处理迟到的数据、.getSideOutput() —— 获取侧输出流
四、时间语义与水印
1、时间语义分类
流式数据处理的时间可以分为事件时间,进入时间和处理时间三种
Event Time:事件的创建时间,消息本身携带
Ingestion Time:进入时间,以client客户端时间为准
Processing Time:处理时间(默认的时间属性),以服务端时间为准
通常根据日志的生成时间(Event Time)进行统计
引入时间语义:
val env = StreamExecutionEnvironment.getExecutionEnvironment // 从调用时刻开始给env创建的每一个stream追加时间特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) |
---|
2、Watermark水印
(1)含义
Flink 为了处理 EventTime 窗口延迟计算提出的一种机制,本质是一种时间戳。每条消息都有一个事件时间和一个水印时间(计算得出,例如maxEventTime-t)。
通常与Window一起处理乱序事件。由于网络延迟等原因,不能无限期的等下去,保证特定时间后,必须触发窗口进行计算。
(2)实现原理
添加水印后,窗口会等5秒,再执行计算。若超过5秒,则舍弃。
窗口执行计算时间由水印时间来触发(而非窗口结束时间),当接收到消息的watermark >= endtime时,触发窗口的计算
(3)具体操作
实现TimestampAssigner接口,实现根据事件时间计算水印时间
五、ProcessFunctionAPI(底层API)
1、含义
是flink的底层转换算子,通过这些底层转换算子可以访问数据的时间戳、watermark以及注册定时事件等,还可以输出特定的一些事件,例如超时事件等。
2、组成
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
3、调用方式
inputData.flatMap(**new MySpliter()) .process(**new KeyedProcessFunction<String, Sensor, String>() {}) |
---|
所有的Process Function都实现了RichFunction接口,都有open()、close()和getRuntimeContext()等方法
4、KeyedProcessFunction
按照key对元素进行处理,额外提供下列两个方法
processElement(Sensor sensor, Context context, Collector collector), 流中的每个元素都会调用此方法,调用结果将会放在Collector数据类型中输出;context包含上下文信息,包括当前数据的时间戳、key以及时间服务,还可以将数据放到侧输出流
onTimer(long timestamp, OnTimerContext ctx, Collector out)是一个回调函数。当定时器触发时调用。timestamp是定时器的触发时间戳,ctx上下文信息,out收集输出信息
5、TimerService和定时器对象
(1)含义及组成
TimerService是Context上下文和OnTimerContext 的对象,均包含下列方法:
//返回当前处理时间 long currentProcessingTime(); //返回当前水位线 long currentWatermark(); //注册当前key的process time定时器,process time到达定时时间时触发timer void registerProcessingTimeTimer(long var1); //注册当前key的event time定时器,当watermark大于等于定时时间时触发timer void registerEventTimeTimer(long var1); //删除指定时间戳的process time定时器 void deleteProcessingTimeTimer(long var1); //删除指定时间戳的event time定时器 void deleteEventTimeTimer(long var1); |
---|
(3)举例:温度传感器
传感器温度在1秒内持续升高则发出报警信息。
.process(**new KeyedProcessFunction<String, Sensor, String>() 实现processElement()方法负责根据值注册和清除定时器 实现onTimer()**方法用于发出报警【回调函数】 |
---|
6、侧输出流(SideOutput)
(1)含义
默认算子单一输出,用侧输出流可以产生多条不同数据类型的流
每个输出流可以定义为定义为OutputTag[X]对象,并通过Context对象发射到指定事件或对象
(2)使用方式
val monitoredReadings: DataStream[SensorReading] = readings .process(**new FreezingMonitor) monitoredReadings .getSideOutput(**new OutputTag[String](“freezing-alarms”)) .print() //输出指定的流信息 readings.print() //输出全部的流信息 |
---|
(3)FreezingMonitor函数具体实现
class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] { // 定义一个侧输出标签 lazy val freezingAlarmOutput: OutputTag[String] = new OutputTag[String](“freezing-alarms”) override def processElement(r: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = { // 温度在32F以下时,输出警告信息 if (r.temperature < 32.0) { ctx.output(freezingAlarmOutput, s”Freezing Alarm for ${r.id}”) } // 所有数据直接常规输出到主流 out.collect(r) } } |
---|
7、CoProcessFunction
(1)含义
使用CoProcessFunction可以合并两条流,根据id将两个流中的数据组合
(2)实现
提供了操作每一个输入流的方法:processElement1()和processElement2()
第一个流ValueState
// 流2的处理逻辑与流1的处理逻辑类似 @Override public void processElement2(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception { String value1 = state1.value(); if (value1 != null**) { out.collect(Tuple2.of(value1, value**)); state1.clear(); ctx.timerService().deleteEventTimeTimer(timeState.value()); timeState.clear(); } else { state2.update(value); long time = 1111L + 60000; timeState.update(time); ctx.timerService().registerEventTimeTimer(time); } } //在定时器中将value不为空的tag进行输出 |
---|
六、状态编程与容错机制
1、状态介绍
(1)分类
流式计算分为无状态和有状态
无状态流针对每个独立事件输出结果,有状态流需要维护一个状态,并基于多个事件输出结果(当前事件+当前状态值)
(2)有状态计算举例
窗口
复杂事件处理:一分钟出现两次
流与other的关联操作
2、有状态的算子
数据源source,数据存储sink都是有状态的
状态与算子相关联,有两种类型的状态:算子状态和键控状态
(1)算子状态(operator state)
为算子状态提供三种基本数据结构:列表状态(List state)、联合列表状态(Union list state)、广播状态(Broadcast state)
(2)键控状态(keyed state)
根据输入数据流中的键(key)来维护和访问,相同的key访问相同的状态
State支持的数据类型:单个值、列表值、map值、AggregatingState、ReducingState
2、状态一致性
(1)含义
成功处理故障并恢复之后得到的结果,与不产生故障的结果是否一致
(2)流处理器内部的一致性级别
主要包括at-most-once(计数结果可能丢失,无正确性保障)、at-least-once(计数结果可能大于但不会小于,可能会重复)、exactly-once(计数结果准确-依赖于检查点)
第一代流处理器Storm只保证at-least-once
Flink既保证了exactly-once,也具有低延迟和高吞吐的处理能力。
(3)流处理器之间的端到端状态一致性
结果的正确性贯穿了整个流处理应用的始终,各组件具有自身一致性;系统级别取决于所有组件中一致性最弱的组件
整个应用的一致性级别可以划分为:
内部保证(检查点)
source(可以重设数据读取位置)
sink(故障恢复时,数据不会重复写入,实现方式包含幂等写入、事务写入)
幂等写入:可执行多次,重复执行不起作用(例如转账500元)
事务写入:构建事务进行写入,checkpoint完成,才会将结果写入,事务性写入,具体又有两种实现方式:预写日志(WAL,GenericWriteAheadSink模板类)和两阶段提交(2PC,TwoPhaseCommitSinkFunction接口)
3、检查点(checkpoint)
(1)含义
重新计数的参考点
使用检查点保证exactly-once(计数结果准确),出现故障时将系统重置回正确状态
(2)实现
记录(字符串,数值),分别表示分组字符串(状态)和位置/偏移量
遇到检查点分界线(barrier)时,将输入流的位置异步持久化存储,从而可以从此位置重启
检查点失败,则会丢弃检查点并继续执行
4、数据管道实现精确语义
(1)组成
Flink + Kafka的数据管道系统(Kafka进、Kafka出)
(2)实现
内部:利用checkpoint机制,把状态存盘
source:kafka consumer作为source,可以将偏移量保存下来,故障恢复可以重置偏移量
sink:采用两阶段提交 sink,需要实现TwoPhaseCommitSinkFunction(预提交,jobmanager ack后正式提交)
5、状态后端选择
(1)分类
MemoryStateBackend:将键控状态作为内存中的对象进行管理,状态存在JVM堆上,检查点存在jobmanager内存中
FsStateBackend:检查点存在存到远程的持久化文件系统(FileSystem)上
RocksDBStateBackend:所有状态序列化后,存入本地的RocksDB中存储
RocksDB需要引入对应的依赖
(2)使用不同的状态后端
val env = StreamExecutionEnvironment.getExecutionEnvironment val checkpointPath: String = ??? val backend = new RocksDBStateBackend**(<strong>checkpointPath</strong>) env.setStateBackend(backend) env.setStateBackend(**new FsStateBackend(“file:///tmp/checkpoints”)) env.enableCheckpointing(1000) // 配置重启策略 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS))) |
---|
七、TableAPI与SQL
1、介绍
(1)介绍
Table API是流处理和批处理通用的关系型API,实现了流处理和批处理的统一
Table API和SQL是Flink中封装程度最高的API,但并不支持所有算子
(2)实现功能
内部目录catalog中注册表
注册外部catalog
执行SQL查询
注册用户定义(标量,表或聚合)函数
将DataStream或DataSet转换为表
持有对ExecutionEnvironment或StreamExecutionEnvironment的引用
2、Table API过程
(1)引入pom依赖
flink-table_2.11
(2)构造表环境
def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment **= StreamExecutionEnvironment.getExecutionEnvironment val myKafkaConsumer: FlinkKafkaConsumer011**[String] = MyKafkaUtil.getConsumer(“GMALL_STARTUP”) val dstream: DataStream[String] = env**.addSource(myKafkaConsumer) val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env) val startupLogDstream: DataStream[StartupLog] = dstream.map{ jsonString =>JSON.parseObject(jsonString,classOf[StartupLog]) } val startupLogTable: Table **= tableEnv.**fromDataStream**(startupLogDstream) val table: Table = startupLogTable.select(“mid,ch”).filter(“ch =’appstore’”) val midchDataStream: DataStream**[(String, String)] = table.toAppendStream[(String,String)] midchDataStream.print() env.execute() }** |
---|
(3)动态表
根据样例类生成table:tableEnv.fromDataStream(startupLogDstream)
对流根据字段命名:tableEnv.fromDataStream(startupLogDstream,’mid,’uid …….) -单引号标识
动态表按流输出:table.toAppendStream[(String,String)]
3、窗口聚合操作
(1)例子:统计每10秒中每个传感器温度值的个数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); DataStreamSource<String> inputData = env.readTextFile(“F:\\workspace\\flinkjavademo\\src\\main\\resources\\Sensor.txt”); DataStream<Sensor> dataStream = inputData.flatMap(**new MySpliter()) .assignTimestampsAndWatermarks(**new WatermarkStrategy<Sensor>() { @Override public WatermarkGenerator<Sensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<Sensor>() { private long maxTimestamp; private long delay = 3000; //以“timeStamp”字段为event-time时间戳开启了一个时间跨度为10秒,水位线为3秒的滚动窗口 @Override public void onEvent(Sensor sensor, long l, WatermarkOutput watermarkOutput) { maxTimestamp = Math.max(sensor.getTimeStamp(),l); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(**new Watermark(maxTimestamp-delay)); } }; } }); EnvironmentSettings envSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting); Table table = tableEnv.fromDataStream(dataStream,$(“id”),$(“timeStamp”).rowtime(),$(“temperature”)); Table filterTable = table.window(Tumble.over(lit(10).seconds()).on($(“timeStamp”)).as(“tw”)) .groupBy($(“id”),$(“tw”)) .select($(“id”),$(“id”).count().as(“count”)); DataStream<Tuple2<Boolean, Row>> sensorDataStream = tableEnv.toRetractStream(filterTable, Row.class); sensorDataStream.print(); env.execute(“table test”);** |
---|
(2)group by
Group table转换为流:table.toRetractStream[(String,Long)]
api包括时间窗口,窗口的字段必须出现在groupBy中。
val resultTable: Table = dataTable .window( Tumble over 10.seconds on ‘ts as ‘tw ) .groupBy(‘id, ‘tw) .select(‘id, ‘id.count) |
---|
(3)时间窗口
提前声明时间字段,如果是processTime直接在创建动态表时进行追加
val dataTable: Table = tableEnv.fromDataStream(dataStream, ‘id, ‘temperature, ‘ps.proctime) |
---|
使用Tumble over 10000.millis on 来表示滚动窗口
使用Tumble over 10.seconds on ‘ts as ‘tw表示滑动窗口
4、SQL编写
EnvironmentSettings envSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting); //使用流创建表 Table table = tableEnv.fromDataStream(dataStream,$(“id”),$(“ts”).rowtime(),$(“temperature”)); //编写SQL完成开窗统计 Table filterTable = tableEnv.sqlQuery(“select id,count(id) num from “ + table + “ group by id,tumble(ts, interval ‘10’ second)”); DataStream<Tuple2<Boolean, Row>> sensorDataStream = tableEnv.toRetractStream(filterTable, Row.class); sensorDataStream.print(); env.execute(“table test”); |
---|
八、Flink CEP
1、介绍
Complex Event Processing,复杂事件处理
一个或多个简单事件构成的复杂事件流,得到满足规则的复杂事件。
2、Flink CEP library
Flink有专门的library支持
3、使用
在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警
例子:登录检测