连接到外部系统

译者:flink.sojb.cn

Flink的 Table API和SQL程序可以连接到其他外部系统,以便读取和写入批处理表和流表。表源提供对存储在外部系统(例如数据库,键值存储,消息队列或文件系统)中的数据的访问。表接收器向外部存储系统发出表。根据源和接收器的类型,它们支持不同的格式,如CSV,Parquet或ORC。

本页介绍如何声明内置表源和/或表接收器,并在Flink中注册它们。注册源或接收器后,可以通过 Table API和SQL语句访问它。

注意如果要实现自己的自定义表源或接收器,请查看用户定义的源和接收器页面

依赖

下表列出了所有可用的连接器和格式。它们的相互兼容性在表连接器表格格式的相应部分中标记。下表提供了使用构建自动化工具(如Maven或SBT)和带有SQL JAR包的SQL Client的两个项目的依赖关系信息。

此表仅适用于稳定版本。

概览

从Flink 1.6开始,与外部系统的连接声明与实际实现分开。

也可以指定连接

  • 以编程方式使用Descriptorunder org.apache.flink.table.descriptorsfor Table&SQL API
  • 声明性地通过SQL客户端的YAML配置文件

这不仅可以更好地统一API和SQL Client,还可以在自定义实现的情况下实现更好的可扩展性,而无需更改实际声明。

每个声明都类似于SQL CREATE TABLE语句。可以预先定义表的名称,表的模式,连接器以及用于连接到外部系统的数据格式。

连接器描述了存储表的数据的外部系统。可以在此处声明Apacha Kafka或常规文件系统等存储系统。连接器可能已经提供了具有字段和架构的固定格式。

某些系统支持不同的数据格式。例如,存储在Kafka或文件中的表可以使用CSV,JSON或Avro对其行进行编码。数据库连接器可能需要此处的表模式。每个连接器都记录了存储系统是否需要定义格式。不同的系统还需要不同类型的格式(例如,面向列的格式与面向行的格式)。该文档说明了哪些格式类型和连接器兼容。

表架构定义了暴露在SQL查询表的架构。它描述了源如何将数据格式映射到表模式,反之亦然。模式可以访问连接器或格式定义的字段。它可以使用一个或多个字段来提取或插入时间属性。如果输入字段没有确定的字段顺序,则架构清楚地定义列名称,它们的顺序和原点。

后续部分将更详细地介绍每个定义部分(连接器格式架构)。以下示例显示了如何传递它们:

  1. tableEnvironment
  2. .connect(...)
  3. .withFormat(...)
  4. .withSchema(...)
  5. .inAppendMode()
  6. .registerTableSource("MyTable")
  1. name: MyTable
  2. type: source
  3. update-mode: append
  4. connector: ...
  5. format: ...
  6. schema: ...

表的类型(sourcesinkboth)确定表的注册方式。在表类型的情况下,表both源和表接收器都以相同的名称注册。从逻辑上讲,这意味着我们可以读取和写入这样的表,类似于常规DBMS中的表。

对于流式查询,更新模式声明如何在动态表和存储系统之间进行通信以进行连续查询。

