API整合

Table API

  • 用于流和批处理的统一关系 API
    • SQL 语言的超集
    • 许多概念和部分与 Flink SQL 共享
  • 适用于 Java、Scala 和 Python
  • 表 API 查询是用编程语言定义的(相对于 SQL 字符串)
    • IDE 支持自动完成和语法验证

      纯Table API程序

      ```java EnvironmentSettings settings = EnvironmentSettings .newInstance() .inBatchMode() // or .inStreamingMode() .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

  1. <a name="pX6fc"></a>
  2. ### 纯Table API程序与DataStream API整合
  3. ```java
  4. StreamExecutionEnvironment env =
  5. StreamExecutionEnvironment.getExecutionEnvironment();
  6. EnvironmentSettings settings = EnvironmentSettings
  7. .newInstance()
  8. .inStreamingMode() // not yet supported: .inBatchMode()
  9. .build();
  10. StreamTableEnvironment tEnv =
  11. StreamTableEnvironment.create(env, settings);

Table API example

  1. /** Example Table API program. */
  2. public class FlinkTableExample {
  3. public static void main(String[] args) {
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. env.enableCheckpointing(10_000);
  6. env.getConfig().enableObjectReuse();
  7. EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
  8. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
  9. //将connector数据转化为table
  10. tEnv.executeSql(
  11. "CREATE TABLE Orders (\n"
  12. + " `a` STRING,\n"
  13. + " `b` NUMERIC,\n"
  14. + " `rowtime` TIMESTAMP(3),\n"
  15. + " WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '10' SECOND"
  16. + ") WITH (\n"
  17. + " 'connector' = 'datagen',\n"
  18. + " 'rows-per-second' = '1'\n"
  19. + ")");
  20. Table orders = tEnv.from("Orders");
  21. Table result =
  22. orders
  23. .filter(and($("a").isNotNull(), $("b").isNotNull()))
  24. .select($("a").lowerCase().as("a"), $("b"), $("rowtime"))
  25. .window(Tumble.over(lit(1).minutes()).on($("rowtime")).as("minuteWindow"))
  26. .groupBy($("minuteWindow"), $("a"))
  27. .select(
  28. $("a"), $("minuteWindow").end().as("minute"), $("b").avg().as("avgBillingAmount"));
  29. result.execute().print();
  30. }
  31. }

如果将 Table API 初始化与 DataStream 互操作性一起使用,我们还可以通过在代码里微调环境配置,例如检查点或对象重用,如上所示。确保在实例化 StreamTableEnvironment 之前进行配置,因为 Flink 不保证选项会在实例化之后转发。

在某些情况下,对象重用通过在堆上分配更少的对象来提高效率。与 DataStream API 相比,只要留在 Table API / SQL 生态系统中,就可以不受限制地启用它。
如果需要创建表,其引用位于其中一个可用连接器,最简单的选择是通过 SQL 表 DDL。
如果使用的编目(catalog)已经暴露了需要使用的表,可以通过 tEnv.from() 检索它。

与表DDL相比,在编目中创建表(例如默认内存编目)相当繁琐。例如:

  1. Schema schema =
  2. Schema.newBuilder()
  3. .column("a", DataTypes.STRING())
  4. .column("b", DataTypes.INT())
  5. .column("rowtime", DataTypes.TIMESTAMP_LTZ(3))
  6. .watermark("rowtime", $("rowtime").minus(lit(10).seconds()))
  7. .build();
  8. TableDescriptor tableDescriptor =
  9. TableDescriptor.forConnector("datagen")
  10. .option(DataGenConnectorOptions.ROWS_PER_SECOND, 1L)
  11. .schema(schema)
  12. .build();
  13. tEnv.createTable("Orders", tableDescriptor);

前面的语句声明了从源(输入表)到接收器(输出表)的管道,即最终将处理数据的 Flink 作业。调用 execute 将校验、转换、提交、运行作业,并收集可以打印的结果。
在实际工作中,可能更希望调用 executeInsert(

) 将工作结果写入已定义的输出接收器,例如 Kafka 主题。
通过 tEnv.createStatementSet() 的语句集允许将多个插入包装到(可能)不同的输出表中。

