用户定义的 Sources 和 Sinks

TableSource 提供对存储在外部系统(数据库、键值存储、消息队列)或文件中的数据的访问。TableSource 在 TableEnvironment 中注册后可以通过Table APISQL查询访问它。

TableSink发送 Table到外部存储系统,如数据库、键值存储、消息队列或文件系统(以不同的编码方式,如 CSV、Parquet 或 ORC)。

TableFactory 允许将与外部系统的连接声明与实际实现分离。表工厂从规范化的、基于字符串的属性创建表源(sources)和表接收器(sinks)的已配置实例。可以使用描述符(Descriptor)以编程方式生成属性,也可以通过SQL Client的YAML配置文件生成属性。

查看常见概念和API页面,了解如何注册TableSource以及如何通过 TableSink 发出 Table的详细信息。有关如何使用工厂的示例,请参阅内置源,接收器和格式页面。

定义一个表源(TableSource)

TableSource 是一个通用接口,允许 Table API 和 SQL 查询访问存储在外部系统中的数据。它提供了表的模式以及映射到具有表模式的行的记录。根据是否在流式或批量查询中使用 TableSource,记录将作为 DataSetDataStream 生成。

如果在流查询中使用 TableSource,它必须实现 StreamTableSource 接口;如果在批处理查询中使用它,它必须实现 BatchTableSource 接口。TableSource 还可以实现这两个接口,并可用于流查询和批处理查询。

StreamTableSourceBatchTableSource 扩展了定义了以下方法的基本接口 TableSource:

  1. TableSource<T> {
  2. public TableSchema getTableSchema();
  3. public TypeInformation<T> getReturnType();
  4. public String explainSource();
  5. }
  1. TableSource[T] {
  2. def getTableSchema: TableSchema
  3. def getReturnType: TypeInformation[T]
  4. def explainSource: String
  5. }
  • getTableSchema(): 返回表的模式,即表中字段的名称和类型。字段类型是使用 Flink 的 类型信息(TypeInformation)(参见Table API typesSQL types)定义的。

  • getReturnType(): 返回 DataStream (StreamTableSource) 或 DataSet (BatchTableSource) 的物理类型和由 TableSource 生成的记录。

  • explainSource(): 返回描述 TableSource 的字符串。此方法是可选的,仅用于显示目的。

TableSource 接口将逻辑表模式与返回的 DataStreamDataSet 的物理类型分开。因此,表模式(getTableSchema())的所有字段必须映射到物理返回类型(getReturnType())的对应类型的字段。默认情况下,这个映射是基于字段名完成的。例如,定义具有两个字段[name: String, size: Integer]的表模式的 TableSource 需要一个 TypeInformation,其中至少有两个字段名为 namesize ,类型为 StringInteger。 这可能是一个 PojoTypeInfoRowTypeInfo,它有两个名为 namesize 的字段匹配类型。

然而,有些类型,如 Tuple 或 CaseClass 类型,确实支持自定义字段名。如果 TableSource 返回具有固定字段名的类型的 DataStreamDataSet,它可以实现 DefinedFieldMapping 接口,将字段名从表模式映射到物理返回类型的字段名。

定义一个 BatchTableSource

BatchTableSource 接口扩展了 TableSource 接口,并定义了一个额外的方法:

  1. BatchTableSource<T> implements TableSource<T> {
  2. public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
  3. }
  1. BatchTableSource[T] extends TableSource[T] {
  2. def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
  3. }
  • getDataSet(execEnv): 返回包含表数据的 DataSetDataSet 的类型必须与 TableSource.getReturnType() 方法定义的返回类型相同。DataSet 可以通过使用 DataSet API 的常规数据源创建。通常,BatchTableSource 是通过包装 InputFormatbatch connector来实现的。

定义一个 StreamTableSource

StreamTableSource 接口扩展了 TableSource 接口,并定义了一个额外的方法:

  1. StreamTableSource<T> implements TableSource<T> {
  2. public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
  3. }
  1. StreamTableSource[T] extends TableSource[T] {
  2. def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
  3. }
  • getDataStream(execEnv): 返回带有表数据的DataStreamDataStream 的类型必须与 TableSource.getReturnType() 方法定义的返回类型相同。DataStream 可以通过使用 DataStream API 的常规数据源(data source)创建。通常,StreamTableSource 是通过包装 SourceFunction流连接器(stream connector)来实现的。

使用时间属性定义表源(TableSource)

流式表APISQL查询的基于时间的操作(如窗口聚合或连接)需要显式指定时间属性