以下代码显示了如何连接到Kafka以读取Avro记录的完整示例。

  1. tableEnvironment
  2. // declare the external system to connect to
  3. .connect(
  4. new Kafka()
  5. .version("0.10")
  6. .topic("test-input")
  7. .startFromEarliest()
  8. .property("zookeeper.connect", "localhost:2181")
  9. .property("bootstrap.servers", "localhost:9092")
  10. )
  11. // declare a format for this system
  12. .withFormat(
  13. new Avro()
  14. .avroSchema(
  15. "{" +
  16. " \"namespace\": \"org.myorganization\"," +
  17. " \"type\": \"record\"," +
  18. " \"name\": \"UserMessage\"," +
  19. " \"fields\": [" +
  20. " {\"name\": \"timestamp\", \"type\": \"string\"}," +
  21. " {\"name\": \"user\", \"type\": \"long\"}," +
  22. " {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
  23. " ]" +
  24. "}" +
  25. )
  26. )
  27. // declare the schema of the table
  28. .withSchema(
  29. new Schema()
  30. .field("rowtime", Types.SQL_TIMESTAMP)
  31. .rowtime(new Rowtime()
  32. .timestampsFromField("ts")
  33. .watermarksPeriodicBounded(60000)
  34. )
  35. .field("user", Types.LONG)
  36. .field("message", Types.STRING)
  37. )
  38. // specify the update-mode for streaming tables
  39. .inAppendMode()
  40. // register as source, sink, or both and under a name
  41. .registerTableSource("MyUserTable");
  1. tables:
  2. - name: MyUserTable # name the new table
  3. type: source # declare if the table should be "source", "sink", or "both"
  4. update-mode: append # specify the update-mode for streaming tables
  5. # declare the external system to connect to
  6. connector:
  7. type: kafka
  8. version: "0.10"
  9. topic: test-input
  10. startup-mode: earliest-offset
  11. properties:
  12. - key: zookeeper.connect
  13. value: localhost:2181
  14. - key: bootstrap.servers
  15. value: localhost:9092
  16. # declare a format for this system
  17. format:
  18. type: avro
  19. avro-schema: >
  20. {
  21. "namespace": "org.myorganization",
  22. "type": "record",
  23. "name": "UserMessage",
  24. "fields": [
  25. {"name": "ts", "type": "string"},
  26. {"name": "user", "type": "long"},
  27. {"name": "message", "type": ["string", "null"]}
  28. ]
  29. }
  30. # declare the schema of the table
  31. schema:
  32. - name: rowtime
  33. type: TIMESTAMP
  34. rowtime:
  35. timestamps:
  36. type: from-field
  37. from: ts
  38. watermarks:
  39. type: periodic-bounded
  40. delay: "60000"
  41. - name: user
  42. type: BIGINT
  43. - name: message
  44. type: VARCHAR

在两种方式中,所需的连接属性都转换为规范化的,基于字符串的键值对。所谓的表工厂从键值对创建配置的表源,表接收器和相应的格式。在搜索完全匹配的表工厂时,会考虑通过Java服务提供程序接口(SPI)找到的所有表工厂。

如果找不到工厂或多个工厂匹配给定的属性,则会抛出一个异常,其中包含有关已考虑的工厂和支持的属性的其他信息。

表格式

表模式定义类的名称和类型,类似于SQL CREATE TABLE语句的列定义。另外,可以指定列的表示方式和表格数据编码格式的字段。如果列的名称应与输入/输出格式不同,则字段的来源可能很重要。例如,列user_name应从$$-user-nameJSON格式引用该字段。此外,需要架构将类型从外部系统映射到Flink的表示。对于表接收器,它确保仅将具有有效模式的数据写入外部系统。

以下示例显示了一个没有时间属性的简单模式,以及输入/输出到表列的一对一字段映射。

  1. .withSchema(
  2. new Schema()
  3. .field("MyField1", Types.INT) // required: specify the fields of the table (in this order)
  4. .field("MyField2", Types.STRING)
  5. .field("MyField3", Types.BOOLEAN)
  6. )
  1. schema:
  2. - name: MyField1 # required: specify the fields of the table (in this order)
  3. type: INT
  4. - name: MyField2
  5. type: VARCHAR
  6. - name: MyField3
  7. type: BOOLEAN

对于每个字段,除了列的名称和类型之外,还可以声明以下属性:

  1. .withSchema(
  2. new Schema()
  3. .field("MyField1", Types.SQL_TIMESTAMP)
  4. .proctime() // optional: declares this field as a processing-time attribute
  5. .field("MyField2", Types.SQL_TIMESTAMP)
  6. .rowtime(...) // optional: declares this field as a event-time attribute
  7. .field("MyField3", Types.BOOLEAN)
  8. .from("mf3") // optional: original field in the input that is referenced/aliased by this field
  9. )
  1. schema:
  2. - name: MyField1
  3. type: TIMESTAMP
  4. proctime: true # optional: boolean flag whether this field should be a processing-time attribute
  5. - name: MyField2
  6. type: TIMESTAMP
  7. rowtime: ... # optional: wether this field should be a event-time attribute
  8. - name: MyField3
  9. type: BOOLEAN
  10. from: mf3 # optional: original field in the input that is referenced/aliased by this field

使用无界流表时,时间属性是必不可少的。因此,处理时间和事件时间(也称为“行时”)属性都可以定义为模式的一部分。

有关Flink中时间处理的更多信息,特别是事件时间,我们建议使用常规事件时间部分

行时属性

为了控制表的事件时间行​​为,Flink提供了预定义的时间戳提取器和水印策略。

