CreateTableEnv
import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.java.BatchTableEnvironment;import org.apache.flink.table.api.java.StreamTableEnvironment;public class CreateTableEnv {public static void main(String[] args) {// **********************// FLINK STREAMING QUERY// **********************EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// ******************// FLINK BATCH QUERY// ******************ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);// **********************// BLINK STREAMING QUERY// **********************StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);// ******************// BLINK BATCH QUERY// ******************EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);}}
Register a Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// table is the result of a simple projection queryTable projTable = tableEnv.scan("table").select("");// register the Table projTable as table "projectedTable"tableEnv.registerTable("projectedTable", projTable);Table resultS = tableEnv.scan("").select(""); //扫描tableEnv.sqlQuery(""); //查询tableEnv.fromDataStream(null); //序列化后tableEnv.fromDataStream(null, ""); //字段映射tableEnv.registerDataStream("myTable",null); //序列化后tableEnv.registerDataStream("myTable2", null, "myLong, myString"); //字段映射resultS.insertInto("CsvSinkTable");
Register a TableSource
// Build a CsvTableSource from "Path" with the field names a1, a2 and a3
// (null is passed for the field types), then register it with the table
// environment under an empty name.
String[] columnNames = {"a1", "a2", "a3"};
CsvTableSource csvSource = new CsvTableSource("Path", columnNames, null);
tableEnv.registerTableSource("", csvSource);
Register a TableSink
tableEnv.registerTableSink(null, new CsvTableSink("", null));// create a TableSinkTableSink csvSink = new CsvTableSink("/path/to/file", null);// define the field names and typesString[] fieldNames = {"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};// register the TableSink as table "CsvSinkTable"tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
Register an External Catalog
// create an external catalogExternalCatalog catalog = new InMemoryExternalCatalog("catalog");// register the ExternalCatalog catalogtableEnv.registerExternalCatalog("InMemCatalog", catalog);
Query a Table
Table API
// register Orders tabletableEnv.registerTableSource("Orders", new CsvTableSource("", null, null));// scan registered Orders tableTable orders = tableEnv.scan("Orders");// compute revenue for all customers from FranceTable revenueTable = orders//.filter("cCountry === 'FRANCE'").filter("cCountry === 'FRANCE'").groupBy("cID, cName").select("cID, cName, revenue.sum AS revSum");// emit or convert Table// execute query
SQL
// register Orders table// compute revenue for all customers from FranceTable revenueSql = tableEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");// emit or convert Table// execute query
- The following example shows how to specify an update query that inserts its result into a registered table.

```java
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
);

// execute query
```
<a name="register-a-datastream-or-dataset-as-table"></a>
### Register a DataStream or DataSet as Table
```java
// Registers the same DataStream as a table twice: once with the default
// field names and once with explicitly given field names.
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
```

### Convert a DataStream or DataSet into a Table
// Converts the same DataStream into a Table twice: once with the default
// field names and once with explicitly given field names.
// get StreamTableEnvironment
// conversion of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
Convert a Table into a DataStream or DataSet(Table To DataStream)
- Row: 字段按位置映射,任意数量的字段,支持 null 值,无类型安全访问
- POJO: 字段按名称映射(POJO 字段必须与 Table 字段同名),任意数量的字段,支持 null 值,类型安全访问
- Case Class: 字段按位置映射,不支持 null 值,类型安全访问
- Tuple: 字段按位置映射,限制为 22(Scala)或 25(Java)个字段,不支持 null 值,类型安全访问
- Atomic Type: Table 必须具有单个字段,不支持 null 值,类型安全访问

Convert a Table into a DataStream
Append Mode: Insert
Retract Mode: Delete
Table resultS = tableEnv.scan("").select(""); // convert the Table into an append DataStream of Row by specifying the class tableEnv.toAppendStream(resultS, Row.class); // convert the Table into an append DataStream of Tuple2<String, Integer> via a TypeInformation TupleTypeInfo<Tuple2<String, Integer>> typeInfo = new TupleTypeInfo<>( //BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO Types.STRING, Types.INT); DataStream<Tuple2<String, Integer>> tuple2DataStream = tableEnv.toAppendStream(resultS, typeInfo); // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(resultS, Row.class);
