JdbcCatalog parameters

  • name: required, name of the catalog
  • default database: required, default database to connect to
  • username: required, username of Postgres account
  • password: required, password of the account
  • base url: required, should be of the format "jdbc:postgresql://&lt;ip&gt;:&lt;port&gt;", and should not contain the database name

Registering the Catalog

**Java**

```java
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "mypg";
String defaultDatabase = "mydb";
String username        = "...";
String password        = "...";
String baseUrl         = "...";

JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog("mypg", catalog);

// set the JdbcCatalog as the current catalog of the session
tableEnv.useCatalog("mypg");
```

**SQL**

```sql
CREATE CATALOG mypg WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

USE CATALOG mypg;
```

**YAML**

```yaml
execution:
    planner: blink
    ...
    current-catalog: mypg  # set the JdbcCatalog as the current catalog of the session
    current-database: mydb

catalogs:
   - name: mypg
     type: jdbc
     default-database: mydb
     username: ...
     password: ...
     base-url: ...
```
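
Once `mypg` is registered and set as the current catalog, tables that already exist in the Postgres database can be queried without any extra DDL. A minimal sketch, assuming a table named `my_table` in Postgres' default `public` schema (the table name is a placeholder):

```java
// Tables in the default "public" schema can be referenced directly;
// the fully qualified form catalog.database.table also works from other catalogs.
Table result = tableEnv.sqlQuery("SELECT * FROM mypg.mydb.my_table");

// Equivalent lookup through the Table API
Table sameTable = tableEnv.from("mypg.mydb.my_table");
```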
Hive Catalog

See the official documentation for an introduction to Catalogs.

How to Create and Register Flink Tables to Catalog

### Using SQL DDL

```java
TableEnvironment tableEnv = ...

// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
tableEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)");

// Create a catalog table
tableEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

tableEnv.listTables(); // should return the tables in the current catalog and database
```

### Using the Java / Scala / Python API

```java
TableEnvironment tableEnv = ...

// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// Create a catalog table
TableSchema schema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT())
    .build();

catalog.createTable(
    new ObjectPath("mydb", "mytable"),
    new CatalogTableImpl(
        schema,
        new Kafka()
            .version("0.11")
            ....
            .startFromEarliest(),
        "my comment"),
    false);

List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
```

Database operations

// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// drop database
catalog.dropDatabase("mydb", false);

// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

// get database
catalog.getDatabase("mydb");

// check if a database exists
catalog.databaseExists("mydb");

// list databases in a catalog
catalog.listDatabases();
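
The `CatalogDatabaseImpl(...)` arguments are left elided above. As a concrete illustration, a minimal sketch of `createDatabase`, assuming the `CatalogDatabaseImpl(Map<String, String> properties, String comment)` constructor; the property key and value are placeholders:

```java
// Properties and comment for the new database (keys/values are illustrative).
Map<String, String> dbProps = new HashMap<>();
dbProps.put("owner", "data-team");

// Create the database; false = fail if "mydb" already exists.
catalog.createDatabase("mydb", new CatalogDatabaseImpl(dbProps, "my database comment"), false);
```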

Table operations

// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);

// get table
catalog.getTable(new ObjectPath("mydb", "mytable"));

// check if a table exists or not
catalog.tableExists(new ObjectPath("mydb", "mytable"));

// list tables in a database
catalog.listTables("mydb");
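
The `CatalogTableImpl(...)` arguments are also elided above. Besides the descriptor-based construction shown earlier, a catalog table can be described with a schema, a connector-properties map, and a comment. A minimal sketch, assuming the `CatalogTableImpl(TableSchema, Map<String, String>, String)` constructor; the connector properties are placeholders:

```java
// Schema of the table to be created in the catalog.
TableSchema schema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT())
    .build();

// Connector properties (illustrative values only).
Map<String, String> props = new HashMap<>();
props.put("connector.type", "kafka");
props.put("connector.topic", "users");

// Create the table; false = fail if it already exists.
catalog.createTable(
    new ObjectPath("mydb", "mytable"),
    new CatalogTableImpl(schema, props, "my table comment"),
    false);
```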

View operations

// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// alter view
catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// get view
catalog.getTable(new ObjectPath("mydb", "myview"));

// check if a view exists or not
catalog.tableExists(new ObjectPath("mydb", "myview"));

// list views in a database
catalog.listViews("mydb");
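
The `CatalogViewImpl(...)` arguments are elided above as well. A minimal sketch of creating a view, assuming the `CatalogViewImpl(String originalQuery, String expandedQuery, TableSchema schema, Map<String, String> properties, String comment)` constructor; the queries and schema are placeholders:

```java
// Result schema of the view.
TableSchema viewSchema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .build();

// A view stores its original query plus an expanded, fully qualified form.
catalog.createTable(
    new ObjectPath("mydb", "myview"),
    new CatalogViewImpl(
        "SELECT name FROM mytable",                // original query
        "SELECT `name` FROM `mydb`.`mytable`",     // expanded query
        viewSchema,
        new HashMap<>(),                           // properties
        "my view comment"),
    false);  // ignoreIfExists
```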

Partition operations

// create partition
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// alter partition
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// check if a partition exists or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// list partitions of a table under a given partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table by expression filter
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(expr1, ...));
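
The partition spec and partition arguments are elided above. A minimal sketch of a concrete `createPartition` call, assuming `mytable` is partitioned by a `dt` column (the partition key and values are placeholders):

```java
// The partition spec maps partition column names to values;
// the partition itself carries its own properties and a comment.
Map<String, String> spec = new HashMap<>();
spec.put("dt", "2020-01-01");

catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(spec),
    new CatalogPartitionImpl(new HashMap<>(), "partition for 2020-01-01"),
    false);  // ignoreIfExists
```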

Function operations

// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// get function
catalog.getFunction(new ObjectPath("mydb", "myfunc"));

// check if a function exists or not
catalog.functionExists(new ObjectPath("mydb", "myfunc"));

// list functions in a database
catalog.listFunctions("mydb");
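
The `CatalogFunctionImpl(...)` argument is elided above. A minimal sketch of registering a UDF by class name, assuming a `CatalogFunctionImpl` constructor that takes the fully qualified class name (the class name is a placeholder and the class must be on the classpath):

```java
// Register a function backed by a Java class (name is illustrative).
catalog.createFunction(
    new ObjectPath("mydb", "myfunc"),
    new CatalogFunctionImpl("com.example.udf.MyScalarFunction"),
    false);  // ignoreIfExists
```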

Table API and SQL for Catalog

Registering a Catalog

Java/Scala
tableEnv.registerCatalog("myCatalog", new CustomCatalog("myCatalog"));

Yaml
catalogs:
   - name: myCatalog
     type: custom_catalog
     hive-conf-dir: ...

Changing the Current Catalog And Database

Flink always looks up tables, views, and UDFs in the current catalog and database.

tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDb");

Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;

Metadata from catalogs other than the current one can be accessed by providing fully qualified names of the form catalog.database.object.

tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");

Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;

List Available Catalogs

tableEnv.listCatalogs();

Flink SQL> show catalogs;

List Available Databases

tableEnv.listDatabases();

Flink SQL> show databases;

List Available Tables

tableEnv.listTables();

Flink SQL> show tables;