JdbcCatalog parameters
- name: required, name of the catalog
- default database: required, default database to connect to
- username: required, username of Postgres account
- password: required, password of the account
- base url: required, should be of format “jdbc:postgresql://:”, and should not contain database name here
注册CalLog
```java EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = “mypg”; String defaultDatabase = “mydb”; String username = “…”; String password = “…”; String baseUrl = “…”
JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl); tableEnv.registerCatalog(“mypg”, catalog);
// set the JdbcCatalog as the current catalog of the session tableEnv.useCatalog(“mypg”);
* SQL **
CREATE CATALOG mypg WITH( ‘type’=’jdbc’, ‘default-database’=’…’, ‘username’=’…’, ‘password’=’…’, ‘base-url’=’…’ );
USE CATALOG mypg;
* YAML **
execution: planner: blink … current-catalog: mypg # set the JdbcCatalog as the current catalog of the session current-database: mydb
catalogs:
- name: mypg type: jdbc default-database: mydb username: … password: … base-url: … ```
- Hive Catalog
- 官网 介绍Catalog
How to Create and Register Flink Tables to Catalog
```java TableEnvironment tableEnv = …
// Create a HiveCatalog
Catalog catalog = new HiveCatalog(“myhive”, null, “
// Register the catalog tableEnv.registerCatalog(“myhive”, catalog);
// Create a catalog database tableEnv.sqlUpdate(“CREATE DATABASE mydb WITH (…)”);
// Create a catalog table tableEnv.sqlUpdate(“CREATE TABLE mytable (name STRING, age INT) WITH (…)”);
tableEnv.listTables(); // should return the tables in current catalog and database.
<a name="using-javascalapython-api"></a>
### 使用Java / Scala / Python API
```java
TableEnvironment tableEnv = ...
// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");
// Register the catalog
tableEnv.registerCatalog("myhive", catalog);
// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
// Create a catalog table
TableSchema schema = TableSchema.builder()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.build();
catalog.createTable(
new ObjectPath("mydb", "mytable"),
new CatalogTableImpl(
schema,
new Kafka()
.version("0.11")
....
.startFromEarlist(),
"my comment"
)
);
List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
数据库操作
// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
// drop database
catalog.dropDatabase("mydb", false);
// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
// get databse
catalog.getDatabase("mydb");
// check if a database exist
catalog.databaseExists("mydb");
// list databases in a catalog
catalog.listDatabases("mycatalog");
表操作
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
// get table
catalog.getTable("mytable");
// check if a table exist or not
catalog.tableExists("mytable");
// list tables in a database
catalog.listTables("mydb");
查看操作
// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);
// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);
// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view");
// get view
catalog.getTable("myview");
// check if a view exist or not
catalog.tableExists("mytable");
// list views in a database
catalog.listViews("mydb");
分区操作
// create view
catalog.createPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);
// alter partition
catalog.alterPartition(
new ObjectPath("mydb", "mytable"),
new CatalogPartitionSpec(...),
new CatalogPartitionImpl(...),
false);
// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));
// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));
// list partitions of a table by expression filter
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
功能操作
// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);
// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
// get function
catalog.getFunction("myfunc");
// check if a function exist or not
catalog.functionExists("myfunc");
// list functions in a database
catalog.listFunctions("mydb");
Table API and SQL for Catalog
Registering a Catalog
Java/Scala
tableEnv.registerCatalog(new CustomCatalog("myCatalog"));
Yaml
catalogs:
- name: myCatalog
type: custom_catalog
hive-conf-dir: ...
Changing the Current Catalog And Database
Flink将始终在当前目录和数据库中搜索表,视图和UDF。
tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDb");
Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;
通过提供以下格式的标准名称,可以访问不是当前目录的目录中的元数据catalog.database.object
。
tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
List Available Catalogs
List Available Catalogs
Flink SQL> show catalogs;
List Available Databases
tableEnv.listDatabases();
Flink SQL> show databases;
List Available Tables
tableEnv.listTables();
Flink SQL> show tables;