CreateTableEnv

  1. import org.apache.flink.api.java.ExecutionEnvironment;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.api.EnvironmentSettings;
  4. import org.apache.flink.table.api.TableEnvironment;
  5. import org.apache.flink.table.api.java.BatchTableEnvironment;
  6. import org.apache.flink.table.api.java.StreamTableEnvironment;
  7. public class CreateTableEnv {
  8. public static void main(String[] args) {
  9. // **********************
  10. // FLINK STREAMING QUERY
  11. // **********************
  12. EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
  15. // ******************
  16. // FLINK BATCH QUERY
  17. // ******************
  18. ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
  19. BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
  20. // **********************
  21. // BLINK STREAMING QUERY
  22. // **********************
  23. StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  24. EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  25. StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
  26. // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
  27. // ******************
  28. // BLINK BATCH QUERY
  29. // ******************
  30. EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
  31. TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
  32. }
  33. }

Register a Table

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useOldPlanner().build();
  3. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
  4. // table is the result of a simple projection query
  5. Table projTable = tableEnv.scan("table").select("");
  6. // register the Table projTable as table "projectedTable"
  7. tableEnv.registerTable("projectedTable", projTable);
  8. Table resultS = tableEnv.scan("").select(""); //扫描
  9. tableEnv.sqlQuery(""); //查询
  10. tableEnv.fromDataStream(null); //序列化后
  11. tableEnv.fromDataStream(null, ""); //字段映射
  12. tableEnv.registerDataStream("myTable",null); //序列化后
  13. tableEnv.registerDataStream("myTable2", null, "myLong, myString"); //字段映射
  14. resultS.insertInto("CsvSinkTable");

Register a TableSource

  1. CsvTableSource path = new CsvTableSource("Path", new String[]{"a1", "a2", "a3"}, null);
  2. tableEnv.registerTableSource("", path);

Register a TableSink

  1. tableEnv.registerTableSink(null, new CsvTableSink("", null));
  2. // create a TableSink
  3. TableSink csvSink = new CsvTableSink("/path/to/file", null);
  4. // define the field names and types
  5. String[] fieldNames = {"a", "b", "c"};
  6. TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
  7. // register the TableSink as table "CsvSinkTable"
  8. tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

Register an External Catalog

  1. // create an external catalog
  2. ExternalCatalog catalog = new InMemoryExternalCatalog("catalog");
  3. // register the ExternalCatalog catalog
  4. tableEnv.registerExternalCatalog("InMemCatalog", catalog);

Query a Table

Table API

  1. // register Orders table
  2. tableEnv.registerTableSource("Orders", new CsvTableSource("", null, null));
  3. // scan registered Orders table
  4. Table orders = tableEnv.scan("Orders");
  5. // compute revenue for all customers from France
  6. Table revenueTable = orders
  7. //.filter("cCountry === 'FRANCE'")
  8. .filter("cCountry === 'FRANCE'")
  9. .groupBy("cID, cName")
  10. .select("cID, cName, revenue.sum AS revSum");
  11. // emit or convert Table
  12. // execute query

SQL

  1. // register Orders table
  2. // compute revenue for all customers from France
  3. Table revenueSql = tableEnv.sqlQuery(
  4. "SELECT cID, cName, SUM(revenue) AS revSum " +
  5. "FROM Orders " +
  6. "WHERE cCountry = 'FRANCE' " +
  7. "GROUP BY cID, cName"
  8. );
  9. // emit or convert Table
  10. // execute query
  • The following example shows how to specify an update query that inserts its result into a registered table.

```java
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
);

// execute query
```

<a name="register-a-datastream-or-dataset-as-table"></a>
### Register a DataStream or DataSet as Table
```java
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

```

Convert a DataStream or DataSet into a Table

```java

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

```

Convert a Table into a DataStream or DataSet (Table To DataStream)

  • Row: 字段按位置映射,支持任意数量的字段,支持null值,无类型安全访问
  • POJO: 字段按名称映射(POJO字段名必须与Table字段名一致),任意数量的字段,支持null值,类型安全访问
  • Case Class: 字段按位置映射,不支持null值,类型安全访问
  • Tuple: 按位置映射字段,限制为22(Scala)或25(Java)字段,不支持null值,类型安全访问
  • Atomic Type: Table必须具有单个字段,不支持null值,类型安全访问

    Convert a Table into a DataStream

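  • Append Mode: Insert only (usable only when the Table is modified solely by INSERT changes)

  • Retract Mode: Insert and Delete (each change carries a boolean flag: true is INSERT, false is DELETE)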

             Table resultS = tableEnv.scan("").select("");
          // convert the Table into an append DataStream of Row by specifying the class
          tableEnv.toAppendStream(resultS, Row.class);
    
          // convert the Table into an append DataStream of Tuple2<String, Integer>   via a TypeInformation
          TupleTypeInfo<Tuple2<String, Integer>> typeInfo = new TupleTypeInfo<>(
                  //BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO
                  Types.STRING,
                  Types.INT);
          DataStream<Tuple2<String, Integer>> tuple2DataStream = tableEnv.toAppendStream(resultS, typeInfo);
    
          // convert the Table into a retract DataStream of Row.
          //   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
          //   The boolean field indicates the type of the change.
          //   True is INSERT, false is DELETE.
          DataStream<Tuple2<Boolean, Row>> retractStream =
                  tableEnv.toRetractStream(resultS, Row.class);