支持以下时间戳提取器:

  1. // Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
  2. .rowtime(
  3. new Rowtime()
  4. .timestampsFromField("ts_field") // required: original field name in the input
  5. )
  6. // Converts the assigned timestamps from a DataStream API record into the rowtime attribute
  7. // and thus preserves the assigned timestamps from the source.
  8. // This requires a source that assigns timestamps (e.g., Kafka 0.10+).
  9. .rowtime(
  10. new Rowtime()
  11. .timestampsFromSource()
  12. )
  13. // Sets a custom timestamp extractor to be used for the rowtime attribute.
  14. // The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
  15. .rowtime(
  16. new Rowtime()
  17. .timestampsFromExtractor(...)
  18. )
  1. # Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
  2. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  3. rowtime:
  4. timestamps:
  5. type: from-field
  6. from: "ts_field" # required: original field name in the input
  7. # Converts the assigned timestamps from a DataStream API record into the rowtime attribute
  8. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  9. # and thus preserves the assigned timestamps from the source.
  10. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  11. rowtime:
  12. timestamps:
  13. type: from-source

支持以下水印策略:

  1. // Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
  2. // observed timestamp so far minus 1\. Rows that have a timestamp equal to the max timestamp
  3. // are not late.
  4. .rowtime(
  5. new Rowtime()
  6. .watermarksPeriodicAscending()
  7. )
  8. // Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
  9. // Emits watermarks which are the maximum observed timestamp minus the specified delay.
  10. .rowtime(
  11. new Rowtime()
  12. .watermarksPeriodicBounded(2000) // delay in milliseconds
  13. )
  14. // Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
  15. // underlying DataStream API and thus preserves the assigned watermarks from the source.
  16. .rowtime(
  17. new Rowtime()
  18. .watermarksFromSource()
  19. )
  1. # Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
  2. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  3. # observed timestamp so far minus 1\. Rows that have a timestamp equal to the max timestamp
  4. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  5. # are not late.
  6. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  7. rowtime:
  8. watermarks:
  9. type: periodic-ascending
  10. # Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
  11. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  12. # Emits watermarks which are the maximum observed timestamp minus the specified delay.
  13. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  14. rowtime:
  15. watermarks:
  16. type: periodic-bounded
  17. delay: ... # required: delay in milliseconds
  18. # Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
  19. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  20. # underlying DataStream API and thus preserves the assigned watermarks from the source.
  21. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  22. rowtime:
  23. watermarks:
  24. type: from-source

确保始终声明时间戳和水印。触发基于时间的 算子操作需要水印。

类型字符串

由于类型信息仅在编程语言中可用,因此支持在YAML文件中定义以下类型字符串:

  1. VARCHAR
  2. BOOLEAN
  3. TINYINT
  4. SMALLINT
  5. INT
  6. BIGINT
  7. FLOAT
  8. DOUBLE
  9. DECIMAL
  10. DATE
  11. TIME
  12. TIMESTAMP
  13. ROW(fieldtype, ...) # unnamed row; e.g. ROW(VARCHAR, INT) that is mapped to Flink's RowTypeInfo
  14. # with indexed fields names f0, f1, ...
  15. ROW(fieldname fieldtype, ...) # named row; e.g., ROW(myField VARCHAR, myOtherField INT) that
  16. # is mapped to Flink's RowTypeInfo
  17. POJO(class) # e.g., POJO(org.mycompany.MyPojoClass) that is mapped to Flink's PojoTypeInfo
  18. ANY(class) # e.g., ANY(org.mycompany.MyClass) that is mapped to Flink's GenericTypeInfo
  19. ANY(class, serialized) # used for type information that is not supported by Flink's Table & SQL API

更新模式

对于流式查询,需要声明如何在动态表和外部连接器之间执行转换。的更新模式指定哪些类型的消息应与外部系统进行交换:

追加模式:在追加模式下,动态表和外部连接器仅交换INSERT消息。

回退模式:在回退模式下,动态表和外部连接器交换ADD和RETRACT消息。INSERT更改被编码为ADD消息,DELETE更改被编码为RETRACT消息,UPDATE更改被编码为更新(先前)行的RETRACT消息和更新(新)行的ADD消息。在此模式下,不能定义键而不是upsert模式。但是,每次更新都包含两个效率较低的消息。

Upsert模式:在upsert模式下,动态表和外部连接器交换UPSERT和DELETE消息。此模式需要一个(可能是复合的)唯一键,通过该键可以传播更新。外部连接器需要知道唯一键属性才能正确应用消息。INSERT和UPDATE更改被编码为UPSERT消息。DELETE更改为DELETE消息。与收回流的主要区别在于UPDATE更改使用单个消息进行编码,因此更有效。