TableSource 在其表模式中将 time 属性定义为类型为 Types.SQL_TIMESTAMP 的字段。与模式中的所有常规字段相比,时间属性不能与表源返回类型中的物理字段匹配。相反,TableSource通过实现某个接口来定义时间属性。

定义处理时间属性(Processing Time Attribute)

处理时间属性通常用于流查询。处理时间属性返回访问它的操作符的当前壁钟时间(wall-clock time)。TableSource 通过实现 DefinedProctimeAttribute 接口来定义处理时间属性。接口如下:

  1. DefinedProctimeAttribute {
  2. public String getProctimeAttribute();
  3. }
  1. DefinedProctimeAttribute {
  2. def getProctimeAttribute: String
  3. }
  • getProctimeAttribute(): 返回处理时间属性的名称。必须在表模式中定义指定的属性类型 Types.SQL_TIMESTAMP,并且可以在基于时间的操作中使用。DefinedProctimeAttribute 表源可以通过返回 null 来定义没有处理时间属性。

注意 StreamTableSourceBatchTableSource 都可以实现 DefinedProctimeAttribute 并定义处理时间属性。对于 BatchTableSource,处理时间字段在表扫描期间使用当前时间戳初始化。

定义 Rowtime 属性(Rowtime Attribute)

Rowtime属性TIMESTAMP 类型的属性,在流和批处理查询中以统一的方式处理。

通过指定,可以将类型为 SQL_TIMESTAMP 的表模式字段声明为 rowtime 属性

  • 字段名,
  • TimestampExtractor 计算属性的实际值(通常来自一个或多个其他字段),以及
  • 指定如何为 rowtim e属性生成水印的 WatermarkStrategy

TableSource 通过实现 DefinedRowtimeAttributes 接口定义了一个 rowtime 属性。接口如下:

  1. DefinedRowtimeAttribute {
  2. public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
  3. }
  1. DefinedRowtimeAttributes {
  2. def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
  3. }
  • getRowtimeAttributeDescriptors(): 返回一个 RowtimeAttributeDescriptor 列表。RowtimeAttributeDescriptor 描述了一个具有以下属性的 rowtime 属性:
    • attributeName: 表模式中 rowtime 属性的名称。必须使用类型 Types.SQL_TIMESTAMP 定义字段。
    • timestampExtractor: 时间戳提取器(timestamp extractor)从具有返回类型的记录中提取时间戳。例如,它可以将长字段转换为时间戳,或者解析字符串编码的时间戳。Flink 附带一组内置的 TimestampExtractor 实现,用于常见的用例。还可以提供自定义实现。
    • watermarkStrategy: 水印策略(watermark strategy)定义了如何为 rowtime 属性生成水印。Flink 为常见用例提供了一组内置的 WatermarkStrategy 实现。还可以提供自定义实现。

注意,虽然 getRowtimeAttributeDescriptors() 方法返回一个描述符列表,但目前只支持一个 rowtime 属性。我们计划在将来删除这个限制,并支持具有多个 rowtime 属性的表。

请注意,StreamTableSourceBatchTableSource 都可以实现 DefinedRowtimeAttributes 并定义 rowtime 属性。在这两种情况下,行时间字段都是使用 TimestampExtractor 提取的。因此,实现 StreamTableSourceBatchTableSource 并定义 rowtime 属性的 TableSource 为流处理和批处理查询提供了完全相同的数据。

提供时间戳提取器(Timestamp Extractors)

Flink 为常用用例提供了 TimestampExtractor 实现。

以下 TimestampExtractor 实现目前可用:

  • ExistingField(fieldName): 从现有的 LONG, SQL_TIMESTAMP 或格式化的 STRING 字段中提取 rowtime 属性的值。这样一个字符串的一个例子是 ‘2018-05-28 12:34:56.000’。
  • StreamRecordTimestamp(): 从 DataStream StreamRecord 的时间戳中提取 rowtime 属性的值。注意,此 TimestampExtractor 不适用于批处理表源。

自定义的 TimestampExtractor 可以通过实现相应的接口来定义。

提供水印策略(Watermark Strategies)

Flink 为常用用例提供了水印策略(WatermarkStrategy)实现。

目前提供以下 WatermarkStrategy 实现:

  • AscendingTimestamps: 用于提升时间戳的水印策略。带有无序时间戳的记录将被视为迟到。
  • BoundedOutOfOrderTimestamps(delay): 时间戳的水印策略,其最多在指定的延迟之外是无序的。
  • PreserveWatermarks(): 应该从底层的 DataStream 中保留指示水印的策略。