DataStream API集成

SQL / Table API vs. DataStream API

  • DataStream API
    • 经典的核心流处理使用场景
    • 显式控制流、状态和时间的原语
    • 复杂的计算和定制
    • 目标:最大化性能和可靠性
  • SQL / Table API
    • 可以使用高阶API, SQL 建模
    • 专注于逻辑,而不是实现
    • 混合工作负载(批处理和流式处理)
    • 目标:最大化开发人员的速度和自主权

在一个 API 下将批处理和流结合在一起一直是 SQL / Table API 的强项。但是,DataStream API 也正在通过批处理模式执行进行扩展(以前仅在 DataSet API 中可用)并且最终可能会赶上。

混合SQL / Table and DataStream API

混合使用两种API可能更有利的场景。

  • 从 Table / SQL 生态系统开始,用于访问catalog和外部系统
    • 在 DataStream API 中继续
  • 使用 SQL / Table API 函数进行无状态数据规范化和清理
    • 在 DataStream API 中继续
  • 使用 SQL / Table API,但是
    • 切换到 DataStream API 进行低阶操作,例如自定义计时器
  • 在DataStream API 中开始和/或结束时,自定义连接器。
    • 但是为简单起见,在 SQL / Table API 中实现主管道运算

使用一种API中的管道可以完成端到端定义与运算,而无需其他类型API。但是,由于各种原因,混合两个API可能很有用。

DataStream ↔ Table Conversion

  • StreamTableEnvironment使用集成选项扩展TableEnvironment。
  • TableEnvironment对仅插入流的简单处理
    • tableEnv.fromDataStream() 和 tableEnv.toDataStream()
  • 通用变更日志流的更多可能性:
    • tableEnv.fromChangelogStream() 和 tableEnv.toChangelogStream()
  • 数据类型将自动转换:
    • TypeInformation (DataStream API) ↔ DataType (SQL / Table API)
    • 与用户定义函数类似的规则和优化选项

下面案例来自Flink Community Data Analytics Repository。显示了将 Flink 项目的 git 提交导入 Kafka 主题的作业。

  1. /**
  2. * Flink job that reads commits in the apache/flink Github repository (using the Github API) and
  3. * writes commit metadata to Kafka.
  4. */
  5. public class FlinkCommitsToKafka {
  6. public static final String APACHE_FLINK_REPOSITORY = "https://github.com/apache/flink.git";
  7. public static final String APACHE_FLINK_BRANCH = "refs/heads/master";
  8. public static void main(String[] args) {
  9. ParameterTool params = ParameterTool.fromArgs(args);
  10. // Sink
  11. String kafkaServer = params.get("kafka-server", "kafka.vvp.svc");
  12. String kafkaTopic = params.get("kafka-topic", "flink-commits");
  13. // Source
  14. long delayBetweenQueries = params.getLong("poll-interval-ms", 10_000L);
  15. String ignoreCommitsBefore = params.get("ignore-commits-before", null);
  16. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  17. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  18. env.getConfig().enableObjectReuse();
  19. DataStream<Commit> commits =
  20. env.addSource(getGitCommitSource(delayBetweenQueries, ignoreCommitsBefore))
  21. .name("flink-commit-source")
  22. .uid("flink-commit-source");
  23. tableEnv.executeSql(
  24. "CREATE TABLE commits (\n"
  25. + "`author` STRING,\n"
  26. + "`authorDate` TIMESTAMP(3),\n"
  27. + "`authorEmail` STRING,\n"
  28. + "`commitDate` TIMESTAMP(3),\n"
  29. + "`committer` STRING,\n"
  30. + "`committerEmail` STRING,\n"
  31. + "`filesChanged` ARRAY<ROW<filename STRING, linesAdded INT, linesChanged INT, linesRemoved INT>>,\n"
  32. + "`sha1` STRING,\n"
  33. + "`shortInfo` STRING\n"
  34. + ") WITH (\n"
  35. + "'connector' = 'kafka',\n"
  36. + "'topic' = '"
  37. + kafkaTopic
  38. + "',\n"
  39. + "'properties.bootstrap.servers' = '"
  40. + kafkaServer
  41. + "',\n"
  42. + "'properties.max.request.size' = '"
  43. + 5 * 1024 * 1024
  44. + "',"
  45. + "'format' = 'json'\n"
  46. + ")");
  47. Schema schema =
  48. Schema.newBuilder()
  49. .column("author", "STRING")
  50. .column("authorDate", "TIMESTAMP(3)")
  51. .column("authorEmail", "STRING")
  52. .column("commitDate", "TIMESTAMP(3)")
  53. .column("committer", "STRING")
  54. .column("committerEmail", "STRING")
  55. .column(
  56. "filesChanged",
  57. "ARRAY<ROW<filename STRING, linesAdded INT, linesChanged INT, linesRemoved INT>>")
  58. .column("sha1", "STRING")
  59. .column("shortInfo", "STRING")
  60. .build();
  61. tableEnv.fromDataStream(commits, schema).executeInsert("commits");
  62. }
  63. private static JGitCommitSource getGitCommitSource(
  64. final long delayBetweenQueries, final String ignoreCommitsBefore) {
  65. return new JGitCommitSource(
  66. APACHE_FLINK_REPOSITORY, APACHE_FLINK_BRANCH, ignoreCommitsBefore, delayBetweenQueries);
  67. }
  68. }

