
Both Table API and DataStream API are equally important when it comes to defining a data processing pipeline.

The DataStream API offers the primitives of stream processing (namely time, state, and dataflow management) in a relatively low-level imperative programming API. The Table API abstracts away many internals and provides a structured and declarative API.

Both APIs can work with bounded and unbounded streams.

Bounded streams need to be managed when processing historical data. Unbounded streams occur in real-time processing scenarios that might be initialized with historical data first.

For efficient execution, both APIs can process bounded streams in an optimized batch execution mode. However, since batch is just a special case of streaming, pipelines over bounded streams can also run in regular streaming execution mode.
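For example, the batch execution mode can be selected in either API. The following is a minimal sketch; a TableEnvironment created this way is planned in batch mode independently of any DataStream program:

  import org.apache.flink.api.common.RuntimeExecutionMode;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  // DataStream API: select the batch runtime mode on the execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setRuntimeMode(RuntimeExecutionMode.BATCH);

  // Table API: create a table environment that uses batch execution directly
  TableEnvironment batchTableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());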

Pipelines in one API can be defined end-to-end without dependencies on the other API. However, it might be useful to mix both APIs for various reasons:

  • Use the table ecosystem for accessing catalogs or connecting to external systems easily, before implementing the main pipeline in DataStream API (see the sketch after this list).

  • Access some of the SQL functions for stateless data normalization and cleansing, before implementing the main pipeline in DataStream API.

  • Switch to the DataStream API every now and then if a lower-level operation (e.g. custom timer handling) is not available in Table API.
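As a sketch of the first point, the table ecosystem can define a source table via a connector and then hand the rows over to the DataStream API. The built-in datagen connector stands in for a real external system here, and the shown options are illustrative:

  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.DataTypes;
  import org.apache.flink.table.api.Schema;
  import org.apache.flink.table.api.TableDescriptor;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  import org.apache.flink.types.Row;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

  // use the table ecosystem to declare a source; a production setup would
  // use a real connector (e.g. Kafka) and a catalog instead of datagen
  tableEnv.createTemporaryTable(
      "SourceTable",
      TableDescriptor.forConnector("datagen")
          .schema(Schema.newBuilder()
              .column("f0", DataTypes.STRING())
              .build())
          .option("number-of-rows", "10")
          .build());

  // hand the rows over to the DataStream API for the main pipeline
  DataStream<Row> stream = tableEnv.toDataStream(tableEnv.from("SourceTable"));
  stream.print();
  env.execute();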

Flink provides special bridging functionalities to make the integration with DataStream API as smooth as possible.

Switching between DataStream and Table API adds some conversion overhead. For example, internal data structures of the table runtime (i.e. RowData) that partially work on binary data need to be converted to more user-friendly data structures (i.e. Row). Usually, this overhead can be neglected but is mentioned here for completeness.

Converting between DataStream and Table

Flink provides a specialized StreamTableEnvironment for integrating with the DataStream API. Those environments extend the regular TableEnvironment with additional methods and take the StreamExecutionEnvironment used in the DataStream API as a parameter.

The following code shows an example of how to go back and forth between the two APIs. Column names and types of the Table are automatically derived from the TypeInformation of the DataStream. Since the DataStream API does not support changelog processing natively, the code assumes append-only/insert-only semantics during the stream-to-table and table-to-stream conversion.

  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  import org.apache.flink.types.Row;

  // create environments of both APIs
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

  // create a DataStream
  DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");

  // interpret the insert-only DataStream as a Table
  Table inputTable = tableEnv.fromDataStream(dataStream);

  // register the Table object as a view and query it
  tableEnv.createTemporaryView("InputTable", inputTable);
  Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");

  // interpret the insert-only Table as a DataStream again
  DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

  // add a printing sink and execute in DataStream API
  resultStream.print();
  env.execute();

  // prints:
  // +I[Alice]
  // +I[Bob]
  // +I[John]

The complete semantics of fromDataStream and toDataStream can be found in the dedicated section below. In particular, the section discusses how to influence the schema derivation with more complex and nested types. It also covers working with event-time and watermarks.
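As a preview, a Schema can be passed to fromDataStream to complement the automatic derivation, e.g. to expose the stream's record timestamps and watermarks as a rowtime column. A brief sketch, where dataStream stands for a stream whose records carry event-time timestamps:

  import org.apache.flink.table.api.Schema;
  import org.apache.flink.table.api.Table;

  // derive all physical columns automatically, but additionally expose the
  // stream's timestamps and watermarks as the rowtime column
  Table table = tableEnv.fromDataStream(
      dataStream,
      Schema.newBuilder()
          .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
          .watermark("rowtime", "SOURCE_WATERMARK()")
          .build());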

Depending on the kind of query, in many cases the resulting dynamic table is a pipeline that, when converting the Table to a DataStream, produces not only insert-only changes but also retractions and other kinds of updates. During table-to-stream conversion, this could lead to an exception similar to

  Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].

in which case one needs to revise the query or switch to toChangelogStream.

The following example shows how updating tables can be converted. Every result row represents an entry in a changelog with a change flag that can be queried by calling row.getKind() on it. In the example, the second score for Alice creates an update before (-U) and update after (+U) change.

  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  import org.apache.flink.types.Row;

  // create environments of both APIs
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

  // create a DataStream
  DataStream<Row> dataStream = env.fromElements(
      Row.of("Alice", 12),
      Row.of("Bob", 10),
      Row.of("Alice", 100));

  // interpret the insert-only DataStream as a Table
  Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");

  // register the Table object as a view and query it
  // the query contains an aggregation that produces updates
  tableEnv.createTemporaryView("InputTable", inputTable);
  Table resultTable = tableEnv.sqlQuery(
      "SELECT name, SUM(score) FROM InputTable GROUP BY name");

  // interpret the updating Table as a changelog DataStream
  DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);

  // add a printing sink and execute in DataStream API
  resultStream.print();
  env.execute();

  // prints:
  // +I[Alice, 12]
  // +I[Bob, 10]
  // -U[Alice, 12]
  // +U[Alice, 112]
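Downstream operators can branch on that change flag. For instance, a short sketch that drops the retraction half of each update and keeps only current values:

  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.types.Row;
  import org.apache.flink.types.RowKind;

  // keep inserts and update-afters; drop the -U retraction messages
  DataStream<Row> currentValues = resultStream
      .filter(row -> row.getKind() != RowKind.UPDATE_BEFORE);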

The complete semantics of fromChangelogStream and toChangelogStream can be found in the dedicated section below. In particular, the section discusses how to influence the schema derivation with more complex and nested types. It covers working with event-time and watermarks. It discusses how to declare a primary key and changelog mode for the input and output streams.
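As a preview of the opposite direction, a DataStream of rows carrying explicit change flags can be interpreted as a changelog table. A short sketch, with env and tableEnv created as in the examples above:

  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.types.Row;
  import org.apache.flink.types.RowKind;

  // create a changelog DataStream whose rows carry explicit change flags
  DataStream<Row> changelogStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "Alice", 12),
      Row.ofKind(RowKind.INSERT, "Bob", 5),
      Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
      Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));

  // interpret the DataStream as a retract changelog table
  Table table = tableEnv.fromChangelogStream(changelogStream);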

The example above shows how the final result is computed incrementally by continuously emitting row-wise updates for each incoming record. However, in cases where the input streams are finite (i.e. bounded), a result can be computed more efficiently by leveraging batch processing principles.

In batch processing, operators can be executed in successive stages that consume the entire input table before emitting results. For example, a join operator can sort both bounded inputs before performing the actual joining (i.e. sort-merge join algorithm), or build a hash table from one input before consuming the other (i.e. build/probe phase of the hash join algorithm).
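To make the build/probe idea concrete, here is a toy, non-Flink sketch of a hash join over two bounded in-memory inputs (all names and values are invented for illustration):

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  // two bounded inputs: (name, score) on the left, (name, bonus) on the right
  List<Object[]> left = List.of(new Object[]{"Alice", 12}, new Object[]{"Bob", 10});
  List<Object[]> right = List.of(new Object[]{"Alice", 100});

  // build phase: consume the left input entirely and index it by key
  Map<String, Integer> buildSide = new HashMap<>();
  for (Object[] row : left) {
      buildSide.put((String) row[0], (Integer) row[1]);
  }

  // probe phase: stream over the right input and look up matches
  List<Object[]> joined = new ArrayList<>();
  for (Object[] row : right) {
      Integer match = buildSide.get((String) row[0]);
      if (match != null) {
          joined.add(new Object[]{row[0], match, row[1]});
      }
  }
  // joined now contains a single row: [Alice, 12, 100]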

Both DataStream API and Table API offer a specialized batch runtime mode.

The following example illustrates that the unified pipeline is able to process both batch and streaming data by just switching a flag.

  import org.apache.flink.api.common.RuntimeExecutionMode;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

  // setup DataStream API
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // set the batch runtime mode
  env.setRuntimeMode(RuntimeExecutionMode.BATCH);

  // uncomment this for streaming mode
  // env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

  // setup Table API
  // the table environment adopts the runtime mode during initialization
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

  // define the same pipeline as above

  // prints in BATCH mode:
  // +I[Bob, 10]
  // +I[Alice, 112]

  // prints in STREAMING mode:
  // +I[Alice, 12]
  // +I[Bob, 10]
  // -U[Alice, 12]
  // +U[Alice, 112]

Once the changelog is applied to an external system (e.g. a key-value store), one can see that both modes are able to produce exactly the same output table. By consuming all input data before emitting results, the changelog of the batch mode consists solely of insert-only changes. See also the dedicated batch mode section below for more insights.
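To see why the two modes converge, here is a toy, non-Flink sketch that materializes the streaming-mode changelog from above into an in-memory key-value store:

  import java.util.HashMap;
  import java.util.Map;

  // apply the streaming-mode changelog entry by entry
  Map<String, Integer> store = new HashMap<>();
  store.put("Alice", 12);   // +I[Alice, 12]
  store.put("Bob", 10);     // +I[Bob, 10]
  store.remove("Alice");    // -U[Alice, 12]
  store.put("Alice", 112);  // +U[Alice, 112]

  // the store now holds {Alice=112, Bob=10}, which is exactly the state reached
  // by applying the batch-mode changelog: +I[Bob, 10], +I[Alice, 112]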