可以通过实现相应的接口来定义自定义的 WatermarkStrategy

使用投影下推定义表源 Defining a TableSource with Projection Push-Down

TableSource 通过实现 ProjectableTableSource 接口支持投影下推。该接口定义了一个单一的方法:

  1. ProjectableTableSource<T> {
  2. public TableSource<T> projectFields(int[] fields);
  3. }
  1. ProjectableTableSource[T] {
  2. def projectFields(fields: Array[Int]): TableSource[T]
  3. }
  • projectFields(fields): 返回具有调整后的物理返回类型的 TableSource副本(copy)fields 参数提供了必须由 TableSource 提供的字段的索引。索引与物理返回类型的 TypeInformation 有关,而 不是 与逻辑表模式相关。复制的 TableSource 必须调整其返回类型和返回的 DataStreamDataSet。复制的 TableSourceTableSchema 不能更改,即它必须与原来的 TableSource 相同。如果 TableSource 实现了 DefinedFieldMapping 接口,则必须将字段映射调整为新的返回类型。

ProjectableTableSource 为项目平面字段添加了支持。如果 TableSource 定义了一个带有嵌套模式的表,它可以实现 NestedFieldsProjectableTableSource 来将投影扩展到嵌套字段。NestedFieldsProjectableTableSource 的定义如下:

  1. NestedFieldsProjectableTableSource<T> {
  2. public TableSource<T> projectNestedFields(int[] fields, String[][] nestedFields);
  3. }
  1. NestedFieldsProjectableTableSource[T] {
  2. def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
  3. }
  • projectNestedField(fields, nestedFields): 返回 TableSource 的具有调整后的物理返回类型的一个 副本(copy)。可以删除或重新排序物理返回类型的字段,但不能更改它们的类型。该方法的契约(contract)本质上与 ProjectableTableSource.projectFields() 方法相同。此外,nestedFields 参数包含 fields 列表中的每个字段索引,这是查询访问的所有嵌套字段的路径列表。所有其他嵌套字段都不需要在 TableSource 生成的记录中读取、解析和设置。最重要的是 不能更改投影字段的类型,但是可以将未使用的字段设置为null或默认值。

使用下推过滤器定义表源(TableSource)

FilterableTableSource 接口增加了对 TableSource 下推过滤器的支持。扩展这个接口的 TableSource 能够过滤记录,这样返回的 DataStreamDataSet 返回的记录就会更少。

接口如下:

  1. FilterableTableSource<T> {
  2. public TableSource<T> applyPredicate(List<Expression> predicates);
  3. public boolean isFilterPushedDown();
  4. }
  1. FilterableTableSource[T] {
  2. def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
  3. def isFilterPushedDown: Boolean
  4. }
  • applyPredicate(predicates): 返回带有添加谓词的 TableSource副本(copy)predicates 参数是一个可变的连接谓词列表,它们“提供(offered)”给 TableSourceTableSource 接受通过从列表中删除谓词来计算谓词的值。列表中剩下的谓词将由后续过滤器操作符计算。
  • isFilterPushedDown(): 如果之前调用了 applyPredicate() 方法,则返回 true。因此,对于从 applyPredicate() 调用返回的所有 TableSource 实例,isFilterPushedDown() 必须返回 true。

定义一个 TableSink

TableSink 指定如何将 Table 发送到外部系统或位置。该接口是通用的,因此它可以支持不同的存储位置和格式。对于批处理表和流表,有不同的表接收器(table sinks)。

通用接口如下:

  1. TableSink<T> {
  2. public TypeInformation<T> getOutputType();
  3. public String[] getFieldNames();
  4. public TypeInformation[] getFieldTypes();
  5. public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
  6. }
  1. TableSink[T] {
  2. def getOutputType: TypeInformation<T>
  3. def getFieldNames: Array[String]
  4. def getFieldTypes: Array[TypeInformation]
  5. def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T]
  6. }

调用 TableSink#configure 方法将表的模式(字段名和类型)传递给 TableSink。该方法必须返回 TableSink 的一个新实例,该实例被配置为发出所提供的表模式。

批处理表接收器(BatchTableSink)

定义外部 TableSink 以发出批处理表。

接口如下:

  1. BatchTableSink<T> implements TableSink<T> {
  2. public void emitDataSet(DataSet<T> dataSet);
  3. }
  1. BatchTableSink[T] extends TableSink[T] {
  2. def emitDataSet(dataSet: DataSet[T]): Unit
  3. }