注意每个连接器的文档都说明支持哪些更新模式。

  1. .connect(...)
  2. .inAppendMode() // otherwise: inUpsertMode() or inRetractMode()
  1. tables:
  2. - name: ...
  3. update-mode: append # otherwise: "retract" or "upsert"

有关更多信息,另请参阅常规流概念文档

表连接器

Flink提供了一组用于连接外部系统的连接器。

请注意,并非所有连接器都可用于批量和流式传输。此外,并非每个流连接器都支持每种流模式。因此,相应地标记每个连接器。格式标记表示连接器需要特定类型的格式。

文件系统连接器

来源:批量 来源:流处理附加模式 接收器:批量 接收器:流式附加模式 格式:仅限CSV

文件系统连接器允许从本地或分布式文件系统进行读写。文件系统可以定义为:

  1. .connect(
  2. new FileSystem()
  3. .path("file:///path/to/whatever") // required: path to a file or directory
  4. )
  1. connector:
  2. type: filesystem
  3. path: "file:///path/to/whatever" # required: path to a file or directory

文件系统连接器本身包含在Flink中,不需要额外的依赖项。需要指定相应的格式,以便从文件系统读取和写入行。

注意确保包含特定于Flink文件系统的依赖项

注意用于流式传输的文件系统源和接收器仅是实验性的。将来,我们将支持实际的流式使用案例,即目录监控和桶输出。

Kafka连接器

来源:流附加模式 接收器:流附加模式 格式:序列化模式 格式:反序列化模式

Kafka连接器允许从Apache Kafka主题读取和写入。它可以定义如下:

  1. .connect(
  2. new Kafka()
  3. .version("0.11") // required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
  4. .topic("...") // required: topic name from which the table is read
  5. // optional: connector specific properties
  6. .property("zookeeper.connect", "localhost:2181")
  7. .property("bootstrap.servers", "localhost:9092")
  8. .property("group.id", "testGroup")
  9. // optional: select a startup mode for Kafka offsets
  10. .startFromEarliest()
  11. .startFromLatest()
  12. .startFromSpecificOffsets(...)
  13. // optional: output partitioning from Flink's partitions into Kafka's partitions
  14. .sinkPartitionerFixed() // each Flink partition ends up in at-most one Kafka partition (default)
  15. .sinkPartitionerRoundRobin() // a Flink partition is distributed to Kafka partitions round-robin
  16. .sinkPartitionerCustom(MyCustom.class) // use a custom FlinkKafkaPartitioner subclass
  17. )
  1. connector:
  2. type: kafka
  3. version: 0.11 # required: valid connector versions are "0.8", "0.9", "0.10", and "0.11"
  4. topic: ... # required: topic name from which the table is read
  5. properties: # optional: connector specific properties
  6. - key: zookeeper.connect
  7. value: localhost:2181
  8. - key: bootstrap.servers
  9. value: localhost:9092
  10. - key: group.id
  11. value: testGroup
  12. startup-mode: ... # optional: valid modes are "earliest-offset", "latest-offset",
  13. # "group-offsets", or "specific-offsets"
  14. specific-offsets: # optional: used in case of startup mode with specific offsets
  15. - partition: 0
  16. offset: 42
  17. - partition: 1
  18. offset: 300
  19. sink-partitioner: ... # optional: output partitioning from Flink's partitions into Kafka's partitions
  20. # valid are "fixed" (each Flink partition ends up in at most one Kafka partition),
  21. # "round-robin" (a Flink partition is distributed to Kafka partitions round-robin)
  22. # "custom" (use a custom FlinkKafkaPartitioner subclass)
  23. sink-partitioner-class: org.mycompany.MyPartitioner # optional: used in case of sink partitioner custom

指定开始读取位置:默认情况下,Kafka源将开始从Zookeeper或Kafka代理中的已提交组偏移量中读取数据。您可以指定其他起始位置,这些位置对应于Kafka消费者起始位置配置部分中的配置

Flink-Kafka接收器分区:默认情况下,Kafka接收器最多写入与其自身并行性一样多的分区(接收器的每个并行实例仅写入一个分区)。为了将写入分发到更多分区或控制行到分区的路由,可以提供自定义接收器分区器。循环分区器可用于避免不平衡的分区。但是,它会在所有Flink实例和所有Kafka代理之间产生大量网络连接。

