一、概述
1、Flink and Spark
Flink 主要解决的问题
—Apache Flink是一个和分布式处理引擎,用于对无界和有界数据流进行有状态计算。 1数据精准一次性处理(Exactly-Once) 2乱序数据,迟到数据 3低延迟,高吞吐,准确性 4容错性
Spark和Flink的计算模型
—Spark 微批次处理模型 —Flink 连续流模型
1 批处理 2 流处理
2.1 无界数据流
2.2 有界数据流
Flink和Spark的主要区别
1、Flink可以内部解决有状态的计算,Spark需要借助第三方数据库(redis)去保存状态。
二、运行架构
1、Runtime整体架构
2、客户端
用于发送dataflow到JobManager。
3、JobManager
一个应用主进程,主要用于接收应用程序,作业图、数据流图、打包的所有类、jar等。同时转换为执行图和申请资源。 JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调 这个进程包含3个不同的组件
3.1、ResourceManager
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。
3.2、Dispatcher
负责接收用户提供的作业,并且负责为这个新提交的作业启动一个新的JobManager 组件. Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。 集群Job的调度分发 根据JobGraph启动JobManager(JobMaster)
3.3、JobMaster
JobMaster负责管理单个JobGraph的执行.多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster.
4、TaskManager
flink中的工作进程。一个flink集群中有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。 Slot计算资源提供者
三、状态管理
1、状态说明
1.1、概念
需要记录多个事件的记录,比如本次计算需要基于上一次事件的数据,需要把上一次计算结果记录在状态里面。
1.2、场景
去重
例如:UV,需要判断这个数据是否来过。
检查
例如:在检查水位是否上升时,需要和前一个作对比。
风控
例如:对于一些登陆频繁的用户,将将其登录次数存入状态,进行风险控制。
聚合
例如:开窗聚合需要一个窗口的状态
1.3、分类
1.3.1、Managed state
- Operator State(算子状态)
说明:常用于source算子,例如:FlinkKafkaConsumer,一个子任务对应一个状态。 作用:常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。 注意:每个子任务可以共享一个状态,但是子任务之间不能共享状态。
a)列表状态(List state)
将状态表示为一组数据的列表
//实现CheckpointedFunction或ListCheckpointed(已经过时)接口
private static class MyCountMapper implements MapFunction<String, Long>, CheckpointedFunction {
private Long count = 0L;
private ListState<Long> state;
@Override
public Long map(String value) throws Exception {
count++;
return count;
}
// 初始化时会调用这个方法,向本地状态中填充数据. 每个子任务调用一次
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
System.out.println("initializeState...");
state = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<Long>("state", Long.class));
for (Long c : state.get()) {
count += c;
}
}
// Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
System.out.println("snapshotState...");
state.clear();
state.add(count);
}
}
b)Broadcast state(广播状态)
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
DataStreamSource<String> dataStream = env.socketTextStream("hadoop102", 9999);
DataStreamSource<String> controlStream = env.socketTextStream("hadoop102", 8888);
MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class);
// 将controlStream转换为广播流
BroadcastStream<String> broadcastStream = controlStream.broadcast(stateDescriptor);
dataStream
.connect(broadcastStream)
.process(new BroadcastProcessFunction<String, String, String>() {
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// 从广播状态中取值, 不同的值做不同的业务
ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
if ("1".equals(state.get("switch"))) {
out.collect("切换到1号配置....");
} else if ("0".equals(state.get("switch"))) {
out.collect("切换到0号配置....");
} else {
out.collect("切换到其他配置....");
}
}
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
// 把值放入广播状态
state.put("switch", value);
}
})
- Keyed State(键控状态)
说明:常用于KeyedStream上的算子,一个key对应一个state,一个算子可以处多个key访问多个state。重写RichFunction, 通过里面的RuntimeContext访问 作用:通过数据流中的key去维护和访问,key相同的数据都会分区到同一个算子任务中,具有相同的key可以访问相同状态。 注意:只能是keyby后的KeyedStream数据。类似于K、V的map类型。
a)ValueState
保存单个值. 每个有key有一个状态值. 设置使用update(T), 获取使用 T value()
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
//1、定义状态变量
private ValueState<Integer> state;
@Override
public void open(Configuration parameters) throws Exception {
//2、初始化状态
state = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("state", Integer.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//3、获取状态
Integer lastVc = state.value() == null ? 0 : state.value();
if (Math.abs(value.getVc() - lastVc) >= 10) {
out.collect(value.getId() + " 红色警报!!!");
}
//4、更新状态
state.update(value.getVc());
}
})
b)ListState
保存元素列表 添加元素: add(T) addAll(List
) 获取元素: Iterable get() 覆盖所有元素: update(List )
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, List<Integer>>() {
private ListState<Integer> vcState;
@Override
public void open(Configuration parameters) throws Exception {
vcState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcState", Integer.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<List<Integer>> out) throws Exception {
vcState.add(value.getVc());
//1. 获取状态中所有水位高度, 并排序
List<Integer> vcs = new ArrayList<>();
for (Integer vc : vcState.get()) {
vcs.add(vc);
}
// 2. 降序排列
vcs.sort((o1, o2) -> o2 - o1);
// 3. 当长度超过3的时候移除最后一个
if (vcs.size() > 3) {
vcs.remove(3);
}
vcState.update(vcs);
out.collect(vcs);
}
})
c)MapState
存储键值对列表. 添加键值对: put(UK, UV) or putAll(Map
) 根据key获取值: get(UK) 获取所有: entries(), keys() and values() 检测是否为空: isEmpty()
//去重: 去掉重复的水位值. 思路: 把水位值作为MapState的key来实现去重, value随意
.process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
private MapState<Integer, String> mapState;
@Override
public void open(Configuration parameters) throws Exception {
mapState = this
.getRuntimeContext()
.getMapState(new MapStateDescriptor<Integer, String>("mapState", Integer.class, String.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
if (!mapState.contains(value.getVc())) {
out.collect(value);
mapState.put(value.getVc(), "随意");
}
}
})
d)ReducingState
存储单个值, 表示把所有元素的聚合结果添加到状态中. 与ListState类似, 但是当使用add(T)的时候ReducingState会使用指定的ReduceFunction进行聚合
//计算每个传感器的水位和
.process(new KeyedProcessFunction<String, WaterSensor, Integer>() {
private ReducingState<Integer> sumVcState;
@Override
public void open(Configuration parameters) throws Exception {
sumVcState = this
.getRuntimeContext()
.getReducingState(new ReducingStateDescriptor<Integer>("sumVcState", Integer::sum, Integer.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<Integer> out) throws Exception {
sumVcState.add(value.getVc());
out.collect(sumVcState.get());
}
})
2、状态后端
说明:
状态的存储、访问以及维护由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
作用
本地状态的管理 将检查点(checkpoint)状态写入远程存储
2.1、分类
MemoryStateBackend
内存级别的状态后端, 存储方式:本地状态存储在JobManager的内存中, checkpoint 存储在JobManager的内存中。 特点:快速,低延迟,但不稳定 使用场景:
- 本地测试
- 几乎无状态的作业(ETL)
- JobManager不容易挂,或者挂了影响不大.
- 不推荐在生产环境下使用
FsStateBackend
存储方式:本地状态在JobManager内存,Checkpoint存储在文件系统中 特点:拥有内存级别的本地访问速度,和更好的容错保证 使用场景:
- 常规使用状态的作业.例如分钟级别窗口聚合, join等
- 需要开启HA的作业 3. 可以应用在生产环境中
RocksDBStateBackend
将所有的状态序列化之后,存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储) 存储方式:
- 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘)
- Checkpoint在外部文件系统中.
使用场景:
- 超大状态的作业,例如天级的窗口聚合
- 需要开启HA的作业
- 对读写状态性能要求不高的作业
- 可以使用在生产环境
2.2、配置
- 全局配置
在flink-conf.yaml文件中设置默认的全局后端
- 代码配置
MemoryStateBackend
env.setStateBackend(new MemoryStateBackend());
FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/checkpoints/fs"));
RocksDBStateBackend
//先引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints/rocksdb"));
3、状态容错
3.1、状态一致性
1 Flink中的检查点,保存的是所有任务状态的快照?
这个状态要求是所有任务都处理同一数据之后的状态。
2 Flink Checkpoint算法?
基于Chandy-Lamport(昌迪-兰伯特)算法的分布式快照。
3 flink checkpoint中重要的概念?
barrier用于分隔不同的chenkpoint,对于每个任务而言。收到的barrier就意味着要开始做state的保存,算子中需要对不同上游分区发来的barrier,进行对齐。
4. checkpoint存储位置,由state backend(状态后端)决定?
一般是放在远程持久化存储空间,jobmanager触发一个checkpoint操作,会把checkpoint中所有状态的拓扑结构保存下来。
5. barrier和watermark类似,都可以看作一个插入数据流中的特殊数据结构
barrier在数据处理上跟watermark是两套机制,完全没有关系。
3.2、端到端一致性
--端到端状态一致
source端 —— 需要外部源可重设数据的读取位置(重发数据)
flink内部 —— 依赖checkpoint
sink端 —— 需要保证从故障恢复时,数据不会重复写入外部系统
而对于sink端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。
--幂等(Idempotent)
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改(flink+es)
--事务性(Transactional)
预写日志(WAL)->(里面存的是操作命令)和两阶段提交(2PC)->这里的事务由外部系统提供(flink+kafka)。
--sink到文件,怎么使用2pc?
1 开始事务:创建临时文件
2 预提交:往临时文件里写数据
3 正式提交:把临时文件更名为正式文件的名字
4 回滚:删除临时文件
--当为kafka数据源时,怎么保证一致性?
1 kafka的offset保存在Source算子的状态,此时kafka不知道offset更新到哪,我们setCommitOffsetsOnCheckpoints设为 true, 会同步一份给kafka,就可以看到了(详细配置见source kafka代码)
具体的两阶段提交步骤总结如下:
1) 第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
2) jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager
3) sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
4) jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
5) sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
6) 外部kafka关闭事务,提交的数据可以正常消费了。
3.3、savepoint和checkpoint区别
四、监控
1、Flink1.13反压监控和优化
1.1、指标
任务(subtask)的每个并行实例都公开一组三个指标.
backPressureTimeMsPerSecond,子任务花费的时间
idleTimeMsPerSecond,子任务等待处理数据所花费的时间
busyTimeMsPerSecond,子任务忙于执行一些实际工作的时间在任何时间点,
这三个指标的总和大约为1000ms。
1.2、例子
可以发现 DAG 图中出现了好几种颜色,空闲状态为蓝色,完全反压状态为黑色,完全忙碌状态为红色。所有介于两者之间的值都用这三种颜色之间的阴影来表示。
从上图可以看出,颜色最重的是 Map 算子(倒数第二个),最后一个 window 算子显示红色,说明反压最严重的地方是 Map,所以真正出现反压的位置是 window 算子的 processfunction 方法(最后一个算子)因为 processfunction 处理数据比较慢从而导致了 map 反压,然后逐级向上反压,直到数据源 source 出现反压.这样根据 DAG 的颜色就能判断出反压的位置,不用在去看 BackPressure ,inPoolUsage ,outPoolUsage 指标,更加的简便直观. 。
1.3、实际遇到问题
对于状态为 OK 的 subtask,没有反压的迹象。另一方面,high 表示 subtask 受到反压力。状态通过以下方式定义:
OK: 0% <= back pressured <= 10%
LOW: 10% < back pressured <= 50%
HIGH: 50% < back pressured <= 100%
由上图发现,反压是出现在前面个流程,导致反压是因为异步算子的原因。由于是异步同步到es,估计是es字段类型不匹配原因。
看日志,是否有字段类型不匹配,是否有脏数据。 看CPU使用情况,集群是否异构 客户端是怎样的配置?使用的bulk 还是单条插入 查看线程堆栈,查看耗时最久的方法调用 确定集群类型:ToB还是ToC,是否允许有少量数据丢失? 针对ToB等实时性不高的集群减少副本增加刷新时间 index buffer优化 translog优化,滚动重启集群
1.4、Credit 的反压机制
指系统能够自己检测到被阻塞的 Operator,然后自适应地降低源头或上游数据的发送速率,从而维持整个系统的稳定。Flink 任务一般运行在多个节点上,数据从上游算子发送到下游算子需要网络传输,若系统在反压时想要降低数据源头或上游算子数据的发送速率,那么肯定也需要网络传输。
五、窗口函数
1、概述
是一种切割无限数据为有限块进行处理的手段。将无限的数据放进“桶”进行计算。
2、API
2.1、TimeWindow
TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算
滚动窗口
.keyBy(data -> data.f0) //必须先做keyby
.timeWindow( Time.seconds(15) ) //调用窗口
.minBy(1)//后面必须跟上窗口函数
滑动窗口(SlidingEventTimeWindows)
.keyBy(SensorReading::getId)
.timeWindow( Time.seconds(15), Time.seconds(5) )
.minBy("temperature");
CountWindow
CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。
.keyBy(SensorReading::getId)
.countWindow( 5 )
.minBy("temperature");
2.2、window function
- 增量聚合函数(incremental aggregation functions)
每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有
例子:ReduceFunction, AggregateFunction
- 全窗口函数(full window functions)
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
例子:ProcessWindowFunction、WindowFunction就是一个全窗口函数
底层里面有很多窗口的信息。
六、调优
1、参数调优
1.1、最大并行度
每个 Flink 作业都有一个名为最大并行度(Maximum Parallelism,简称 MaxParallelism)的属性,它决定了 Flink 作业无损扩容的上限。 Maximum Parallelism 参数涉及到 Flink 最底层的状态分配逻辑,因此一旦设定,就不允许随意更改。如果一定要修改该值(例如希望扩容到超过 MaxParallelism 的 CU 数),那么 Flink 就只能丢弃现有运行时状态,重新开始。 换句话说,如果对 Flink 作业作业进行快照(例如触发 Checkpoint、Savepoint 等),那么从这个快照恢复时,新指定的算子最大并行度不能超过这个值,否则 Flink 会抛出异常并终止启动。如果不从快照启动作业,则 Flink 仍可正常启动。
七、FlinkSql
1、connector
1.1、JDBC
a)说明
JDBC SQL Connector JDBC 连接器可以让 Flink 程序从拥有 JDBC 驱动的任意关系型数据库中读取数据或将数据写入数据库。 如果在 Flink SQL 表的 DDL 语句中定义了主键,则会以 upsert 模式将流中数据写入数据库,此时流中可以存在 UPDATE/DElETE(更新/删除)类型的数据。否则,会以 append 模式将数据写出到数据库,此时流中只能有 INSERT(插入)类型的数据。
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
b)Lookup Cache
JDBC 连接器可以作为时态表关联中的查询数据源(又称维表)。目前,仅支持同步查询模式。 默认情况下,查询缓存(Lookup Cache)未被启用,需要设置 lookup.cache.max-rows 和 lookup.cache.ttl 参数来启用此功能。 Lookup 缓存是用来提升有 JDBC 连接器参与的时态关联性能的。默认情况下,缓存未启用,所有的请求会被发送到外部数据库。当缓存启用时,每个进程(即 TaskManager)维护一份缓存。收到请求时,Flink 会先查询缓存,如果缓存未命中才会向外部数据库发送请求,并用查询结果更新缓存。如果缓存中的记录条数达到了 lookup.cache.max-rows 规定的最大行数时将清除存活时间最久的记录。如果缓存中的记录存活时间超过了 lookup.cache.ttl 规定的最大存活时间,同样会被清除。 缓存中的记录未必是最新的,可以将 lookup.cache.ttl 设置为一个更小的值来获得时效性更好的数据,但这样做会增加发送到数据库的请求数量。所以需要在吞吐量和正确性之间寻求平衡。
c)Lookup Join
Lookup Join 通常在 Flink SQL 表和外部系统查询结果关联时使用。这种关联要求一张表(主表)有处理时间字段,而另一张表(维表)由 Lookup 连接器生成。 Lookup Join 做的是维度关联,而维度数据是有时效性的,那么我们就需要一个时间字段来对数据的版本进行标识。因此,Flink 要求我们提供处理时间用作版本字段。 此处选择调用 PROCTIME() 函数获取系统时间,将其作为处理时间字段。该函数调用示例如下
tableEnv.sqlQuery("select PROCTIME() proc_time")
.execute()
.print();
// 结果
+----+-------------------------+
| op | proc_time |
+----+-------------------------+
| +I | 2022-04-09 15:45:50.752 |
+----+-------------------------+
1 row in set
d)JDBC SQL Connector 参数解读
Ø connector:连接器类型,此处为 jdbc Ø url:数据库 url Ø table-name:数据库中表名 Ø lookup.cache.max-rows:lookup 缓存中的最大记录条数 Ø lookup.cache.ttl:lookup 缓存中每条记录的最大存活时间 Ø username:访问数据库的用户名 Ø password:访问数据库的密码 Ø driver:数据库驱动,注意:通常注册驱动可以省略,但是自动获取的驱动是com.mysql.jdbc.Driver,Flink CDC 2.1.0 要求 mysql 驱动版本必须为8 及以上,在 mysql-connector -8.x 中该驱动已过时,新的驱动为com.mysql.cj.jdbc.Driver。省略该参数控制台打印的警告如下
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
1.2、Kafka
a)left join和撤回流
假设 A 表作为主表与 B 表做等值左外联。当 A 表数据进入算子,而 B 表数据未至时会先生成一条 B 表字段均为 null 的关联数据ab1,其标记为 +I。其后,B 表数据到来,会先将之前的数据撤回,即生成一条与 ab1 内容相同,但标记为 -D 的数据,再生成一条关联后的数据,标记为 +I。这样生成的动态表对应的流称之为回撤流。
b)Kafka SQL Connector
Kafka SQL Connector 分为 Kafka SQL Connector 和 Upsert Kafka SQL Connector
功能
Upsert Kafka Connector支持以 upsert 方式从 Kafka topic 中读写数据
Kafka Connector支持从 Kafka topic 中读写数据
区别
建表语句的主键
i)Kafka Connector 要求表不能有主键,如果设置了主键,报错信息如下 Caused by: org.apache.flink.table.api.ValidationException: The Kafka table ‘default_catalog.default_database.normal_sink_topic’ with ‘json’ format doesn’t support defining PRIMARY KEY constraint on the table, because it can’t guarantee the semantic of primary key.
ii)而 Upsert Kafka Connector 要求表必须有主键,如果没有设置主键,报错信息如下 Caused by: org.apache.flink.table.api.ValidationException: ‘upsert-kafka’ tables require to define a PRIMARY KEY constraint. The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. The PRIMARY KEY also defines records in the ‘upsert-kafka’ table should update or delete on which keys.
iii)语法: primary key(id) not enforced 注意:not enforced 表示不对来往数据做约束校验,Flink 并不是数据的主人,因此只支持 not enforced 模式 如果没有 not enforced,报错信息如下 Exception in thread “main” org.apache.flink.table.api.ValidationException: Flink doesn’t support ENFORCED mode for PRIMARY KEY constaint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode
对表中数据操作类型的要求
i)Kafka Connector 不能消费带有 Upsert/Delete 操作类型数据的表,如 left join 生成的动态表。如果对这类表进行消费,报错信息如下 Exception in thread “main” org.apache.flink.table.api.TableException: Table sink ‘default_catalog.default_database.normal_sink_topic’ doesn’t support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_9]], fields=[l_id, tag_left, tag_right])]
ii)Upsert Kafka Connector 将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,因此同一主键的更新/删除消息将落在同一分区,从而保证同一主键的消息有序。
参数解读
数据的操作类型均为 INSERT,所以读取数据使用 Kafka Connector 即可。而由于 left join 的存在,流中存在修改数据,所以写出数据使用 Upsert Kafka Connector。
Kafka Connector 参数
Ø connector:指定使用的连接器,对于 Kafka,只用 ‘kafka’ Ø topic:主题 Ø properties.bootstrap.servers:以逗号分隔的 Kafka broker 列表。注意:可以通过 properties. 的方式指定配置项,的位置用 Kafka 官方规定的配置项的 key 替代。并不是所有的配置都可以通过这种方式配置,因为 Flink 可能会将它们覆盖,如:’key.deserializer’ 和’value.deserializer’ Ø properties.group.id:消费者组 ID Ø format:指定 Kafka 消息中 value 部分的序列化的反序列化方式,’format’ 和’value.format’ 二者必有其一 Ø scan.startup.mode:Kafka 消费者启动模式,有四种取值 Ø ‘earliest-offset’:从偏移量最早的位置开始读取数据 Ø ‘latest-offset’:从偏移量最新的位置开始读取数据 Ø ‘group-offsets’:从 Zookeeper/Kafka broker 中维护的消费者组偏移量开始读取数据 Ø ‘timestamp’:从用户为每个分区提供的时间戳开始读取数据 Ø ‘specific-offsets’:从用户为每个分区提供的偏移量开始读取数据 默认值为 group-offsets。要注意:latest-offset 与 Kafka 官方提供的配置项 latest 不同, Flink 会将偏移量置为最新位置,覆盖掉 Zookeeper 或 Kafka 中维护的偏移量。与官方提供的 latest 相对应的是此处的 group-offsets。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv' -- json
)
Upsert Kafka Connector 参数
Ø connector:指定使用的连接器,对于 Upsert Kafka,使用 ‘upsert-kafka’ Ø topic:主题 Ø properties.bootstrap.servers:以逗号分隔的 Kafka broker 列表 Ø key.format:key 的序列化和反序列化格式 Ø value.format:value 的序列化和反序列化格式
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'avro',
'value.format' = 'avro'
);
2、流和表之间的转换
a)知识点(left join例子)
Flinksql
Table joinTable = tableEnv.sqlQuery(
"select \n" +
"l.id l_id,\n" +
"l.tag l_tag,\n" +
"r.tag r_tag\n" +
"from left l \n" +
"left join \n" +
"right r \n" +
"on l.id = r.id"
);
实体类
@Data
public class JoinBean {
String l_id;
String tag_left;
String tag_right;
}
Flink SQL 的 Table 类型变量转化为 DataStream 有四类 API(表转流)
① toAppendStream ② toDataStream ③ toChangelogStream ④ toRetractStream
其中,Table 变量中包含更新和删除数据时调用 ① 和 ② 转化为流会报错,报错信息如下
如下 // toAppendStream 报错信息 toAppendStream doesn’t support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, tag, id0, tag0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) // toDataStream 报错信息 Table sink ‘default_catalog.default_database.Unregistered_DataStream_Sink_2’ doesn’t support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, tag, id0, tag0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
调用 ③ 和 ④ 可以将包含更新和删除数据的表转化为流。需要指定表结构时用法如下
-- toChangelogStream
DataStream<Row> changelogStream = tableEnv.toChangelogStream(joinTable, Schema.newBuilder()
.column("l_id", "STRING")
.column("tag_left", "STRING")
.column("tag_right", "STRING")
.build());
-- toRetractStream
DataStream<Tuple2<Boolean, JoinBean>> retractS = tableEnv.toRetractStream(joinTable, JoinBean.class);
-- 结果
changelogStream>>>> +I[A, left, null]
changelogStream>>>> -D[A, left, null]
changelogStream>>>> +I[A, left, right]
changelogStream>>>> +I[B, left, null]
changelogStream>>>> -D[B, left, null]
changelogStream>>>> +I[B, left, right]
changelogStream>>>> +I[C, left, null]
changelogStream>>>> -D[C, left, null]
changelogStream>>>> +I[C, left, right]
retractS> (true,JoinBean(l_id=A, tag_left=left, tag_right=null))
retractS> (false,JoinBean(l_id=A, tag_left=left, tag_right=null))
retractS> (true,JoinBean(l_id=A, tag_left=left, tag_right=right))
retractS> (true,JoinBean(l_id=B, tag_left=left, tag_right=null))
retractS> (false,JoinBean(l_id=B, tag_left=left, tag_right=null))
retractS> (true,JoinBean(l_id=B, tag_left=left, tag_right=right))
retractS> (true,JoinBean(l_id=C, tag_left=left, tag_right=null))
retractS> (false,JoinBean(l_id=C, tag_left=left, tag_right=null))
retractS> (true,JoinBean(l_id=C, tag_left=left, tag_right=right))
将流转化为动态表
① 目前版本 Flink 只提供了两种 API Ø fromChangelogStream Ø fromDataStream 应用场景 a)fromDataStream 不可用于包含删除和更新数据的流向 Table 的转化,否则报错,报错信息如下 Error during input conversion. Conversion expects insert-only records but DataStream API record contains: DELETE
b)fromChangelogStream 可用于包含删除和更新数据流向 Table 的转化
Table changelogTable = tableEnv.fromChangelogStream(
changelogStream,
Schema.newBuilder()
.column("l_id", "STRING")
.column("tag_left", "STRING")
.column("tag_right", "STRING")
.build()
);
3、Flink Table 的三种 Sink 模式
八、常用算子说明
1、多流聚合算子
a)union
用于两条及多条流之间的合并,对流的数量没有限制,但是要求所有流中的数据结构完全一致。
b)connect
用于两条流的合并,其后紧邻的 process 算子中可以使用的 CoProcessFunction 是双流处理最底层的 API,可以通过键控状态和定时器的运用实现join、广播join、段join等各种关联。connect() 只能对两条流做关联,且对两条流的数据结构没有要求。
c)intervalJoin
段 join,两条流的每一条数据都可以与另一条流某个时间范围内的数据做关联。底层实现原理:以 A.intervalJoin(B) 为例,A 流中的数据进入算子后,会被保存到键控状态中,同时注册一个定时器,定时器触发时清空 A 流状态中的数据。在定时器触发之前,B 流中的每一条数据都可以与状态中保存的 A 流数据关联。同理,B 流中也维护了状态定时器。由此实现了段 join。假定A流中的定时器存在时长为3s,B流中的定时器存在时长为5s,A 流中某条数据抵达时间为 tA,可与 tA – 5s ~ tA + 3s 时间范围内抵达的 B 流数据关联;B 流中某条数据抵达时间为 tB,可与 tB – 3s ~ tB + 5s 时间范围内抵达的 A 流数据关联。
d)join
该算子的功能可以被其它算子替代,目前基本不用。
总结
connect()、intervalJoin()、join() 都是双流合并算子,本节对三条流进行合并,且流中数据结构一致,选择 union() 更为合理。
2、异步IO
a)说明
Flink 在1.2中引入了Async I/O,将IO操作异步化。在异步模式下,单个并行子任务可以连续发送多个请求,按照返回的先后顺序对请求进行处理,发送请求后不需要阻塞式等待,省去了大量的等待时间,大幅提高了流处理效率。 Async I/O 是阿里巴巴贡献给社区的特性,呼声很高,可用于解决与外部系统交互时网络延迟成为系统瓶颈的问题。 异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,因此单个并行子任务可以连续发送多个请求,从而提高并发效率。对于涉及网络IO的操作,可以显著减少因为请求等待带来的性能损耗。