附加流表接收器(AppendStreamTableSink)

定义一个外部 TableSink 来发出只包含插入更改的流表。

接口如下:

  1. AppendStreamTableSink<T> implements TableSink<T> {
  2. public void emitDataStream(DataStream<T> dataStream);
  3. }
  1. AppendStreamTableSink[T] extends TableSink[T] {
  2. def emitDataStream(dataStream: DataStream<T>): Unit
  3. }

如果表也被 update 或 delete 修改,则会抛出一个 TableException

收回流表接收器(RetractStreamTableSink)

定义一个外部 TableSink,用于发出具有插入、更新和删除更改的流表。

接口如下:

  1. RetractStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {
  2. public TypeInformation<T> getRecordType();
  3. public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
  4. }
  1. RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
  2. def getRecordType: TypeInformation[T]
  3. def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
  4. }

该表将被转换为一个累加和收回消息流,这些消息被编码为 Java Tuple2。第一个字段是一个布尔标志,用于指示消息类型(true 表示插入,false 表示删除)。第二个字段保存所请求类型 T 的记录。

维护流表接收器(UpsertStreamTableSink)

定义一个外部 TableSink,用于发出具有插入、更新和删除更改的流表。

接口如下:

  1. UpsertStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {
  2. public void setKeyFields(String[] keys);
  3. public void setIsAppendOnly(boolean isAppendOnly);
  4. public TypeInformation<T> getRecordType();
  5. public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
  6. }
  1. UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
  2. def setKeyFields(keys: Array[String]): Unit
  3. def setIsAppendOnly(isAppendOnly: Boolean): Unit
  4. def getRecordType: TypeInformation[T]
  5. def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
  6. }

该表必须具有唯一的 key 字段(原子或复合)或仅附加。如果表没有唯一 key 并且不是仅附加,则抛出 TableException。表的唯一键由 UpsertStreamTableSink#setKeyFields() 方法配置。

该表将被转换为 upsert 和 delete 消息流,这些消息被编码为 Java Tuple2。第一个字段是一个布尔标志,用于指示消息类型。第二个字段保存所请求类型 T 的记录。

具有 true 布尔字段的消息是已配置 key 的 upsert 消息。带有错误标志的消息是已配置 key 的删除消息。如果表是仅附加的,则所有消息都将具有 true 标志,并且必须解释为插入。

定义表工厂(TableFactory)

TableFactory 允许从基于字符串的属性创建不同的表相关实例。调用所有可用工厂以匹配给定的属性集和相应的工厂类。

工厂利用 Java 的服务提供者接口(Service Provider Interfaces, SPI)进行发现。这意味着每个依赖项和 JAR 文件都应在 META_INF/services 资源目录中包含一个文件 org.apache.flink.table.factories.TableFactory,该文件列出了它提供的所有可用表工厂。

每个表工厂都需要实现以下接口:

  1. package org.apache.flink.table.factories;
  2. interface TableFactory {
  3. Map<String, String> requiredContext();
  4. List<String> supportedProperties();
  5. }
  1. package org.apache.flink.table.factories
  2. trait TableFactory {
  3. def requiredContext(): util.Map[String, String]
  4. def supportedProperties(): util.List[String]
  5. }
  • requiredContext(): 指定已为此工厂实现的上下文。如果满足指定的属性和值集,框架保证仅匹配此工厂。典型属性可能是 connector.type, format.typeupdate-mode。诸如 connector.property-versionformat.property-version 之类的属性 keys 保留用于将来的向后兼容性情况。
  • supportedProperties: 此工厂可以处理的属性 keys 列表。此方法将用于验证。如果传递了该工厂无法处理的属性,则会抛出异常。该列表不得包含上下文指定的 keys。

为了创建特定实例,工厂类可以实现 org.apache.flink.table.factories 中提供的一个或多个接口:

  • BatchTableSourceFactory: 创建批处理表源。
  • BatchTableSinkFactory: 创建批处理表接收器。
  • StreamTableSoureFactory: 创建流表源。
  • StreamTableSinkFactory: 创建流表接收器。
  • DeserializationSchemaFactory: 创建反序列化架构格式。
  • SerializationSchemaFactory: 创建序列化架构格式。

工厂的发现经历了多个阶段:

  • 发现所有可用的工厂。
  • 按工厂类过滤(例如,StreamTableSourceFactory)。
  • 通过匹配上下文过滤。
  • 按支持的属性过滤。
  • 验证一个工厂是否匹配,否则抛出 AmbiguousTableFactoryExceptionNoMatchingTableFactoryException

