-- ============ Kudu DDL (Impala syntax) ============
-- SSB dimension table: customers. Hash-partitioned on the primary key
-- (Impala hashes all PK columns when no column list is given).
CREATE TABLE IF NOT EXISTS customer
(
c_custkey INT,
c_name STRING,
c_address STRING,
c_city STRING,
c_nation STRING,
c_region STRING,
c_phone STRING,
c_mktsegment STRING,
PRIMARY KEY(c_custkey)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
-- SSB date dimension table. Named "dates" (plural) to avoid the reserved
-- word DATE. The *fl columns are 0/1 flags stored as INT.
CREATE TABLE IF NOT EXISTS dates
(
d_datekey INT,
d_date STRING,
d_dayofweek STRING,
d_month STRING,
d_year INT,
d_yearmonthnum INT,
d_yearmonth STRING,
d_daynuminweek INT,
d_daynuminmonth INT,
d_daynuminyear INT,
d_monthnuminyear INT,
d_weeknuminyear INT,
d_sellingseason STRING,
d_lastdayinweekfl INT,
d_lastdayinmonthfl INT,
d_holidayfl INT,
d_weekdayfl INT,
PRIMARY KEY(d_datekey)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
-- SSB fact table. Composite primary key (orderkey, linenumber).
-- NOTE(review): `lo_orderpriotity` / `lo_shippriotity` are misspellings of
-- "priority", but the p_lineorder view and the Flink DDL reference these
-- exact names — renaming must be coordinated across all three places.
CREATE TABLE IF NOT EXISTS lineorder
(
`lo_orderkey` BIGINT,
`lo_linenumber` BIGINT,
`lo_custkey` INT,
`lo_partkey` INT,
`lo_suppkey` INT,
`lo_orderdate` INT,
`lo_orderpriotity` STRING,
`lo_shippriotity` INT,
`lo_quantity` BIGINT,
`lo_extendedprice` BIGINT,
`lo_ordtotalprice` BIGINT,
`lo_discount` BIGINT,
`lo_revenue` BIGINT,
`lo_supplycost` BIGINT,
`lo_tax` BIGINT,
`lo_commitdate` INT,
`lo_shipmode` STRING,
PRIMARY KEY(lo_orderkey, lo_linenumber)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
-- SSB dimension table: parts.
CREATE TABLE IF NOT EXISTS part
(
p_partkey INT,
p_name STRING,
p_mfgr STRING,
p_category STRING,
p_brand STRING,
p_color STRING,
p_type STRING,
p_size INT,
p_container STRING,
PRIMARY KEY(p_partkey)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
-- SSB dimension table: suppliers.
CREATE TABLE IF NOT EXISTS supplier
(
s_suppkey INT,
s_name STRING,
s_address STRING,
s_city STRING,
s_nation STRING,
s_region STRING,
s_phone STRING,
PRIMARY KEY(s_suppkey)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
-- Projection view over lineorder adding the derived column V_REVENUE
-- (extendedprice * discount, the revenue measure used by SSB queries).
-- FROM-clause table name lowercased to match the table's declared name
-- (Impala folds identifiers to lowercase; this is a consistency fix).
CREATE VIEW IF NOT EXISTS `p_lineorder` AS SELECT `lineorder`.`lo_orderkey`,
`lineorder`.`lo_linenumber`,
`lineorder`.`lo_custkey`,
`lineorder`.`lo_partkey`,
`lineorder`.`lo_suppkey`,
`lineorder`.`lo_orderdate`,
`lineorder`.`lo_orderpriotity`,
`lineorder`.`lo_shippriotity`,
`lineorder`.`lo_quantity`,
`lineorder`.`lo_extendedprice`,
`lineorder`.`lo_ordtotalprice`,
`lineorder`.`lo_discount`,
`lineorder`.`lo_revenue`,
`lineorder`.`lo_supplycost`,
`lineorder`.`lo_tax`,
`lineorder`.`lo_commitdate`,
`lineorder`.`lo_shipmode`,
`lineorder`.`lo_extendedprice`*`lineorder`.`lo_discount` AS `V_REVENUE`
FROM `lineorder`;
-- ============ Flink DDL (Flink SQL, kudu-retract connector) ============
-- Flink-side mapping of the Kudu customer table. The connector properties
-- point at the Impala-created Kudu table; primary key is declared via
-- 'connector.primary-key.N' rather than a PRIMARY KEY clause.
CREATE TABLE default_catalog.default_database.customer
(
c_custkey INT,
c_name STRING,
c_address STRING,
c_city STRING,
c_nation STRING,
c_region STRING,
c_phone STRING,
c_mktsegment STRING
) WITH (
'connector.master' = 'hz-hadoop-test-199-141-27:7051',
'connector.table' = 'impala::yjuny_kudu.customer',
'connector.property-version' = '1',
'connector.type' = 'kudu-retract',
'connector.primary-key.0' = 'c_custkey'
);
-- Flink-side mapping of the Kudu dates table.
CREATE TABLE default_catalog.default_database.dates
(
d_datekey INT,
d_date STRING,
d_dayofweek STRING,
d_month STRING,
d_year INT,
d_yearmonthnum INT,
d_yearmonth STRING,
d_daynuminweek INT,
d_daynuminmonth INT,
d_daynuminyear INT,
d_monthnuminyear INT,
d_weeknuminyear INT,
d_sellingseason STRING,
d_lastdayinweekfl INT,
d_lastdayinmonthfl INT,
d_holidayfl INT,
d_weekdayfl INT
) WITH (
'connector.master' = 'hz-hadoop-test-199-141-27:7051',
'connector.table' = 'impala::yjuny_kudu.dates',
'connector.property-version' = '1',
'connector.type' = 'kudu-retract',
'connector.primary-key.0' = 'd_datekey'
);
-- Flink-side mapping of the Kudu lineorder fact table. Column names —
-- including the misspelled `lo_orderpriotity` / `lo_shippriotity` — must
-- match the Kudu table exactly, so they are kept as-is here.
-- Composite primary key declared via connector.primary-key.0/.1.
CREATE TABLE default_catalog.default_database.lineorder
(
`lo_orderkey` BIGINT,
`lo_linenumber` BIGINT,
`lo_custkey` INT,
`lo_partkey` INT,
`lo_suppkey` INT,
`lo_orderdate` INT,
`lo_orderpriotity` STRING,
`lo_shippriotity` INT,
`lo_quantity` BIGINT,
`lo_extendedprice` BIGINT,
`lo_ordtotalprice` BIGINT,
`lo_discount` BIGINT,
`lo_revenue` BIGINT,
`lo_supplycost` BIGINT,
`lo_tax` BIGINT,
`lo_commitdate` INT,
`lo_shipmode` STRING
) WITH (
'connector.master' = 'hz-hadoop-test-199-141-27:7051',
'connector.table' = 'impala::yjuny_kudu.lineorder',
'connector.property-version' = '1',
'connector.type' = 'kudu-retract',
'connector.primary-key.0' = 'lo_orderkey',
'connector.primary-key.1' = 'lo_linenumber'
);
-- Flink-side mapping of the Kudu part table.
CREATE TABLE default_catalog.default_database.part
(
p_partkey INT,
p_name STRING,
p_mfgr STRING,
p_category STRING,
p_brand STRING,
p_color STRING,
p_type STRING,
p_size INT,
p_container STRING
) WITH (
'connector.master' = 'hz-hadoop-test-199-141-27:7051',
'connector.table' = 'impala::yjuny_kudu.part',
'connector.property-version' = '1',
'connector.type' = 'kudu-retract',
'connector.primary-key.0' = 'p_partkey'
);
-- Flink-side mapping of the Kudu supplier table.
CREATE TABLE default_catalog.default_database.supplier
(
s_suppkey INT,
s_name STRING,
s_address STRING,
s_city STRING,
s_nation STRING,
s_region STRING,
s_phone STRING
) WITH (
'connector.master' = 'hz-hadoop-test-199-141-27:7051',
'connector.table' = 'impala::yjuny_kudu.supplier',
'connector.property-version' = '1',
'connector.type' = 'kudu-retract',
'connector.primary-key.0' = 's_suppkey'
);
-- ============ Job configuration (作业配置) — metadata-store INSERTs ============
-- Register the ssb_etl batch job and its five Hive-to-Kudu insert scripts.
-- Fix: the lineorder script was misnamed 'supplier_insert', duplicating the
-- name of the real supplier script below; renamed to 'lineorder_insert'.
-- NOTE(review): every script has position = 1 — if `position` controls
-- execution order, these should probably be 1..5; confirm with the scheduler.
INSERT INTO job (job_id, name, description, tenant_id, project_id) VALUES (1, 'ssb_etl', null, 1, 1);
INSERT INTO job_exec_env (job_id, mode, parallelism) VALUES (1, 'BATCH', 1);
INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect) VALUES (1, 'customer_insert', 1, 'insert into customer select * from flink_hive.ssb.customer', null, 'INSERT', 'FLINK');
INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect) VALUES (1, 'dates_insert', 1, 'insert into dates select * from flink_hive.ssb.dates', null, 'INSERT', 'FLINK');
INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect) VALUES (1, 'lineorder_insert', 1, 'insert into lineorder select * from flink_hive.ssb.lineorder', null, 'INSERT', 'FLINK');
INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect) VALUES (1, 'part_insert', 1, 'insert into part select * from flink_hive.ssb.part', null, 'INSERT', 'FLINK');
INSERT INTO job_script_sql (job_id, name, position, script, description, sql_type, dialect) VALUES (1, 'supplier_insert', 1, 'insert into supplier select * from flink_hive.ssb.supplier', null, 'INSERT', 'FLINK');
-- Register the YARN-session Flink cluster the job runs on.
-- Fix: `user`, `key`, and `value` are MySQL keywords (`key` is fully
-- reserved) — backtick-quote them so the INSERTs parse on MySQL.
INSERT INTO cluster (cluster_id, cluster_name, type, deploy_type, status, description, cluster_master_host, cluster_master_port) VALUES (1, 'flink1.10-yjuny', 'FLINK', 'YARN-SESSION', 'RUNNING', null, 'hz-hadoop-test-199-141-30', 43420);
INSERT INTO cluster_yarn (cluster_id, application_id, `user`, queue) VALUES (1, 'application_1581335885532_12895', 'flink', 'root.flink');
INSERT INTO cluster_config (cluster_id, `key`, `value`) VALUES (1, '__cmd.start', './bin/yarn-session.sh -tm 8192 -d -s 2 -jm 4096 -nm flink1.10-yjuny ');