一致性保证:默认情况下,如果在启用检查点的情况下执行查询,则Kafka接收器会使用至少一次保证将数据提取到Kafka主题中。

Kafka 0.10+时间戳:自Kafka 0.10起,Kafka消息的时间戳作为元数据,指定记录何时写入Kafka主题。通过分别在YAML和Java / Scala中选择,可以将这些时间戳用于行时属性timestamps: from-source``timestampsFromSource()

确保添加特定于版本的Kafka依赖项。此外,需要指定相应的格式以便从Kafka读取和写入行。

表格格式

Flink提供了一组可与表连接器一起使用的表格格式。

格式标记表示与连接器匹配的格式类型。

CSV格式

CSV格式允许读取和写入以逗号分隔的行。

  1. .withFormat(
  2. new Csv()
  3. .field("field1", Types.STRING) // required: ordered format fields
  4. .field("field2", Types.TIMESTAMP)
  5. .fieldDelimiter(",") // optional: string delimiter "," by default
  6. .lineDelimiter("\n") // optional: string delimiter "\n" by default
  7. .quoteCharacter('"') // optional: single character for string values, empty by default
  8. .commentPrefix('#') // optional: string to indicate comments, empty by default
  9. .ignoreFirstLine() // optional: ignore the first line, by default it is not skipped
  10. .ignoreParseErrors() // optional: skip records with parse error instead of failing by default
  11. )
  1. format:
  2. type: csv
  3. fields: # required: ordered format fields
  4. - name: field1
  5. type: VARCHAR
  6. - name: field2
  7. type: TIMESTAMP
  8. field-delimiter: "," # optional: string delimiter "," by default
  9. line-delimiter: "\n" # optional: string delimiter "\n" by default
  10. quote-character: '"' # optional: single character for string values, empty by default
  11. comment-prefix: '#' # optional: string to indicate comments, empty by default
  12. ignore-first-line: false # optional: boolean flag to ignore the first line, by default it is not skipped
  13. ignore-parse-errors: true # optional: skip records with parse error instead of failing by default

CSV格式包含在Flink中,不需要其他依赖项。

注意目前写入行的CSV格式有限。仅支持自定义字段分隔符作为可选参数。

JSON格式

格式:序列化架构 格式:反序列化架构

JSON格式允许读取和写入与给定格式模式相对应的JSON数据。格式模式可以定义为Flink类型,JSON模式,也可以从所需的表模式派生。Flink类型支持更类似SQL的定义并映射到相应的SQL数据类型。JSON模式允许更复杂和嵌套的结构。

如果格式架构等于表架构,则还可以自动派生架构。这允许仅定义一次架构信息。格式的名称,类型和字段顺序由表的架构决定。如果时间属性的来源不是字段,则会忽略它们。一个from表中的模式定义解释为格式字段命名。

  1. .withFormat(
  2. new Json()
  3. .failOnMissingField(true) // optional: flag whether to fail if a field is missing or not, false by default
  4. // required: define the schema either by using type information which parses numbers to corresponding types
  5. .schema(Type.ROW(...))
  6. // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
  7. .jsonSchema(
  8. "{" +
  9. " type: 'object'," +
  10. " properties: {" +
  11. " lon: {" +
  12. " type: 'number'" +
  13. " }," +
  14. " rideTime: {" +
  15. " type: 'string'," +
  16. " format: 'date-time'" +
  17. " }" +
  18. " }" +
  19. "}"
  20. )
  21. // or use the table's schema
  22. .deriveSchema()
  23. )
  1. format:
  2. type: json
  3. fail-on-missing-field: true # optional: flag whether to fail if a field is missing or not, false by default
  4. # required: define the schema either by using a type string which parses numbers to corresponding types
  5. schema: "ROW(lon FLOAT, rideTime TIMESTAMP)"
  6. # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
  7. json-schema: >
  8. {
  9. type: 'object',
  10. properties: {
  11. lon: {
  12. type: 'number'
  13. },
  14. rideTime: {
  15. type: 'string',
  16. format: 'date-time'
  17. }
  18. }
  19. }
  20. # or use the table's schema
  21. derive-schema: true

下表显示了JSON模式类型到Flink SQL类型的映射:

