API整合
Table API
- 用于流和批处理的统一关系 API
- SQL 语言的超集
- 许多概念和部分与 Flink SQL 共享
- 适用于 Java、Scala 和 Python
- 表 API 查询是用编程语言定义的(相对于 SQL 字符串)
TableEnvironment tEnv = TableEnvironment.create(settings);
<a name="pX6fc"></a>
### 纯Table API程序与DataStream API整合
```java
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() // not yet supported: .inBatchMode()
.build();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, settings);
Table API example
/** Example Table API program. */
public class FlinkTableExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000);
env.getConfig().enableObjectReuse();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
//将connector数据转化为table
tEnv.executeSql(
"CREATE TABLE Orders (\n"
+ " `a` STRING,\n"
+ " `b` NUMERIC,\n"
+ " `rowtime` TIMESTAMP(3),\n"
+ " WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '10' SECOND"
+ ") WITH (\n"
+ " 'connector' = 'datagen',\n"
+ " 'rows-per-second' = '1'\n"
+ ")");
Table orders = tEnv.from("Orders");
Table result =
orders
.filter(and($("a").isNotNull(), $("b").isNotNull()))
.select($("a").lowerCase().as("a"), $("b"), $("rowtime"))
.window(Tumble.over(lit(1).minutes()).on($("rowtime")).as("minuteWindow"))
.groupBy($("minuteWindow"), $("a"))
.select(
$("a"), $("minuteWindow").end().as("minute"), $("b").avg().as("avgBillingAmount"));
result.execute().print();
}
}
如果将 Table API 初始化与 DataStream 互操作性一起使用,我们还可以通过在代码里微调环境配置,例如检查点或对象重用,如上所示。确保在实例化 StreamTableEnvironment 之前进行配置,因为 Flink 不保证选项会在实例化之后转发。
在某些情况下,对象重用通过在堆上分配更少的对象来提高效率。与 DataStream API 相比,只要留在 Table API / SQL 生态系统中,就可以不受限制地启用它。
如果需要创建表,其引用位于其中一个可用连接器,最简单的选择是通过 SQL 表 DDL。
如果使用的编目(catalog)已经暴露了需要使用的表,可以通过 tEnv.from(
与表DDL相比,在编目中创建表(例如默认内存编目)相当繁琐。例如:
Schema schema =
Schema.newBuilder()
.column("a", DataTypes.STRING())
.column("b", DataTypes.INT())
.column("rowtime", DataTypes.TIMESTAMP_LTZ(3))
.watermark("rowtime", $("rowtime").minus(lit(10).seconds()))
.build();
TableDescriptor tableDescriptor =
TableDescriptor.forConnector("datagen")
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 1L)
.schema(schema)
.build();
tEnv.createTable("Orders", tableDescriptor);
前面的语句声明了从源(输入表)到接收器(输出表)的管道,即最终将处理数据的 Flink 作业。调用 execute 将校验、转换、提交、运行作业,并收集可以打印的结果。
在实际工作中,可能更希望调用 executeInsert(
通过 tEnv.createStatementSet() 的语句集允许将多个插入包装到(可能)不同的输出表中。
DataStream API集成
SQL / Table API vs. DataStream API
- DataStream API
- 经典的核心流处理使用场景
- 显式控制流、状态和时间的原语
- 复杂的计算和定制
- 目标:最大化性能和可靠性
- SQL / Table API
- 可以使用高阶API, SQL 建模
- 专注于逻辑,而不是实现
- 混合工作负载(批处理和流式处理)
- 目标:最大化开发人员的速度和自主权
在一个 API 下将批处理和流结合在一起一直是 SQL / Table API 的强项。但是,DataStream API 也正在通过批处理模式执行进行扩展(以前仅在 DataSet API 中可用)并且最终可能会赶上。
混合SQL / Table and DataStream API
混合使用两种API可能更有利的场景。
- 从 Table / SQL 生态系统开始,用于访问catalog和外部系统
- 在 DataStream API 中继续
- 使用 SQL / Table API 函数进行无状态数据规范化和清理
- 在 DataStream API 中继续
- 使用 SQL / Table API,但是
- 切换到 DataStream API 进行低阶操作,例如自定义计时器
- 在DataStream API 中开始和/或结束时,自定义连接器。
- 但是为简单起见,在 SQL / Table API 中实现主管道运算
使用一种API中的管道可以完成端到端定义与运算,而无需其他类型API。但是,由于各种原因,混合两个API可能很有用。
DataStream ↔ Table Conversion
- StreamTableEnvironment使用集成选项扩展TableEnvironment。
- TableEnvironment对仅插入流的简单处理
- tableEnv.fromDataStream() 和 tableEnv.toDataStream()
- 通用变更日志流的更多可能性:
- tableEnv.fromChangelogStream() 和 tableEnv.toChangelogStream()
- 数据类型将自动转换:
- TypeInformation (DataStream API) ↔ DataType (SQL / Table API)
- 与用户定义函数类似的规则和优化选项
下面案例来自Flink Community Data Analytics Repository。显示了将 Flink 项目的 git 提交导入 Kafka 主题的作业。
/**
* Flink job that reads commits in the apache/flink Github repository (using the Github API) and
* writes commit metadata to Kafka.
*/
public class FlinkCommitsToKafka {
public static final String APACHE_FLINK_REPOSITORY = "https://github.com/apache/flink.git";
public static final String APACHE_FLINK_BRANCH = "refs/heads/master";
public static void main(String[] args) {
ParameterTool params = ParameterTool.fromArgs(args);
// Sink
String kafkaServer = params.get("kafka-server", "kafka.vvp.svc");
String kafkaTopic = params.get("kafka-topic", "flink-commits");
// Source
long delayBetweenQueries = params.getLong("poll-interval-ms", 10_000L);
String ignoreCommitsBefore = params.get("ignore-commits-before", null);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.getConfig().enableObjectReuse();
DataStream<Commit> commits =
env.addSource(getGitCommitSource(delayBetweenQueries, ignoreCommitsBefore))
.name("flink-commit-source")
.uid("flink-commit-source");
tableEnv.executeSql(
"CREATE TABLE commits (\n"
+ "`author` STRING,\n"
+ "`authorDate` TIMESTAMP(3),\n"
+ "`authorEmail` STRING,\n"
+ "`commitDate` TIMESTAMP(3),\n"
+ "`committer` STRING,\n"
+ "`committerEmail` STRING,\n"
+ "`filesChanged` ARRAY<ROW<filename STRING, linesAdded INT, linesChanged INT, linesRemoved INT>>,\n"
+ "`sha1` STRING,\n"
+ "`shortInfo` STRING\n"
+ ") WITH (\n"
+ "'connector' = 'kafka',\n"
+ "'topic' = '"
+ kafkaTopic
+ "',\n"
+ "'properties.bootstrap.servers' = '"
+ kafkaServer
+ "',\n"
+ "'properties.max.request.size' = '"
+ 5 * 1024 * 1024
+ "',"
+ "'format' = 'json'\n"
+ ")");
Schema schema =
Schema.newBuilder()
.column("author", "STRING")
.column("authorDate", "TIMESTAMP(3)")
.column("authorEmail", "STRING")
.column("commitDate", "TIMESTAMP(3)")
.column("committer", "STRING")
.column("committerEmail", "STRING")
.column(
"filesChanged",
"ARRAY<ROW<filename STRING, linesAdded INT, linesChanged INT, linesRemoved INT>>")
.column("sha1", "STRING")
.column("shortInfo", "STRING")
.build();
tableEnv.fromDataStream(commits, schema).executeInsert("commits");
}
private static JGitCommitSource getGitCommitSource(
final long delayBetweenQueries, final String ignoreCommitsBefore) {
return new JGitCommitSource(
APACHE_FLINK_REPOSITORY, APACHE_FLINK_BRANCH, ignoreCommitsBefore, delayBetweenQueries);
}
}
它从 DataStream API 的自定义源实现开始。注意,建议为所有DataStream操作设置uid;此处名称只是为了方便在 Web UI 和日志中设置。
从这个源数据流到 SQL 表的转换是通过 tableEnv.fromDataStream 完成的。此函数的几个重载均可以使用。这里显示的字段确保是使用的输出表的正确模式(列顺序和类型)中。还可以删除字段、添加计算字段或基于元数据的字段,以及定义 proctime 或 rowtime 属性(稍后会详细介绍)。
还可以通过 tableEnv.createTemporaryView() 直接将此表注册为临时视图,以便可以在任何 SQL 查询中按名称访问它。
executeInsert编译job grapgh,提交到 Flink 集群,并触发执行。结果将流式数据传输到声明的输出表。
Table Conversion → DataStream
仅插入流
tableEnv.executeSql(
"CREATE TABLE GeneratedTable ("
+ " name STRING,"
+ " score INT,"
+ " event_time TIMESTAMP_LTZ(3),"
+ " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
+ ") WITH ( 'connector'='datagen' )");
Table table = tableEnv.from("GeneratedTable");
- 事件时间和watermark被转移到 DataStream API。
- DataStream
dataStream = tableEnv.toDataStream(table);
- DataStream
dataStream = tableEnv.toDataStream(table, User.class);
上面示例表示从 Table API 中开始而不是定义自己的 DataStream 源:利用 datagen SQL / Table API 连接器,随机生成数据。
默认情况下,表被转换为 Row 类的实例,但也可以让运行时将其转换为兼容的 POJO 或指定 DataType 定义来表达所需的流记录类型。
高级:您还可以通过定义适当的 DataType 来转换为内部数据结构,如下例所示:
DataStream<RowData> resultStream =
tableEnv.toDataStream(
table,
DataTypes.ROW(
DataTypes.FIELD("category", DataTypes.STRING()),
DataTypes.FIELD("avgPrice", DataTypes.DOUBLE()))
.bridgedTo(RowData.class));
执行行为
- DataStream API构建一个Flink job
- StreamExecutionEnvironment.execute()提交pipeline,清理掉builder
Table API:
- 源到单个(!)接收器管道通过 Table.execute() 或 Table.executeInsert() 执行
- 管道写入多sinks可以这样创建和执行:
tableEnv.createStatementSet()
.addInsert("OutputTable", tableEnv.from("InputTable"))
.addInsert("OutputTable2", tableEnv.from("InputTable"))
.execute()
组合API
- 转换为 DataStream,编译 Table API 部分并将它们添加到 DataStream 构建器。
- 只有在 DataStream API 中的执行才会触发运行这些部分!
- 转换为 DataStream,编译 Table API 部分并将它们添加到 DataStream 构建器。
注意:StreamExecutionEnvironment.execute() 和 DataStream.executeAndCollect() 在管道执行方面表现出相同的行为。后者只增加了一个额外的接收器来收集结果回本地客户端。
DataStream ↔ Table Conversion
提供Schema
- 一些转换函数允许指定schema
- 丰富列数据类型
- 增加时间属性、watermarks策略、计算列、主键
- 举例
可以声明 DataTypes 并使用如图所示的 Table API 表达式,或者使用 SQL 表达式并让运行时将其转换为内部数据结构。Schema schema =
Schema.newBuilder()
.column("a", DataTypes.STRING())
.column("b", DataTypes.INT())
.columnByExpression("c", $("b").times(lit(2)))
.column("rowtime", DataTypes.TIMESTAMP_LTZ(3))
.watermark("rowtime", "`rowtime` - INTERVAL '10' SECOND")
.build();
另一个以事件时间和watermarks为例:
Schema schema =
Schema.newBuilder()
.column("a", DataTypes.STRING())
.column("b", DataTypes.INT())
.columnByExpression("c", $("b").times(lit(2)))
.columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
.watermark("rowtime", "SOURCE_WATERMARK()")
.build();
特殊的SOURCE_WATERMARK() watermark 策略结合 columnByMetadata ,可以从 DataStream API 继承事件时间和水印。需要在转换的 DataStream 管道中设置watermark策略。
/** Simple Table API <-> DataStream conversion example using changelog streams. */
public class FlinkTableChangelogStreamExample extends AbstractTableTestBase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure environment as needed
env.setParallelism(4);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// create table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
DataStreamSource<Row> changelogStream =
env.fromElements(
Row.ofKind(RowKind.INSERT, "john", 32),
Row.ofKind(RowKind.UPDATE_BEFORE, "john", 32),
Row.ofKind(RowKind.UPDATE_AFTER, "john", 33),
Row.ofKind(RowKind.INSERT, "alice", 33));
tEnv.createTemporaryView("input", tEnv.fromChangelogStream(changelogStream).as("name", "age"));
Table resultTable = tEnv.sqlQuery("SELECT age, LISTAGG(DISTINCT name) FROM input GROUP BY age");
DataStream<Row> resultStream = tEnv.toChangelogStream(resultTable);
resultStream.print();
env.execute();
}
}
可以在 fromChangelogStream() 内部提供一个 Schema(类似于 toChangelogStream()),而不是在创建更改日志流之后重命名字段。
与 fromDataStream() 一样,默认情况下不会传播事件时间和水印,并且需要提供具有适当字段的 Schema。
在这两种转换中,还可以选择提供一个 ChangelogMode 来定义 Table API 规划器必须从这个 changelog 流中期望的更改,例如:
- ChangelogMode.upsert()
- ChangelogMode.insertOnly()
请注意,对于 toChangelogStream(),存在几个重载函数来提供手动 Schema 定义和通过 ChangelogMode 进行的预期更改。
传播事件时间和watermark,需要转换的表具有时间戳属性。
故障排除
- 查看文档Flink docs for the DataStream API integration。
- 有广泛的描述和
- 很多例子可以帮助理解有问题的用例
常见问题:
- 从 DataStream 转换为 Table 会导致 RAW 类型的单个列
- 这通常表明 DataStream API 中的 TypeInformation 有问题
- 可以通过例如 dataStream.returns() 指定正确的 TypeInformation 来修复
- 由例如 StreamExecutionEnvironment.fromElements() 未正确派生类型引起