以下示例显示如何为参数化提供附加 connector.debug 属性标志的自定义流式源。

  1. import org.apache.flink.table.sources.StreamTableSource;
  2. import org.apache.flink.types.Row;
  3. import java.util.ArrayList;
  4. import java.util.HashMap;
  5. import java.util.List;
  6. import java.util.Map;
  7. class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {
  8. @Override
  9. public Map<String, String> requiredContext() {
  10. Map<String, String> context = new HashMap<>();
  11. context.put("update-mode", "append");
  12. context.put("connector.type", "my-system");
  13. return context;
  14. }
  15. @Override
  16. public List<String> supportedProperties() {
  17. List<String> list = new ArrayList<>();
  18. list.add("connector.debug");
  19. return list;
  20. }
  21. @Override
  22. public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
  23. boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));
  24. # additional validation of the passed properties can also happen here
  25. return new MySystemAppendTableSource(isDebug);
  26. }
  27. }
  1. import java.util
  2. import org.apache.flink.table.sources.StreamTableSource
  3. import org.apache.flink.types.Row
  4. class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {
  5. override def requiredContext(): util.Map[String, String] = {
  6. val context = new util.HashMap[String, String]()
  7. context.put("update-mode", "append")
  8. context.put("connector.type", "my-system")
  9. context
  10. }
  11. override def supportedProperties(): util.List[String] = {
  12. val properties = new util.ArrayList[String]()
  13. properties.add("connector.debug")
  14. properties
  15. }
  16. override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
  17. val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug"))
  18. # additional validation of the passed properties can also happen here
  19. new MySystemAppendTableSource(isDebug)
  20. }
  21. }

在 SQL 客户端中使用 TableFactory

在 SQL 客户端环境文件中,先前显示的工厂可以声明为:

  1. tables:
  2. - name: MySystemTable
  3. type: source
  4. update-mode: append
  5. connector:
  6. type: my-system
  7. debug: true

YAML 文件被转换为扁平的(flattened)字符串属性,并使用描述与外部系统的连接的属性调用表工厂:

  1. update-mode=append
  2. connector.type=my-system
  3. connector.debug=true

注意,属性如 tables.#.nametables.#.type 是 SQL 客户端的特定类型,不传递给任何工厂。type 属性根据执行环境决定是否需要发现 BatchTableSourceFactory/StreamTableSourceFactory (用于source)、BatchTableSinkFactory/StreamTableSinkFactory(用于 sink)或两者(用于 both)。

在 Table 和 SQL API 中使用 TableFactory

对于具有解释性 Scaladoc/Javadoc 的类型安全的编程方法,Table 和 SQL API 在 org.apache.flink.table.descriptors 中提供了转换为基于字符串的属性的描述符。有关源(sources),接收器(sinks)和格式(formats)的信息,请参阅内置描述符(built-in descriptors)作为参考。

在我们的示例中,MySystem 的连接器可以扩展 ConnectorDescriptor,如下所示:

  1. import org.apache.flink.table.descriptors.ConnectorDescriptor;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. /**
  5. * Connector to MySystem with debug mode.
  6. */
  7. public class MySystemConnector extends ConnectorDescriptor {
  8. public final boolean isDebug;
  9. public MySystemConnector(boolean isDebug) {
  10. super("my-system", 1, false);
  11. this.isDebug = isDebug;
  12. }
  13. @Override
  14. protected Map<String, String> toConnectorProperties() {
  15. Map<String, String> properties = new HashMap<>();
  16. properties.put("connector.debug", Boolean.toString(isDebug));
  17. return properties;
  18. }
  19. }
  1. import org.apache.flink.table.descriptors.ConnectorDescriptor
  2. import java.util.HashMap
  3. import java.util.Map
  4. /**
  5. * Connector to MySystem with debug mode.
  6. */
  7. class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) {
  8. override protected def toConnectorProperties(): Map[String, String] = {
  9. val properties = new HashMap[String, String]
  10. properties.put("connector.debug", isDebug.toString)
  11. properties
  12. }
  13. }

然后可以在 API 中使用描述符,如下所示:

  1. StreamTableEnvironment tableEnv = // ...
  2. tableEnv
  3. .connect(new MySystemConnector(true))
  4. .inAppendMode()
  5. .registerTableSource("MySystemTable");
  1. val tableEnv: StreamTableEnvironment = // ...
  2. tableEnv
  3. .connect(new MySystemConnector(isDebug = true))
  4. .inAppendMode()
  5. .registerTableSource("MySystemTable")