JSON模式 Flink SQL
object ROW
boolean BOOLEAN
array ARRAY[_]
number DECIMAL
integer DECIMAL
string VARCHAR
string with format: date-time TIMESTAMP
string with format: date DATE
string with format: time TIME
string with encoding: base64 ARRAY[TINYINT]
null NULL (尚不支持)

目前,Flink仅支持JSON模式规范 的子集draft-07。Union类型(以及allOfanyOfnot)尚未支持。oneOf和类型数组仅支持指定可为空性。

支持链接到文档中的通用定义的简单引用,如下面更复杂的示例所示:

  1. { "definitions": { "address": { "type": "object", "properties": { "street_address": { "type": "string" }, "city": { "type": "string" }, "state": { "type": "string" } }, "required": [ "street_address", "city", "state" ] } }, "type": "object", "properties": { "billing_address": { "$ref": "#/definitions/address" }, "shipping_address": { "$ref": "#/definitions/address" }, "optional_address": { "oneOf": [ { "type": "null" }, { "$ref": "#/definitions/address" } ] } } }

缺少字段处理:默认情况下,缺少的JSON字段设置为null。您可以启用严格的JSON解析,如果缺少字段,将取消源(和查询)。

确保将JSON格式添加为依赖项。

Apache Avro格式

格式:序列化架构 格式:反序列化架构

Apache的Avro的格式允许读取和写入对应于给定的格式模式Avro的数据。格式模式可以定义为Avro特定记录的完全限定类名,也可以定义为Avro架构字符串。如果使用类名,则在运行时期间类必须在类路径中可用。

  1. .withFormat(
  2. new Avro()
  3. // required: define the schema either by using an Avro specific record class
  4. .recordClass(User.class)
  5. // or by using an Avro schema
  6. .avroSchema(
  7. "{" +
  8. " \"type\": \"record\"," +
  9. " \"name\": \"test\"," +
  10. " \"fields\" : [" +
  11. " {\"name\": \"a\", \"type\": \"long\"}," +
  12. " {\"name\": \"b\", \"type\": \"string\"}" +
  13. " ]" +
  14. "}"
  15. )
  16. )
  1. format:
  2. type: avro
  3. # required: define the schema either by using an Avro specific record class
  4. record-class: "org.organization.types.User"
  5. # or by using an Avro schema
  6. avro-schema: >
  7. {
  8. "type": "record",
  9. "name": "test",
  10. "fields" : [
  11. {"name": "a", "type": "long"},
  12. {"name": "b", "type": "string"}
  13. ]
  14. }

Avro类型映射到相应的SQL数据类型。仅支持联合类型以指定可为空性,否则它们将转换为ANY类型。下表显示了映射:

Avro架构 Flink SQL
record ROW
enum VARCHAR
array ARRAY[_]
map MAP[VARCHAR, _]
union 非null类型或 ANY
fixed ARRAY[TINYINT]
string VARCHAR
bytes ARRAY[TINYINT]
int INT
long BIGINT
float FLOAT
double DOUBLE
boolean BOOLEAN
int with logicalType: date DATE
int with logicalType: time-millis TIME
int with logicalType: time-micros INT
long with logicalType: timestamp-millis TIMESTAMP
long with logicalType: timestamp-micros BIGINT
bytes with logicalType: decimal DECIMAL
fixed with logicalType: decimal DECIMAL
null NULL (尚不支持)

Avro使用Joda-Time来表示特定记录类中的逻辑日期和时间类型。Joda-Time依赖不是Flink分发的一部分。因此,请确保Joda-Time在运行时期间与您的特定记录类一起位于类路径中。通过模式字符串指定的Avro格式不需要存在Joda-Time。

确保添加Apache Avro依赖项。

进一步的TableSources和TableSinks

尚未将以下表源和接收器迁移(或尚未完全迁移)到新的统一接口。

这些是TableSourceFlink提供的附加函数:

班级名称 Maven依赖 批量? 流? 描述
OrcTableSource flink-orc Y N 一个TableSourceORC文件。

这些是TableSinkFlink提供的附加函数:

班级名称 Maven依赖 批量? 流? 描述
CsvTableSink flink-table Y 附加 CSV文件的简单接收器。
JDBCAppendTableSink flink-jdbc Y 附加 将表写入JDBC表。
CassandraAppendTableSink flink-connector-cassandra N 附加 将表写入Cassandra表。

OrcTableSource

OrcTableSource读取ORC文件。ORC是结构化数据的文件格式,并以压缩的柱状表示形式存储数据。ORC非常高效,支持Projection和滤波器下推。

