-- Kudu DDL
-- Impala DDL creating the Kudu-backed Star Schema Benchmark (SSB) tables
-- (customer, dates, lineorder, part, supplier) plus the p_lineorder view.
-- Each table is hash-partitioned into 16 partitions and stored in Kudu; the
-- primary-key columns are declared first in every table.
-- NOTE(review): tables are created unqualified, i.e. in the current database;
-- the Flink DDL refers to them as impala::yjuny_kudu.<table>, so this
-- presumably runs after `USE yjuny_kudu` — TODO confirm.

CREATE TABLE customer (
    c_custkey    INT,
    c_name       STRING,
    c_address    STRING,
    c_city       STRING,
    c_nation     STRING,
    c_region     STRING,
    c_phone      STRING,
    c_mktsegment STRING,
    PRIMARY KEY (c_custkey)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

CREATE TABLE dates (
    d_datekey          INT,
    d_date             STRING,
    d_dayofweek        STRING,
    d_month            STRING,
    d_year             INT,
    d_yearmonthnum     INT,
    d_yearmonth        STRING,
    d_daynuminweek     INT,
    d_daynuminmonth    INT,
    d_daynuminyear     INT,
    d_monthnuminyear   INT,
    d_weeknuminyear    INT,
    d_sellingseason    STRING,
    d_lastdayinweekfl  INT,
    d_lastdayinmonthfl INT,
    d_holidayfl        INT,
    d_weekdayfl        INT,
    PRIMARY KEY (d_datekey)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

-- lo_orderpriotity / lo_shippriotity are misspelled ("priotity"), but the names
-- are kept as-is because the Flink DDL and the p_lineorder view reference them
-- verbatim.
CREATE TABLE lineorder (
    lo_orderkey      BIGINT,
    lo_linenumber    BIGINT,
    lo_custkey       INT,
    lo_partkey       INT,
    lo_suppkey       INT,
    lo_orderdate     INT,
    lo_orderpriotity STRING,
    lo_shippriotity  INT,
    lo_quantity      BIGINT,
    lo_extendedprice BIGINT,
    lo_ordtotalprice BIGINT,
    lo_discount      BIGINT,
    lo_revenue       BIGINT,
    lo_supplycost    BIGINT,
    lo_tax           BIGINT,
    lo_commitdate    INT,
    lo_shipmode      STRING,
    PRIMARY KEY (lo_orderkey, lo_linenumber)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

CREATE TABLE part (
    p_partkey   INT,
    p_name      STRING,
    p_mfgr      STRING,
    p_category  STRING,
    p_brand     STRING,
    p_color     STRING,
    p_type      STRING,
    p_size      INT,
    p_container STRING,
    PRIMARY KEY (p_partkey)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

CREATE TABLE supplier (
    s_suppkey INT,
    s_name    STRING,
    s_address STRING,
    s_city    STRING,
    s_nation  STRING,
    s_region  STRING,
    s_phone   STRING,
    PRIMARY KEY (s_suppkey)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

-- Every lineorder column, plus a precomputed V_REVENUE = lo_extendedprice * lo_discount.
CREATE VIEW p_lineorder AS
SELECT
    lo.lo_orderkey,
    lo.lo_linenumber,
    lo.lo_custkey,
    lo.lo_partkey,
    lo.lo_suppkey,
    lo.lo_orderdate,
    lo.lo_orderpriotity,
    lo.lo_shippriotity,
    lo.lo_quantity,
    lo.lo_extendedprice,
    lo.lo_ordtotalprice,
    lo.lo_discount,
    lo.lo_revenue,
    lo.lo_supplycost,
    lo.lo_tax,
    lo.lo_commitdate,
    lo.lo_shipmode,
    lo.lo_extendedprice * lo.lo_discount AS V_REVENUE
FROM lineorder AS lo;
-- Flink DDL
-- Flink SQL table definitions for the SSB Kudu tables, registered in
-- default_catalog.default_database and backed by the 'kudu-retract' connector
-- against the Kudu master hz-hadoop-test-199-141-27:7051.
-- Primary keys are supplied through connector.primary-key.N properties rather
-- than a PRIMARY KEY clause. Column lists presumably must stay in sync with the
-- Kudu schemas of impala::yjuny_kudu.<table> — keep them aligned.
-- Each statement is terminated with ';' so the script can be split and parsed.

CREATE TABLE default_catalog.default_database.customer (
    c_custkey    INT,
    c_name       STRING,
    c_address    STRING,
    c_city       STRING,
    c_nation     STRING,
    c_region     STRING,
    c_phone      STRING,
    c_mktsegment STRING
) WITH (
    'connector.master' = 'hz-hadoop-test-199-141-27:7051',
    'connector.table' = 'impala::yjuny_kudu.customer',
    'connector.property-version' = '1',
    'connector.type' = 'kudu-retract',
    'connector.primary-key.0' = 'c_custkey'
);

CREATE TABLE default_catalog.default_database.dates (
    d_datekey          INT,
    d_date             STRING,
    d_dayofweek        STRING,
    d_month            STRING,
    d_year             INT,
    d_yearmonthnum     INT,
    d_yearmonth        STRING,
    d_daynuminweek     INT,
    d_daynuminmonth    INT,
    d_daynuminyear     INT,
    d_monthnuminyear   INT,
    d_weeknuminyear    INT,
    d_sellingseason    STRING,
    d_lastdayinweekfl  INT,
    d_lastdayinmonthfl INT,
    d_holidayfl        INT,
    d_weekdayfl        INT
) WITH (
    'connector.master' = 'hz-hadoop-test-199-141-27:7051',
    'connector.table' = 'impala::yjuny_kudu.dates',
    'connector.property-version' = '1',
    'connector.type' = 'kudu-retract',
    'connector.primary-key.0' = 'd_datekey'
);

-- Composite key: (lo_orderkey, lo_linenumber), declared as primary-key.0 / .1.
CREATE TABLE default_catalog.default_database.lineorder (
    lo_orderkey      BIGINT,
    lo_linenumber    BIGINT,
    lo_custkey       INT,
    lo_partkey       INT,
    lo_suppkey       INT,
    lo_orderdate     INT,
    lo_orderpriotity STRING,
    lo_shippriotity  INT,
    lo_quantity      BIGINT,
    lo_extendedprice BIGINT,
    lo_ordtotalprice BIGINT,
    lo_discount      BIGINT,
    lo_revenue       BIGINT,
    lo_supplycost    BIGINT,
    lo_tax           BIGINT,
    lo_commitdate    INT,
    lo_shipmode      STRING
) WITH (
    'connector.master' = 'hz-hadoop-test-199-141-27:7051',
    'connector.table' = 'impala::yjuny_kudu.lineorder',
    'connector.property-version' = '1',
    'connector.type' = 'kudu-retract',
    'connector.primary-key.0' = 'lo_orderkey',
    'connector.primary-key.1' = 'lo_linenumber'
);

CREATE TABLE default_catalog.default_database.part (
    p_partkey   INT,
    p_name      STRING,
    p_mfgr      STRING,
    p_category  STRING,
    p_brand     STRING,
    p_color     STRING,
    p_type      STRING,
    p_size      INT,
    p_container STRING
) WITH (
    'connector.master' = 'hz-hadoop-test-199-141-27:7051',
    'connector.table' = 'impala::yjuny_kudu.part',
    'connector.property-version' = '1',
    'connector.type' = 'kudu-retract',
    'connector.primary-key.0' = 'p_partkey'
);

CREATE TABLE default_catalog.default_database.supplier (
    s_suppkey INT,
    s_name    STRING,
    s_address STRING,
    s_city    STRING,
    s_nation  STRING,
    s_region  STRING,
    s_phone   STRING
) WITH (
    'connector.master' = 'hz-hadoop-test-199-141-27:7051',
    'connector.table' = 'impala::yjuny_kudu.supplier',
    'connector.property-version' = '1',
    'connector.type' = 'kudu-retract',
    'connector.primary-key.0' = 's_suppkey'
);
-- Job configuration (作业配置)
-- Metadata for the 'ssb_etl' job (job_id 1, tenant 1, project 1).
-- It executes in BATCH mode with parallelism 1.
INSERT INTO job (job_id, name, description, tenant_id, project_id)
VALUES (1, 'ssb_etl', NULL, 1, 1);

INSERT INTO job_exec_env (job_id, mode, parallelism)
VALUES (1, 'BATCH', 1);

-- One Flink INSERT script per SSB table. Each copies flink_hive.ssb.<table>
-- into the Flink table of the same name. Each script is named <table>_insert.
-- NOTE(review): `SELECT *` relies on the Hive and Flink column orders matching
-- exactly; an explicit column list would be safer.
-- NOTE(review): every script uses position 1; confirm whether distinct
-- positions are expected for ordering.
INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect)
VALUES (1, 'customer_insert', 1, 'insert into customer select * from flink_hive.ssb.customer', NULL, 'INSERT', 'FLINK');

INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect)
VALUES (1, 'dates_insert', 1, 'insert into dates select * from flink_hive.ssb.dates', NULL, 'INSERT', 'FLINK');

-- Loads lineorder, so it is named 'lineorder_insert'.
-- This name must not collide with the supplier script's 'supplier_insert'.
INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect)
VALUES (1, 'lineorder_insert', 1, 'insert into lineorder select * from flink_hive.ssb.lineorder', NULL, 'INSERT', 'FLINK');

INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect)
VALUES (1, 'part_insert', 1, 'insert into part select * from flink_hive.ssb.part', NULL, 'INSERT', 'FLINK');

INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect)
VALUES (1, 'supplier_insert', 1, 'insert into supplier select * from flink_hive.ssb.supplier', NULL, 'INSERT', 'FLINK');

-- Cluster 1: a YARN-session Flink cluster named 'flink1.10-yjuny'.
-- Its master is hz-hadoop-test-199-141-30:43420.
INSERT INTO cluster (cluster_id, cluster_name, type, deploy_type, status, description, cluster_master_host, cluster_master_port)
VALUES (1, 'flink1.10-yjuny', 'FLINK', 'YARN-SESSION', 'RUNNING', NULL, 'hz-hadoop-test-199-141-30', 43420);

-- `user` and `key` are quoted because they are reserved words in common
-- metadata stores (USER in PostgreSQL, KEY in MySQL).
-- NOTE(review): backtick quoting assumes a MySQL-compatible metadata store;
-- use "..." for an ANSI-quoting database — TODO confirm.
INSERT INTO cluster_yarn (cluster_id, application_id, `user`, queue)
VALUES (1, 'application_1581335885532_12895', 'flink', 'root.flink');

-- '__cmd.start' holds the yarn-session.sh command line; the trailing space is
-- part of the stored value.
INSERT INTO cluster_config (cluster_id, `key`, value)
VALUES (1, '__cmd.start', './bin/yarn-session.sh -tm 8192 -d -s 2 -jm 4096 -nm flink1.10-yjuny ');