概述
本文将从以下三部分讲解Flink LookupTable的实现。
1、何为(What)
2、为何(Why)
3、如何(How)
研究任何技术都依次从这三个角度分析:何为、为何、如何。
何为LookupTable
Flink SQL读取数据的一种方式。一般用来做实时数仓维表,与实时的数据进行JOIN。如下图Kafka数据进入Flink后查询MongoDB维表并将拼接(JOIN)后的结果写入MySQL。MongoDB在这次计算任务中的抽象就是LookupTable。
具体使用SQL样例
LookupTable的种类
一共有两种
- 同步LookupTable,串行逐条查询数据。
- 异步LookupTable,并发查询数据。可以看下图,一段时间内因为采用了异步,吞吐量提升了。
同步和异步分别对应代码中的TableFunction类和AsyncTableFunction类。
本文将以AsyncTableFunction举例讲解。
为何使用LookupTable
张学友演唱会抓逃犯
可以将参加演唱会人员的数据实时发送给Flink,与数据库里的逃犯数据进行对比。
寻找被拐儿童
可以将城市各处摄像头人脸数据实时发送给Flink,模型识别此人为被拐儿童,查询数据库中所属地区警方数据,自动向警方报警。
如何实现LookupTable
Flink提交任务流程
在编写Flink 自定义的Connector时一定要清楚Flink任务提交的流程。
提交结构图
在编写自己的LookupTable时,一定要清楚哪些代码是在Client端执行,也就是图中最左边节点。哪些是在最右端的TaskManager Slot节点执行。如果不清晰很容易写出Bug。哪些方法是在Slot中执行,可以通过看Java文档注释,或者远程Debug TaskMananger分析出来。当然还有些特殊的方法可能在JobManager节点执行。
因为以上原因,所以在实现LookupTable之前,才先让大家熟悉下上面的Flink提交任务结构图。
实现LookupTable步骤
步骤简介
就如同把大象放冰箱中一样,只需三步。
- 解析SQL
- 读取数据
- 转换数据
其中第一步解析SQL在Client端执行,第二第三步处理数据在TaskManager中执行。
第一步简介
第一步:根据Flink给予标准接口和工具类,解析下图的SQL语句。
第一步启动时SQL解析流程图
后续只需要实现下图中紫色块和蓝色块中的接口,即可完成第一步解析SQL。
第二步、第三步示意图
这一步我们只需要根据数据源类库、Table API、DataStreamAPI去实现即可。最终目的是把数据库中的数据转换成Flink的RowData类型。
具体代码讲解
具体代码可以参考FlinkX中各个Connector的实现。项目地址:https://github.com/DTStack/flinkx
一下的介绍可以参考FlinkX中Connector源码,边看文章边对照源码。多调试几次即可熟悉。
1、解析SQL
主要两个接口DynamicTableSourceFactory和LookupTableSource。
- DynamicTableSourceFactory 接口作用是解析建表DDL。
- LookupTableSource接口实现对 JOIN中等式信息的解析。
DynamicTableSourceFactory接口方法介绍
- createDynamicTableSource():实现具体的解析、校验DDL中参数的逻辑。
- factoryIdentifier():设置DDL中’connector’选项,用来不同connector,所以不要和其他connector重名。
- requiredOptions():设置DDL中必选参数
- optionalOptions():设置DDL中可选参数
LookupTableSource接口方法介绍
- getLookupRuntimeProvider():获取JOIN等式中的字段名,并通过LookupRuntimeProvider初始化一个AsyncTableFunction类。代码编写到LookupRuntimeProvider,就说明所有在Client端执行的启动时逻辑已经编写完成。
2、读取数据
- AsyncTableFunction
- HBase Connector中的org.apache.flink.connector.hbase.util.HBaseSerde
- JDBC Connector中的org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter
模仿其实现即可。
技术要点总结
- 理清Connector代码中那些是启动时执行,那些是运行时执行。换句话说,那些在Client端执行,那些在TaskManager的Slot端执行。
- 根据Flink SQL中官方接口,去解析并获取所有DDL中配置,以及DML中JOIN信息。
- 解析好所有SQL信息后,通过实现AsyncTableFunction和自定义的RowDataConverter,来把数据库中的数据转化成Flink的RowData类型发送给下游即可。