一个OrcTableSource被创建,如下所示:

  1. // create Hadoop Configuration
  2. Configuration config = new Configuration();
  3. OrcTableSource orcTableSource = OrcTableSource.builder()
  4. // path to ORC file(s). NOTE: By default, directories are recursively scanned.
  5. .path("file:///path/to/data")
  6. // schema of ORC files
  7. .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
  8. // Hadoop configuration
  9. .withConfiguration(config)
  10. // build OrcTableSource
  11. .build();
  1. // create Hadoop Configuration val config = new Configuration()
  2. val orcTableSource = OrcTableSource.builder()
  3. // path to ORC file(s). NOTE: By default, directories are recursively scanned.
  4. .path("file:///path/to/data")
  5. // schema of ORC files
  6. .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
  7. // Hadoop configuration
  8. .withConfiguration(config)
  9. // build OrcTableSource
  10. .build()

注:OrcTableSource不支持ORC的Union类型呢。

CsvTableSink

CsvTableSink发出Table一个或一个以上的CSV文件。

接收器仅支持仅附加流表。它不能用于发出Table不断更新的内容。有关详细信息,请参阅表到流转换文档。发出流表时,行至少写入一次(如果启用了检查点),CsvTableSink并且不将输出文件拆分为存储桶文件,而是连续写入相同的文件。

  1. Table table = ...
  2. table.writeToSink(
  3. new CsvTableSink(
  4. path, // output path
  5. "|", // optional: delimit files by '|'
  6. 1, // optional: write to a single file
  7. WriteMode.OVERWRITE)); // optional: override existing files
  1. val table: Table = ???
  2. table.writeToSink(
  3. new CsvTableSink(
  4. path, // output path
  5. fieldDelim = "|", // optional: delimit files by '|'
  6. numFiles = 1, // optional: write to a single file
  7. writeMode = WriteMode.OVERWRITE)) // optional: override existing files

JDBCAppendTableSink

JDBCAppendTableSink发出Table一个JDBC连接。接收器仅支持仅附加流表。它不能用于发出Table不断更新的内容。有关详细信息,请参阅表到流转换文档

所述JDBCAppendTableSink插入物每Table行至少一次到数据库表(如果启用了检查点)。但是,您可以使用REPLACE或指定插入查询来指定INSERT OVERWRITE对数据库的写入。

要使用JDBC接收器,必须将JDBC连接器依赖项(flink-jdbc)添加到项目中。然后您可以使用JDBCAppendSinkBuilder以下方法创建接收器

  1. JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  2. .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  3. .setDBUrl("jdbc:derby:memory:ebookshop")
  4. .setQuery("INSERT INTO books (id) VALUES (?)")
  5. .setParameterTypes(INT_TYPE_INFO)
  6. .build();
  7. Table table = ...
  8. table.writeToSink(sink);
  1. val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
  2. .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  3. .setDBUrl("jdbc:derby:memory:ebookshop")
  4. .setQuery("INSERT INTO books (id) VALUES (?)")
  5. .setParameterTypes(INT_TYPE_INFO)
  6. .build()
  7. val table: Table = ???
  8. table.writeToSink(sink)

与using类似JDBCOutputFormat,您必须显式指定JDBC驱动程序的名称,JDBC URL,要执行的查询以及JDBC表的字段类型。

CassandraAppendTableSink

CassandraAppendTableSink发射Table到卡桑德拉表。接收器仅支持仅附加流表。它不能用于发出Table不断更新的内容。有关详细信息,请参阅表到流转换文档

CassandraAppendTableSink插入所有行至少一次到Cassandra的表,如果检查点已启用。但是,您可以将查询指定为upsert查询。

要使用CassandraAppendTableSink,必须将Cassandra连接器依赖项(flink-connector-cassandra)添加到项目中。以下示例显示了如何使用CassandraAppendTableSink

  1. ClusterBuilder builder = ... // configure Cassandra cluster connection
  2. CassandraAppendTableSink sink = new CassandraAppendTableSink(
  3. builder,
  4. // the query must match the schema of the table
  5. INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?));
  6. Table table = ...
  7. table.writeToSink(sink);
  1. val builder: ClusterBuilder = ... // configure Cassandra cluster connection
  2. val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
  3. builder,
  4. // the query must match the schema of the table
  5. INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?))
  6. val table: Table = ???
  7. table.writeToSink(sink)