概述

本文将从以下三部分讲解Flink LookupTable的实现。
1、何为(What)
2、为何(Why)
3、如何(How)
研究任何技术都依次从这三个角度分析:何为、为何、如何。

何为LookupTable

Flink SQL读取数据的一种方式。一般用来做实时数仓维表,与实时的数据进行JOIN。如下图Kafka数据进入Flink后查询MongoDB维表并将拼接(JOIN)后的结果写入MySQL。MongoDB在这次计算任务中的抽象就是LookupTable。
image.png

具体使用SQL样例

image.png

LookupTable的种类

一共有两种

  1. 同步LookupTable,串行逐条查询数据。
  2. 异步LookupTable,并发查询数据。可以看下图,一段时间内因为采用了异步,吞吐量提升了。

同步和异步分别对应代码中的TableFunction类和AsyncTableFunction类。
本文将以AsyncTableFunction举例讲解。

image.png

为何使用LookupTable

这个是实时计算基础功能之一。

张学友演唱会抓逃犯

可以将参加演唱会人员的数据实时发送给Flink,与数据库里的逃犯数据进行对比。
image.png

寻找被拐儿童

可以将城市各处摄像头人脸数据实时发送给Flink,模型识别此人为被拐儿童,查询数据库中所属地区警方数据,自动向警方报警。
image.png

如何实现LookupTable

Flink提交任务流程

在编写Flink 自定义的Connector时一定要清楚Flink任务提交的流程。
提交结构图
image.png

在编写自己的LookupTable时,一定要清楚哪些代码是在Client端执行,也就是图中最左边节点。哪些是在最右端的TaskManager Slot节点执行。如果不清晰很容易写出Bug。哪些方法是在Slot中执行,可以通过看Java文档注释,或者远程Debug TaskMananger分析出来。当然还有些特殊的方法可能在JobManager节点执行。
因为以上原因,所以在实现LookupTable之前,才先让大家熟悉下上面的Flink提交任务结构图。

实现LookupTable步骤

步骤简介

就如同把大象放冰箱中一样,只需三步。

  1. 解析SQL
  2. 读取数据
  3. 转换数据

其中第一步解析SQL在Client端执行,第二第三步处理数据在TaskManager中执行。

第一步简介

第一步:根据Flink给予标准接口和工具类,解析下图的SQL语句。
image.png

第一步启动时SQL解析流程图

后续只需要实现下图中紫色块和蓝色块中的接口,即可完成第一步解析SQL。
image.png

第二步、第三步示意图

这一步我们只需要根据数据源类库、Table API、DataStreamAPI去实现即可。最终目的是把数据库中的数据转换成Flink的RowData类型。
image.png

具体代码讲解

具体代码可以参考FlinkX中各个Connector的实现。项目地址:https://github.com/DTStack/flinkx
一下的介绍可以参考FlinkX中Connector源码,边看文章边对照源码。多调试几次即可熟悉。

1、解析SQL

主要两个接口DynamicTableSourceFactory和LookupTableSource。

  1. DynamicTableSourceFactory 接口作用是解析建表DDL。
  2. LookupTableSource接口实现对 JOIN中等式信息的解析。

DynamicTableSourceFactory接口方法介绍

  • createDynamicTableSource():实现具体的解析、校验DDL中参数的逻辑。
  • factoryIdentifier():设置DDL中’connector’选项,用来不同connector,所以不要和其他connector重名。
  • requiredOptions():设置DDL中必选参数
  • optionalOptions():设置DDL中可选参数

LookupTableSource接口方法介绍

  • getLookupRuntimeProvider():获取JOIN等式中的字段名,并通过LookupRuntimeProvider初始化一个AsyncTableFunction类。代码编写到LookupRuntimeProvider,就说明所有在Client端执行的启动时逻辑已经编写完成。

2、读取数据

  • AsyncTableFunction
    • open():开启数据库连接
    • eval():根据传入的Key,异步查询远端数据库。
    • close():关闭数据库连接

      3、转换数据

      RowDataConverter:这个类的目的是将数 据源类库返回的类型如Mongodb的Document类型或JDBC的ResultSet类型转换成Flink SQL标准的RowData类型。从数据层面与Flink SQL Runtime对接。
      这个官方没有标准接口我们可以参考两个类
  1. HBase Connector中的org.apache.flink.connector.hbase.util.HBaseSerde
  2. JDBC Connector中的org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter

模仿其实现即可。

技术要点总结

  1. 理清Connector代码中那些是启动时执行,那些是运行时执行。换句话说,那些在Client端执行,那些在TaskManager的Slot端执行。
  2. 根据Flink SQL中官方接口,去解析并获取所有DDL中配置,以及DML中JOIN信息。
  3. 解析好所有SQL信息后,通过实现AsyncTableFunction和自定义的RowDataConverter,来把数据库中的数据转化成Flink的RowData类型发送给下游即可。