CreateTableEnv
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
public class CreateTableEnv {
public static void main(String[] args) {
// **********************
// FLINK STREAMING QUERY
// **********************
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// ******************
// FLINK BATCH QUERY
// ******************
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
}
}
Register a Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// table is the result of a simple projection query
Table projTable = tableEnv.scan("table").select("");
// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable);
Table resultS = tableEnv.scan("").select(""); //扫描
tableEnv.sqlQuery(""); //查询
tableEnv.fromDataStream(null); //序列化后
tableEnv.fromDataStream(null, ""); //字段映射
tableEnv.registerDataStream("myTable",null); //序列化后
tableEnv.registerDataStream("myTable2", null, "myLong, myString"); //字段映射
resultS.insertInto("CsvSinkTable");
Register a TableSource
CsvTableSource path = new CsvTableSource("Path", new String[]{"a1", "a2", "a3"}, null);
tableEnv.registerTableSource("", path);
Register a TableSink
tableEnv.registerTableSink(null, new CsvTableSink("", null));
// create a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", null);
// define the field names and types
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
Register an External Catalog
// create an external catalog
ExternalCatalog catalog = new InMemoryExternalCatalog("catalog");
// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
Query a Table
Table API
// register Orders table
tableEnv.registerTableSource("Orders", new CsvTableSource("", null, null));
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenueTable = orders
//.filter("cCountry === 'FRANCE'")
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// emit or convert Table
// execute query
SQL
// register Orders table
// compute revenue for all customers from France
Table revenueSql = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
- The following example shows how to specify an update query that inserts its result into a registered table. ```java // get a TableEnvironment TableEnvironment tableEnv = …; // see “Create a TableEnvironment” section
// register “Orders” table // register “RevenueFrance” output table
// compute revenue for all customers from France and emit to “RevenueFrance” tableEnv.sqlUpdate( “INSERT INTO RevenueFrance “ + “SELECT cID, cName, SUM(revenue) AS revSum “ + “FROM Orders “ + “WHERE cCountry = ‘FRANCE’ “ + “GROUP BY cID, cName” );
// execute query
<a name="register-a-datastream-or-dataset-as-table"></a>
### Register a DataStream or DataSet as Table
```java
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
Convert a DataStream or DataSet into a Table
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
Convert a Table into a DataStream or DataSet(Table To DataStream)
- Row: 字段按位置,任意数量的字段,对
null
值的支持,没有类型安全的访问进行映射 - POJO: 字段按名称映射(POJO字段必须命名为
Table
字段),任意数量的字段,支持null
值,类型安全访问 - Case Class: 字段按位置映射,不支持
null
值,类型安全访问 - Tuple: 按位置映射字段,限制为22(Scala)或25(Java)字段,不支持
null
值,类型安全访问 Atomic Type:
Table
必须具有单个字段,不支持null
值,类型安全访问Convert a Table into a DataStream
Append Mode: Insert
Retract Mode: Delete
Table resultS = tableEnv.scan("").select(""); // convert the Table into an append DataStream of Row by specifying the class tableEnv.toAppendStream(resultS, Row.class); // convert the Table into an append DataStream of Tuple2<String, Integer> via a TypeInformation TupleTypeInfo<Tuple2<String, Integer>> typeInfo = new TupleTypeInfo<>( //BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO Types.STRING, Types.INT); DataStream<Tuple2<String, Integer>> tuple2DataStream = tableEnv.toAppendStream(resultS, typeInfo); // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(resultS, Row.class);