它从 DataStream API 的自定义源实现开始。注意,建议为所有DataStream操作设置uid;此处名称只是为了方便在 Web UI 和日志中设置。

从这个源数据流到 SQL 表的转换是通过 tableEnv.fromDataStream 完成的。此函数的几个重载均可以使用。这里显示的字段确保是使用的输出表的正确模式(列顺序和类型)中。还可以删除字段、添加计算字段或基于元数据的字段,以及定义 proctime 或 rowtime 属性(稍后会详细介绍)。
还可以通过 tableEnv.createTemporaryView() 直接将此表注册为临时视图,以便可以在任何 SQL 查询中按名称访问它。

executeInsert编译job grapgh,提交到 Flink 集群,并触发执行。结果将流式数据传输到声明的输出表。

Table Conversion → DataStream

仅插入流

  1. tableEnv.executeSql(
  2. "CREATE TABLE GeneratedTable ("
  3. + " name STRING,"
  4. + " score INT,"
  5. + " event_time TIMESTAMP_LTZ(3),"
  6. + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
  7. + ") WITH ( 'connector'='datagen' )");
  8. Table table = tableEnv.from("GeneratedTable");
  • 事件时间和watermark被转移到 DataStream API。
  • DataStream dataStream = tableEnv.toDataStream(table);
  • DataStream dataStream = tableEnv.toDataStream(table, User.class);

上面示例表示从 Table API 中开始而不是定义自己的 DataStream 源:利用 datagen SQL / Table API 连接器,随机生成数据。
默认情况下,表被转换为 Row 类的实例,但也可以让运行时将其转换为兼容的 POJO 或指定 DataType 定义来表达所需的流记录类型。
高级:您还可以通过定义适当的 DataType 来转换为内部数据结构,如下例所示:

  1. DataStream<RowData> resultStream =
  2. tableEnv.toDataStream(
  3. table,
  4. DataTypes.ROW(
  5. DataTypes.FIELD("category", DataTypes.STRING()),
  6. DataTypes.FIELD("avgPrice", DataTypes.DOUBLE()))
  7. .bridgedTo(RowData.class));

执行行为

  • DataStream API构建一个Flink job
    • StreamExecutionEnvironment.execute()提交pipeline,清理掉builder
  • Table API:

    • 源到单个(!)接收器管道通过 Table.execute() 或 Table.executeInsert() 执行
    • 管道写入多sinks可以这样创建和执行:
      1. tableEnv.createStatementSet()
      2. .addInsert("OutputTable", tableEnv.from("InputTable"))
      3. .addInsert("OutputTable2", tableEnv.from("InputTable"))
      4. .execute()
  • 组合API

    • 转换为 DataStream,编译 Table API 部分并将它们添加到 DataStream 构建器。
      • 只有在 DataStream API 中的执行才会触发运行这些部分!

