Starting with Flink 1.11, Flink introduced a new set of table connector options while still supporting the old ones. The new interfaces (DynamicTableFactory, DynamicTableSource/Sink) no longer split along stream/batch or upsert-mode lines, are simpler, work only with the Blink planner, and are intended to replace the legacy interfaces. Since I need to write my own table connector, I am taking notes here.
Goals
- Understand dynamic tables
- Sort out the old and new API syntax in the Flink Table API
- The old and new Table source & sink interface designs
- How sources and sinks are created
- Writing a custom connector
Understanding dynamic tables
Dynamic tables are the core concept behind the Flink Table API & SQL support for streaming data. A dynamic table is a logical concept whose ultimate purpose is unified stream/batch query semantics: batch corresponds to static data, while a stream changes over time, and the query result is continuously updated as the input table changes.
| Relational algebra / SQL | Stream processing |
|---|---|
| A relation (or table) is a bounded (multi)set of tuples. | A stream is an infinite sequence of tuples. |
| A query executed on batch data (e.g., a table in a relational database) has access to the complete input data. | A streaming query cannot access all data when it starts; it has to "wait" for data to be streamed in. |
| A batch query terminates after producing a fixed-size result. | A streaming query continuously updates its result based on the received records and never completes. |
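To make the "never completes" column concrete, here is a minimal plain-Java sketch (no Flink dependency; class and method names are my own) of how a continuous query such as `SELECT user, COUNT(*) ... GROUP BY user` behaves over a dynamic table: every arriving row updates the result table instead of finalizing it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of a continuous aggregation over a dynamic table:
// each incoming row updates the result table; there is no final answer.
public class ContinuousCountDemo {
    private final Map<String, Long> resultTable = new LinkedHashMap<>();

    // Called once per arriving row; returns the current state of the result.
    public Map<String, Long> onRow(String user) {
        resultTable.merge(user, 1L, Long::sum);
        return resultTable;
    }

    public static void main(String[] args) {
        ContinuousCountDemo query = new ContinuousCountDemo();
        query.onRow("Mary");                            // {Mary=1}
        query.onRow("Bob");                             // {Mary=1, Bob=1}
        System.out.println(query.onRow("Mary"));        // {Mary=2, Bob=1}
    }
}
```

A batch query over the same three rows would emit `{Mary=2, Bob=1}` once and terminate; the streaming version emits an updated result after every row.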
Old vs. new API
The newly introduced options simplify the syntax, as shown below.
Legacy DDL syntax:
```java
tableEnvironment.executeSql(
    "CREATE TABLE MyTable (\n" +
    "  ... -- declare table schema\n" +
    ") WITH (\n" +
    "  'connector.type' = '...', -- declare connector specific properties;\n" +
    "                            -- this key is ConnectorDescriptorValidator.CONNECTOR_TYPE\n" +
    "  ...\n" +
    "  'update-mode' = 'append', -- declare update mode\n" +
    "  'format.type' = '...',    -- declare format specific properties\n" +
    "  ...\n" +
    ")");
```
New syntax:
```sql
CREATE TABLE MyUserTable (
  -- declare the schema of the table
  `user` BIGINT,
  `message` STRING,
  `rowtime` TIMESTAMP(3) METADATA FROM 'timestamp', -- use a metadata column to access Kafka's record timestamp
  `proctime` AS PROCTIME(), -- use a computed column to define a proctime attribute
  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND -- use a WATERMARK statement to define a rowtime attribute
) WITH (
  -- declare the external system to connect to
  'connector' = 'kafka', -- note: NOT ConnectorDescriptorValidator.CONNECTOR_TYPE
  'topic' = 'topic_name',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json' -- declare a format for this system
)
```
Interface design
In the legacy API, the source and sink interfaces are separated into batch and stream variants.
- Factory interfaces:

- The source/sink interfaces are split by stream vs. batch; taking sources as an example:

- The interfaces that let sources support optimizations such as predicate pushdown and projection pushdown are:
- FilterableTableSource
- ProjectableTableSource
For example, ParquetTableSource is defined as:
```java
public class ParquetTableSource implements
        BatchTableSource<Row>, FilterableTableSource<Row>, ProjectableTableSource<Row> {}
```
Flink 1.11 introduced DynamicTableFactory and DynamicTableSource/Sink.
- Factory interfaces:

The source/sink interfaces are no longer split by stream vs. batch; each side has a single, standalone interface.
The sink interface is DynamicTableSink.
The source interface is DynamicTableSource, which has two specializations: LookupTableSource and ScanTableSource.
The optimization interfaces supported by sources include:
- SupportsPartitionPushDown
- SupportsProjectionPushDown
- SupportsLimitPushDown
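To make the pushdown idea concrete, here is a plain-Java model (no Flink types; all names are illustrative) of what an interface like SupportsProjectionPushDown achieves: the planner hands the source the indices of the fields the query actually needs, and the source then emits only those fields instead of full rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of projection pushdown: the "planner" tells the source which
// columns the query needs, and the source prunes rows before emitting them.
public class ProjectionPushDownDemo {
    private int[] projectedFields; // set by the planner, analogous to applyProjection

    // Analogous in spirit to SupportsProjectionPushDown#applyProjection.
    public void applyProjection(int[] fieldIndices) {
        this.projectedFields = fieldIndices;
    }

    // The scan now materializes only the projected columns.
    public List<Object[]> scan(List<Object[]> fullRows) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] row : fullRows) {
            Object[] pruned = new Object[projectedFields.length];
            for (int i = 0; i < projectedFields.length; i++) {
                pruned[i] = row[projectedFields[i]];
            }
            out.add(pruned);
        }
        return out;
    }

    public static void main(String[] args) {
        ProjectionPushDownDemo source = new ProjectionPushDownDemo();
        source.applyProjection(new int[] {0, 2}); // query only needs columns 0 and 2
        List<Object[]> rows = Arrays.asList(
                new Object[] {"alice", 42, "2024-01-01"},
                new Object[] {"bob", 7, "2024-01-02"});
        for (Object[] r : source.scan(rows)) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```

The benefit is that pruning happens at the source (ideally inside the external system, e.g. Parquet column pruning) instead of after all columns have already been read.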
How sources and sinks are created
- sink
In SQL, a sink corresponds to an INSERT statement. During the SqlNode-to-Operation conversion, the SqlInsert is converted into a CatalogSinkModifyOperation.
```java
/** Convert insert into statement. */
private Operation convertSqlInsert(RichSqlInsert insert) {
    // Get sink table name.
    // ... (omitted)
    return new CatalogSinkModifyOperation(
            identifier,
            query,
            insert.getStaticPartitionKVs(),
            insert.isOverwrite(),
            dynamicOptions);
}
```
While Flink generates the RelNode, the sink is looked up and initialized:
```scala
private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
  modifyOperation match {
    // ... (omitted)
    case catalogSink: CatalogSinkModifyOperation =>
      val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
      val identifier = catalogSink.getTableIdentifier
      val dynamicOptions = catalogSink.getDynamicOptions
      getTableSink(identifier, dynamicOptions).map {
        // ... (omitted)
        case (table, sink: DynamicTableSink) =>
          DynamicSinkUtils.toRel(getRelBuilder, input, catalogSink, sink, table)
      } match {
        case Some(sinkRel) => sinkRel
        case None =>
          throw new TableException(s"Sink ${catalogSink.getTableIdentifier} does not exists")
      }
    // ... (omitted)
  }
}
```
getTableSink is what fetches the sink. This is where the old-vs-new API check happens, deciding whether to load a TableFactory or a DynamicTableFactory:
```scala
private def getTableSink(
    objectIdentifier: ObjectIdentifier,
    dynamicOptions: JMap[String, String]): Option[(CatalogTable, Any)] = {
  val lookupResult = JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
  lookupResult.map(_.getTable) match {
    // ... (omitted)
    val isTemporary = lookupResult.get.isTemporary
    // this if is the connector.type check
    if (isLegacyConnectorOptions(objectIdentifier, table, isTemporary)) {
      val tableSink = TableFactoryUtil.findAndCreateTableSink(
        // loads a TableFactory via SPI
        // ... (omitted)
      )
      Option(table, tableSink)
    } else {
      val tableSink = FactoryUtil.createTableSink(
        // loads a DynamicTableFactory via SPI
        // ... (omitted)
      )
      Option(table, tableSink)
    }
    case _ => None
  }
}
```
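The isLegacyConnectorOptions decision essentially boils down to inspecting the table's option keys. A simplified stand-alone sketch of that decision (my own names; the real method also consults factory discovery and temporary-table state):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the legacy-vs-new decision in getTableSink:
// 'connector.type' (ConnectorDescriptorValidator.CONNECTOR_TYPE) marks the
// legacy TableFactory path, 'connector' marks the new DynamicTableFactory path.
public class ConnectorOptionCheck {

    public static boolean isLegacyConnectorOptions(Map<String, String> options) {
        // Legacy DDLs declare 'connector.type'; new DDLs declare 'connector'.
        return options.containsKey("connector.type");
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new HashMap<>();
        legacy.put("connector.type", "filesystem");

        Map<String, String> modern = new HashMap<>();
        modern.put("connector", "kafka");

        System.out.println(isLegacyConnectorOptions(legacy)); // true  -> TableFactory
        System.out.println(isLegacyConnectorOptions(modern)); // false -> DynamicTableFactory
    }
}
```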
- source
In SQL, a source corresponds to a SqlCreateTable. During the SqlNode-to-Operation conversion, the SqlCreateTable is converted into a CreateTableOperation, which contains a CatalogTable.
```java
/** Convert the {@link SqlCreateTable} node. */
Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
    sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);
    CatalogTable catalogTable = createCatalogTable(sqlCreateTable);
    UnresolvedIdentifier unresolvedIdentifier =
            UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
    ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
    return new CreateTableOperation(
            identifier,
            catalogTable,
            sqlCreateTable.isIfNotExists(),
            sqlCreateTable.isTemporary());
}
```
When the SqlNode goes through toRel, a RelOptTable is generated first. In Flink this involves two steps.
First, the table is converted into a CatalogSchemaTable via a Schema (concretely a CatalogCalciteSchema, produced in Flink from a CatalogManagerCalciteSchema). The Schema itself has two levels because of the catalog/database hierarchy: beneath CatalogCalciteSchema sits DatabaseCalciteSchema. CatalogCalciteSchema is the mapping between Flink catalogs and Calcite schemas, and DatabaseCalciteSchema is, as the name suggests, the database level. Through this mapping the table becomes a Calcite table: the CatalogSchemaTable below is the Calcite-side representation, and the `result` passed to its constructor is the CatalogTable that exists in the catalog.
```java
public Table getTable(String tableName) {
    ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
    return catalogManager.getTable(identifier)
            .map(result -> {
                CatalogBaseTable table = result.getTable();
                FlinkStatistic statistic =
                        getStatistic(result.isTemporary(), table, identifier);
                return new CatalogSchemaTable( // extends the Calcite table
                        identifier,
                        result, // the CatalogTable from the catalog
                        statistic,
                        catalogManager.getCatalog(catalogName)
                                .orElseThrow(IllegalStateException::new),
                        isStreamingMode);
            })
            .orElse(null);
}
```
Second, FlinkCalciteCatalogReader converts the CatalogSchemaTable into a Flink-side source table; this is where the old-vs-new API check is made:
- CatalogSourceTable loads a DynamicTableSourceFactory via FactoryUtil::createTableSource
- LegacyCatalogSourceTable loads a TableSourceFactory via TableFactoryUtil::findAndCreateTableSource

```java
private static FlinkPreparingTableBase convertCatalogTable(
        RelOptSchema relOptSchema,
        List<String> names,
        RelDataType rowType,
        CatalogTable catalogTable,
        CatalogSchemaTable schemaTable) {
    if (isLegacySourceOptions(catalogTable, schemaTable)) { // the connector.type check
        return new LegacyCatalogSourceTable<>(
                relOptSchema, names, rowType, schemaTable, catalogTable);
    } else {
        return new CatalogSourceTable(
                relOptSchema, names, rowType, schemaTable, catalogTable);
    }
}
```
Finally the table source is created; taking CatalogSourceTable as an example:

```java
private DynamicTableSource createDynamicTableSource(
        FlinkContext context, CatalogTable catalogTable) {
    final ReadableConfig config = context.getTableConfig().getConfiguration();
    return FactoryUtil.createTableSource(
            schemaTable.getCatalog(),
            schemaTable.getTableIdentifier(),
            catalogTable,
            config,
            Thread.currentThread().getContextClassLoader(),
            schemaTable.isTemporary());
}
```
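Both FactoryUtil.createTableSource and TableFactoryUtil.findAndCreateTableSource ultimately locate a factory via Java SPI (java.util.ServiceLoader) and match it against the declared identifier. Here is a stripped-down model of that matching step (a plain map stands in for ServiceLoader, and the return types are simplified to strings; all names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Toy model of factory discovery: the real FactoryUtil uses ServiceLoader to
// find all DynamicTableFactory implementations, then matches each one's
// factoryIdentifier() against the 'connector' option. A map stands in here.
public class FactoryLookupDemo {

    interface TableSourceFactory {
        String factoryIdentifier();   // e.g. "kafka"
        String createTableSource();   // the real API returns a DynamicTableSource
    }

    private static final Map<String, TableSourceFactory> DISCOVERED = new HashMap<>();

    // In Flink, "registration" happens via META-INF/services entries.
    static void register(TableSourceFactory factory) {
        DISCOVERED.put(factory.factoryIdentifier(), factory);
    }

    static String createTableSource(Map<String, String> options) {
        String identifier = options.get("connector");
        TableSourceFactory factory = DISCOVERED.get(identifier);
        if (factory == null) {
            throw new NoSuchElementException(
                    "Could not find a factory for identifier '" + identifier + "'");
        }
        return factory.createTableSource();
    }

    public static void main(String[] args) {
        register(new TableSourceFactory() {
            public String factoryIdentifier() { return "kafka"; }
            public String createTableSource() { return "KafkaDynamicSource"; }
        });
        Map<String, String> options = new HashMap<>();
        options.put("connector", "kafka");
        System.out.println(createTableSource(options)); // prints "KafkaDynamicSource"
    }
}
```

This is why forgetting the META-INF/services file for a custom factory typically surfaces as a "could not find a factory for identifier" style error at table creation time.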
Writing your own connector
From the above we know: if the SQL declares connector.type, you need to implement Table(Source/Sink)Factory; if it declares connector, you need to implement DynamicTable(Source/Sink)Factory.
Take Flink's built-in Kafka connector as an example. KafkaDynamicTableFactory implements both DynamicTableSourceFactory and DynamicTableSinkFactory:
```java
public class KafkaDynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {}
```
The source created by KafkaDynamicTableFactory implements ScanTableSource; its getScanRuntimeProvider method returns the runtime implementation used by Flink.
- Writing an ES5 table connector
- The ES5 RestHighLevelClient does not implement the Closeable interface, so use Flink's bundled ES5 transport client; add flink-connector-elasticsearch-base and flink-connector-elasticsearch5 to the pom.
- Only writes are needed here, so implement DynamicTableSinkFactory: define IDENTIFIER and implement the requiredOptions, optionalOptions, and createDynamicTableSink methods.
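The requiredOptions/optionalOptions contract is what lets FactoryUtil validate the WITH clause before the sink is ever created. Below is a minimal stand-alone sketch of that validation in plain Java (the identifier and option keys are hypothetical placeholders for an ES5 sink; the real FactoryUtil helper does considerably more, e.g. format discovery):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the factory option contract: required options must be present,
// and no unknown keys may appear. Mirrors the spirit of FactoryUtil's
// validation, not its actual implementation.
public class SinkFactoryValidationDemo {

    static final String IDENTIFIER = "elasticsearch5-demo"; // hypothetical identifier

    static Set<String> requiredOptions() {
        return new HashSet<>(Arrays.asList("hosts", "index"));
    }

    static Set<String> optionalOptions() {
        return new HashSet<>(Arrays.asList("document-type", "bulk-flush.max-actions"));
    }

    static void validate(Map<String, String> options) {
        for (String key : requiredOptions()) {
            if (!options.containsKey(key)) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
        for (String key : options.keySet()) {
            if (key.equals("connector")) continue; // consumed by factory discovery
            if (!requiredOptions().contains(key) && !optionalOptions().contains(key)) {
                throw new IllegalArgumentException("Unsupported option: " + key);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", IDENTIFIER);
        options.put("hosts", "http://localhost:9200");
        options.put("index", "users");
        validate(options); // passes; removing 'index' would throw
        System.out.println("options valid");
    }
}
```

With this contract in place, a bad DDL fails fast with a clear message about the missing or unknown key, before createDynamicTableSink is called.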
