Starting with Flink 1.11, Flink introduced a new set of table connector options while keeping support for the old ones. The new interfaces (DynamicTableFactory, DynamicTableSource/Sink) no longer split along stream/batch or update-mode lines, are simpler, work only with the Blink planner, and are intended to replace the old ones. Since I need to write my own table connector, these are my notes.

Goals

  • Understand dynamic tables
  • Sort out the old and new API syntax in Flink Table
  • The old and new Table source & sink interface designs
  • How sources and sinks are created
  • Writing a custom connector

Understanding dynamic tables

Dynamic tables are the core concept behind streaming support in Flink's Table API and SQL. They are a logical concept whose ultimate goal is unified stream/batch query semantics: batch corresponds to static data, while a stream changes over time, and the query result is continuously updated as the input table changes.

| Relational algebra / SQL | Stream processing |
| --- | --- |
| A relation (or table) is a bounded (multi)set of tuples. | A stream is an infinite sequence of tuples. |
| A query over batch data (e.g. a table in a relational database) has access to the complete input. | A streaming query cannot access all data when it starts; it has to "wait" for data to stream in. |
| A batch query terminates after producing a fixed-size result. | A streaming query continuously updates its result based on the records it receives and never completes. |
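The contrast above can be sketched in plain Java (a toy running count, not Flink API): a batch query computes over the complete input once and terminates, while a streaming query emits an updated result for every record that arrives.

```java
import java.util.ArrayList;
import java.util.List;

public class DynamicTableDemo {

    // Batch: the complete input is available; compute one result and terminate.
    public static long batchCount(List<String> rows) {
        return rows.size();
    }

    // Stream: records arrive one by one; emit an updated result per record
    // (the append/retract semantics of real dynamic tables are elided).
    public static List<Long> streamingCount(List<String> arrivingRows) {
        List<Long> emittedResults = new ArrayList<>();
        long count = 0;
        for (String row : arrivingRows) {
            count++;                   // update internal state
            emittedResults.add(count); // emit the refreshed result
        }
        return emittedResults;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("a", "b", "c");
        System.out.println(batchCount(rows));     // one final result: 3
        System.out.println(streamingCount(rows)); // a result per record: [1, 2, 3]
    }
}
```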

Old vs. new API

The newly introduced options simplify the syntax, as shown below:

  • DDL syntax with the legacy options

    tableEnvironment.executeSql(
        "CREATE TABLE MyTable (\n" +
        "  ... -- declare table schema \n" +
        ") WITH (\n" +
        "  'connector.type' = '...', -- declare connector specific properties\n" +
                                     // ConnectorDescriptorValidator.CONNECTOR_TYPE
        "  ...\n" +
        "  'update-mode' = 'append', -- declare update mode\n" +
        "  'format.type' = '...', -- declare format specific properties\n" +
        "  ...\n" +
        ")");
  • New syntax

    CREATE TABLE MyUserTable (
      -- declare the schema of the table
      `user` BIGINT,
      `message` STRING,
      `rowtime` TIMESTAMP(3) METADATA FROM 'timestamp', -- use a metadata column to access Kafka's record timestamp
      `proctime` AS PROCTIME(), -- use a computed column to define a proctime attribute
      WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND -- use a WATERMARK statement to define a rowtime attribute
    ) WITH (
      -- declare the external system to connect to
      'connector' = 'kafka', -- not ConnectorDescriptorValidator.CONNECTOR_TYPE
      'topic' = 'topic_name',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'json' -- declare a format for this system
    )

Interface design

In the legacy design, source and sink interfaces are split between batch and stream.
  • Factory interfaces:

[Figure 1: Flink dynamic tables and old/new table source/sink]

  • Source/sink interfaces are split between stream and batch; taking the source side as an example:

[Figure 2: Flink dynamic tables and old/new table source/sink]

  • The source-side interfaces for optimizations such as predicate pushdown and projection pushdown are:
    • FilterableTableSource
    • ProjectableTableSource


For example, ParquetTableSource is defined as:

  public class ParquetTableSource implements BatchTableSource<Row>, FilterableTableSource<Row>, ProjectableTableSource<Row> {}
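A toy illustration of what projection pushdown buys (simplified, not the real ProjectableTableSource signature): the planner hands the source the indices of the fields the query actually uses, and the source materializes only those columns.

```java
import java.util.ArrayList;
import java.util.List;

// Toy projection pushdown: the source receives the projected field indices
// and reads only those columns instead of full rows.
public class ProjectionPushDownDemo {

    public static List<Object[]> scan(List<Object[]> storedRows, int[] projectedFields) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] row : storedRows) {
            Object[] projected = new Object[projectedFields.length];
            for (int i = 0; i < projectedFields.length; i++) {
                projected[i] = row[projectedFields[i]]; // copy only the needed columns
            }
            out.add(projected);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> rows = List.of(
                new Object[] {1L, "alice", 30},
                new Object[] {2L, "bob", 25});
        // SELECT name FROM t  -> only field index 1 is pushed down to the source
        List<Object[]> result = scan(rows, new int[] {1});
        System.out.println(result.get(0)[0]); // alice
    }
}
```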

Flink 1.11 introduced DynamicTableFactory and DynamicTableSource/Sink.
  • Factory interfaces:

[Figure 3: Flink dynamic tables and old/new table source/sink]

  • Source/sink interfaces are no longer split between stream and batch; each side has a single interface:

    The sink interface is DynamicTableSink.

    The source interface is DynamicTableSource, with two derived flavors: LookupTableSource and ScanTableSource.
  • Source-side optimization interfaces:

    • SupportsPartitionPushDown
    • SupportsProjectionPushDown
    • SupportsLimitPushDown
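The difference between the two source flavors can be sketched with simplified stand-in interfaces (not Flink's actual API): a scan source reads the whole table or stream, while a lookup source fetches individual rows by key on demand, e.g. for temporal joins.

```java
import java.util.List;
import java.util.Map;

// Toy contrast between the two DynamicTableSource flavors (not Flink's API):
// a scan source produces every row, a lookup source answers point queries by key.
public class SourceFlavorsDemo {

    public interface ScanSource { List<String> scanAll(); }

    public interface LookupSource { String lookup(long key); }

    private static final Map<Long, String> BACKING_TABLE = Map.of(1L, "alice", 2L, "bob");

    public static ScanSource scanSource() {
        return () -> List.copyOf(BACKING_TABLE.values()); // read the whole table
    }

    public static LookupSource lookupSource() {
        return key -> BACKING_TABLE.get(key); // fetch one row on demand, e.g. per joined input row
    }

    public static void main(String[] args) {
        System.out.println(scanSource().scanAll().size()); // 2
        System.out.println(lookupSource().lookup(2L));     // bob
    }
}
```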

How sources and sinks are created

  • sink

    1. In a SQL statement, a sink corresponds to an INSERT. During SqlNode-to-Operation conversion, the SqlInsert is turned into a CatalogSinkModifyOperation.

      /** Convert insert into statement. */
      private Operation convertSqlInsert(RichSqlInsert insert) {
          // Get sink table name.
          // omitted ...
          return new CatalogSinkModifyOperation(
                  identifier,
                  query,
                  insert.getStaticPartitionKVs(),
                  insert.isOverwrite(),
                  dynamicOptions);
      }
    2. While Flink generates the RelNode, the sink is initialized and fetched:

      private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
          modifyOperation match {
              // omitted ...
              case catalogSink: CatalogSinkModifyOperation =>
                  val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
                  val identifier = catalogSink.getTableIdentifier
                  val dynamicOptions = catalogSink.getDynamicOptions
                  getTableSink(identifier, dynamicOptions).map {
                      // omitted ...
                      case (table, sink: DynamicTableSink) =>
                          DynamicSinkUtils.toRel(getRelBuilder, input, catalogSink, sink, table)
                  } match {
                      case Some(sinkRel) => sinkRel
                      case None =>
                          throw new TableException(s"Sink ${catalogSink.getTableIdentifier} does not exists")
                  }
              // omitted
          }
      }
    3. getTableSink fetches the sink. This is where the old-vs-new API check happens, deciding whether a TableFactory or a DynamicTableFactory is used:

      private def getTableSink(
          objectIdentifier: ObjectIdentifier,
          dynamicOptions: JMap[String, String])
          : Option[(CatalogTable, Any)] = {
          val lookupResult = JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
          lookupResult
              .map(_.getTable) match {
              // omitted ...
              case Some(table: CatalogTable) =>
                  val isTemporary = lookupResult.get.isTemporary
                  // this if checks for connector.type
                  if (isLegacyConnectorOptions(objectIdentifier, table, isTemporary)) {
                      val tableSink = TableFactoryUtil.findAndCreateTableSink( // loads a TableFactory via SPI
                          // omitted ...
                      )
                      Option(table, tableSink)
                  } else {
                      val tableSink = FactoryUtil.createTableSink( // loads a DynamicTableFactory via SPI
                          // omitted
                      )
                      Option(table, tableSink)
                  }
              case _ => None
          }
      }
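The check made by isLegacyConnectorOptions boils down to which option key the table declares. A minimal standalone sketch of that dispatch (hypothetical helper names, not Flink code):

```java
import java.util.Map;

// Toy version of the branch inside getTableSink: 'connector.type' selects the
// legacy TableFactory path, 'connector' selects the new DynamicTableFactory path.
public class FactoryDispatchDemo {

    public static String chooseFactory(Map<String, String> tableOptions) {
        if (tableOptions.containsKey("connector.type")) {
            return "TableFactory";        // legacy SPI lookup
        }
        if (tableOptions.containsKey("connector")) {
            return "DynamicTableFactory"; // new SPI lookup
        }
        throw new IllegalArgumentException("no connector option declared");
    }

    public static void main(String[] args) {
        System.out.println(chooseFactory(Map.of("connector.type", "kafka"))); // TableFactory
        System.out.println(chooseFactory(Map.of("connector", "kafka")));      // DynamicTableFactory
    }
}
```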
  • source
  1. In a SQL statement, a source table starts as a SqlCreateTable. During SqlNode-to-Operation conversion, the SqlCreateTable is turned into a CreateTableOperation, which carries the CatalogTable.

    /** Convert the {@link SqlCreateTable} node. */
    Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
        sqlCreateTable.getTableConstraints().forEach(this::validateTableConstraint);
        CatalogTable catalogTable = createCatalogTable(sqlCreateTable);
        UnresolvedIdentifier unresolvedIdentifier =
                UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
        return new CreateTableOperation(
                identifier,
                catalogTable,
                sqlCreateTable.isIfNotExists(),
                sqlCreateTable.isTemporary());
    }
  2. When the SqlNode goes through toRel, a RelOptTable is generated first. In Flink this takes two steps:

    1. The table is converted into a CatalogSchemaTable via a Schema (concretely CatalogCalciteSchema, produced from Flink's CatalogManagerCalciteSchema). Because of the catalog/database hierarchy, the schema conversion also has two levels: beneath CatalogCalciteSchema sits DatabaseCalciteSchema. CatalogCalciteSchema is the mapping between a Flink catalog and a Calcite schema, and DatabaseCalciteSchema plays the same role at the database level. Through this mapping the table becomes a Calcite table: the CatalogSchemaTable below is that Calcite-side table, and the result passed to its constructor is the CatalogTable stored in the catalog.

      public Table getTable(String tableName) {
          ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
          return catalogManager
                  .getTable(identifier)
                  .map(
                          result -> {
                              CatalogBaseTable table = result.getTable();
                              FlinkStatistic statistic =
                                      getStatistic(result.isTemporary(), table, identifier);
                              return new CatalogSchemaTable( // extends the Calcite table
                                      identifier,
                                      result,
                                      statistic,
                                      catalogManager
                                              .getCatalog(catalogName)
                                              .orElseThrow(IllegalStateException::new),
                                      isStreamingMode);
                          })
                  .orElse(null);
      }
    2. FlinkCalciteCatalogReader then converts the CatalogSchemaTable into Flink's own source table, and this is where the old-vs-new API check is made:
      CatalogSourceTable loads a DynamicTableSourceFactory via FactoryUtil::createTableSource
      LegacyCatalogSourceTable loads a TableSourceFactory via TableFactoryUtil::findAndCreateTableSource

      private static FlinkPreparingTableBase convertCatalogTable(
              RelOptSchema relOptSchema,
              List<String> names,
              RelDataType rowType,
              CatalogTable catalogTable,
              CatalogSchemaTable schemaTable) {
          if (isLegacySourceOptions(catalogTable, schemaTable)) { // checks connector.type
              return new LegacyCatalogSourceTable<>(
                      relOptSchema, names, rowType, schemaTable, catalogTable);
          } else {
              return new CatalogSourceTable(relOptSchema, names, rowType, schemaTable, catalogTable);
          }
      }


      Finally the table source is created; taking CatalogSourceTable as the example:

      private DynamicTableSource createDynamicTableSource(
              FlinkContext context, CatalogTable catalogTable) {
          final ReadableConfig config = context.getTableConfig().getConfiguration();
          return FactoryUtil.createTableSource(
                  schemaTable.getCatalog(),
                  schemaTable.getTableIdentifier(),
                  catalogTable,
                  config,
                  Thread.currentThread().getContextClassLoader(),
                  schemaTable.isTemporary());
      }
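FactoryUtil.createTableSource ultimately discovers candidate factories via Java SPI and matches the table's 'connector' option against each factory's factoryIdentifier(). A simplified, self-contained sketch of that matching step (the real code loads candidates with ServiceLoader and also validates options):

```java
import java.util.List;
import java.util.Map;

// Simplified sketch of FactoryUtil-style discovery: match the 'connector' option
// against each candidate factory's identifier (Flink loads candidates via ServiceLoader).
public class FactoryDiscoveryDemo {

    public interface Factory { String factoryIdentifier(); }

    public static Factory discoverFactory(List<Factory> loadedFactories, Map<String, String> options) {
        String connector = options.get("connector");
        return loadedFactories.stream()
                .filter(f -> f.factoryIdentifier().equals(connector))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "Could not find a factory for identifier '" + connector + "'"));
    }

    public static void main(String[] args) {
        Factory kafka = () -> "kafka";
        Factory jdbc = () -> "jdbc";
        Factory chosen = discoverFactory(List.of(kafka, jdbc), Map.of("connector", "kafka"));
        System.out.println(chosen.factoryIdentifier()); // kafka
    }
}
```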

Writing your own connector

From the walk-through above: if the SQL declares connector.type, you need to implement your own Table(Source/Sink)Factory; if it declares connector, you implement DynamicTable(Source/Sink)Factory.

Take Flink's built-in Kafka connector as an example. KafkaDynamicTableFactory implements both DynamicTableSourceFactory and DynamicTableSinkFactory.

  public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {}

The source created by KafkaDynamicTableFactory implements ScanTableSource; its getScanRuntimeProvider method hands back the actual runtime implementation.

  • Writing an ES5 table connector
    1. The ES5 RestHighLevelClient does not implement the Closeable interface, so use Flink's bundled ES5 transport client instead: add flink-connector-elasticsearch-base and flink-connector-elasticsearch5 to the pom.
    2. Only writes are needed, so implement DynamicTableSinkFactory and override IDENTIFIER, requiredOptions, optionalOptions, and createDynamicTableSink.
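A skeleton of the factory described in step 2, written against simplified stand-ins for Flink's interfaces (the real DynamicTableSinkFactory uses ConfigOption<?> sets and a Context argument; the option names 'hosts' and 'index' here are illustrative):

```java
import java.util.Map;
import java.util.Set;

// Skeleton of the ES5 sink factory described above, written against *simplified*
// stand-ins for Flink's interfaces; option names 'hosts'/'index' are illustrative.
public class Es5SinkFactoryDemo {

    public interface DynamicTableSink { /* getSinkRuntimeProvider() etc. in real Flink */ }

    public interface DynamicTableSinkFactory {
        String factoryIdentifier();
        Set<String> requiredOptions();
        Set<String> optionalOptions();
        DynamicTableSink createDynamicTableSink(Map<String, String> options);
    }

    public static class Elasticsearch5DynamicTableSinkFactory implements DynamicTableSinkFactory {
        public static final String IDENTIFIER = "elasticsearch-5"; // value of the 'connector' option

        @Override public String factoryIdentifier() { return IDENTIFIER; }
        @Override public Set<String> requiredOptions() { return Set.of("hosts", "index"); }
        @Override public Set<String> optionalOptions() { return Set.of("sink.bulk-flush.max-actions"); }

        @Override
        public DynamicTableSink createDynamicTableSink(Map<String, String> options) {
            for (String key : requiredOptions()) { // validate before building the sink
                if (!options.containsKey(key)) {
                    throw new IllegalArgumentException("missing required option: " + key);
                }
            }
            return new DynamicTableSink() {}; // real impl would wrap the ES5 transport client
        }
    }

    public static void main(String[] args) {
        DynamicTableSinkFactory factory = new Elasticsearch5DynamicTableSinkFactory();
        System.out.println(factory.factoryIdentifier()); // elasticsearch-5
        factory.createDynamicTableSink(Map.of("hosts", "http://localhost:9200", "index", "logs"));
    }
}
```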