注意:StreamExecutionEnvironment.execute() 和 DataStream.executeAndCollect() 在管道执行方面表现出相同的行为。后者只增加了一个额外的接收器来收集结果回本地客户端。

DataStream ↔ Table Conversion

提供Schema

  • 一些转换函数允许指定schema
    • 丰富列数据类型
    • 增加时间属性、watermarks策略、计算列、主键
  • 举例
    1. Schema schema =
    2. Schema.newBuilder()
    3. .column("a", DataTypes.STRING())
    4. .column("b", DataTypes.INT())
    5. .columnByExpression("c", $("b").times(lit(2)))
    6. .column("rowtime", DataTypes.TIMESTAMP_LTZ(3))
    7. .watermark("rowtime", "`rowtime` - INTERVAL '10' SECOND")
    8. .build();
    可以声明 DataTypes 并使用如图所示的 Table API 表达式,或者使用 SQL 表达式并让运行时将其转换为内部数据结构。

另一个以事件时间和watermarks为例:

  1. Schema schema =
  2. Schema.newBuilder()
  3. .column("a", DataTypes.STRING())
  4. .column("b", DataTypes.INT())
  5. .columnByExpression("c", $("b").times(lit(2)))
  6. .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
  7. .watermark("rowtime", "SOURCE_WATERMARK()")
  8. .build();

特殊的SOURCE_WATERMARK() watermark 策略结合 columnByMetadata ,可以从 DataStream API 继承事件时间和水印。需要在转换的 DataStream 管道中设置watermark策略。

  1. /** Simple Table API <-> DataStream conversion example using changelog streams. */
  2. public class FlinkTableChangelogStreamExample extends AbstractTableTestBase {
  3. public static void main(String[] args) throws Exception {
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. // configure environment as needed
  6. env.setParallelism(4);
  7. env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
  8. env.setStateBackend(new EmbeddedRocksDBStateBackend());
  9. env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
  10. // create table environment
  11. EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
  12. StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
  13. DataStreamSource<Row> changelogStream =
  14. env.fromElements(
  15. Row.ofKind(RowKind.INSERT, "john", 32),
  16. Row.ofKind(RowKind.UPDATE_BEFORE, "john", 32),
  17. Row.ofKind(RowKind.UPDATE_AFTER, "john", 33),
  18. Row.ofKind(RowKind.INSERT, "alice", 33));
  19. tEnv.createTemporaryView("input", tEnv.fromChangelogStream(changelogStream).as("name", "age"));
  20. Table resultTable = tEnv.sqlQuery("SELECT age, LISTAGG(DISTINCT name) FROM input GROUP BY age");
  21. DataStream<Row> resultStream = tEnv.toChangelogStream(resultTable);
  22. resultStream.print();
  23. env.execute();
  24. }
  25. }

可以在 fromChangelogStream() 内部提供一个 Schema(类似于 toChangelogStream()),而不是在创建更改日志流之后重命名字段。
与 fromDataStream() 一样,默认情况下不会传播事件时间和水印,并且需要提供具有适当字段的 Schema。
在这两种转换中,还可以选择提供一个 ChangelogMode 来定义 Table API 规划器必须从这个 changelog 流中期望的更改,例如:

  • ChangelogMode.upsert()
  • ChangelogMode.insertOnly()

请注意,对于 toChangelogStream(),存在几个重载函数来提供手动 Schema 定义和通过 ChangelogMode 进行的预期更改。
传播事件时间和watermark,需要转换的表具有时间戳属性。

故障排除

常见问题:

  • 从 DataStream 转换为 Table 会导致 RAW 类型的单个列
    • 这通常表明 DataStream API 中的 TypeInformation 有问题
    • 可以通过例如 dataStream.returns() 指定正确的 TypeInformation 来修复
    • 由例如 StreamExecutionEnvironment.fromElements() 未正确派生类型引起