SQL Examples
%flink.ssql
-- =====================
-- Data Types
-- =====================

-- Character strings
-- ---------------------
CHAR
CHAR(1)
VARCHAR
VARCHAR(1)
STRING

-- Binary strings
-- ---------------------
BINARY
BINARY(1)
VARBINARY
VARBINARY(1)
BYTES

-- Exact numerics
-- ---------------------
DECIMAL
DECIMAL(10)
DECIMAL(10, 0)
DEC
DEC(10)
DEC(10, 0)
NUMERIC
NUMERIC(10)
NUMERIC(10, 0)
TINYINT
SMALLINT
INT
INTEGER
BIGINT

-- Approximate numerics
-- ---------------------
FLOAT
DOUBLE
DOUBLE PRECISION

-- Date and time
-- ---------------------
DATE
TIME
TIME(0)
TIMESTAMP
TIMESTAMP(6)
TIMESTAMP WITHOUT TIME ZONE
TIMESTAMP(6) WITHOUT TIME ZONE
TIMESTAMP WITH TIME ZONE
TIMESTAMP(6) WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
TIMESTAMP(6) WITH LOCAL TIME ZONE
INTERVAL YEAR
INTERVAL YEAR(2)
INTERVAL YEAR(2) TO MONTH
INTERVAL MONTH
INTERVAL DAY
INTERVAL DAY(2)
INTERVAL DAY(2) TO HOUR
INTERVAL DAY(2) TO MINUTE
INTERVAL DAY(2) TO SECOND(2)
INTERVAL HOUR
INTERVAL HOUR TO MINUTE
INTERVAL HOUR TO SECOND(6)
INTERVAL MINUTE
INTERVAL MINUTE TO SECOND(6)
INTERVAL SECOND
INTERVAL SECOND(6)

-- Constructed data types
-- ---------------------
ARRAY<INT>
INT ARRAY
MAP<VARCHAR(1), INT>
MULTISET<INT>
INT MULTISET
ROW<id INT, `name` VARCHAR(1), price DECIMAL>
ROW<id INT 'id', `name` VARCHAR(1) 'name', price DECIMAL 'price'>
ROW(id INT, `name` VARCHAR(1), price DECIMAL)
ROW(id INT 'id', `name` VARCHAR(1) 'name', price DECIMAL 'price')

-- Other data types
-- ---------------------
BOOLEAN
RAW('class', 'snapshot')
NULL

-- =====================
-- System (Built-in) Functions
-- =====================

-- Comparison Functions
-- ---------------------
value1 = value2
value1 <> value2
value1 > value2
value1 >= value2
value1 < value2
value1 <= value2
value1 IS NULL
value1 IS NOT NULL
value1 IS DISTINCT FROM value2
value1 IS NOT DISTINCT FROM value2
value1 BETWEEN ASYMMETRIC value2 AND value3
value1 BETWEEN SYMMETRIC value2 AND value3
value1 NOT BETWEEN value2 AND value3
string1 LIKE string2 ESCAPE 'xxx'
string1 NOT LIKE string2 ESCAPE 'xxx'
string1 SIMILAR TO string2
string1 NOT SIMILAR TO string2
value1 IN (value2, value3)
value1 NOT IN (value2, value3)
EXISTS (sub-query)
value1 IN (sub-query)
value1 NOT IN (sub-query)

-- Logical Functions
-- ---------------------
boolean1 OR boolean2
boolean1 AND boolean2
NOT boolean1
boolean1 IS FALSE
boolean1 IS NOT FALSE
boolean1 IS TRUE
boolean1 IS NOT TRUE
boolean1 IS UNKNOWN
boolean1 IS NOT UNKNOWN

-- Arithmetic Functions
-- ---------------------
+ numeric1
- numeric1
numeric1 + numeric2
numeric1 - numeric2
numeric1 * numeric2
numeric1 / numeric2
POWER(numeric1, numeric2)
ABS(numeric1)
MOD(numeric1, numeric2)
SQRT(numeric1)
LN(numeric1)
LOG10(numeric1)
LOG2(numeric1)
LOG(numeric2)
LOG(numeric1, numeric2)
EXP(numeric1)
CEIL(numeric1)
CEILING(numeric1)
FLOOR(numeric1)
SIN(numeric1)
SINH(numeric1)
COS(numeric1)
TAN(numeric1)
TANH(numeric1)
COT(numeric1)
ASIN(numeric1)
ACOS(numeric1)
ATAN(numeric1)
ATAN2(numeric1, numeric2)
COSH(numeric1)
DEGREES(numeric1)
RADIANS(numeric1)
SIGN(numeric1)
ROUND(numeric1, integer1)
PI
E()
RAND()
RAND(integer1)
RAND_INTEGER(integer1)
RAND_INTEGER(integer1, integer2)
UUID()
BIN(integer1)
HEX(numeric1)
HEX(string1)
TRUNCATE(numeric1, integer2)
PI()

-- String Functions
-- ---------------------
string1 || string2
CHAR_LENGTH(string1)
CHARACTER_LENGTH(string1)
UPPER(string1)
LOWER(string1)
POSITION(string1 IN string2)
TRIM(BOTH string1 FROM string2)
TRIM(LEADING string1 FROM string2)
TRIM(TRAILING string1 FROM string2)
LTRIM(string1)
RTRIM(string1)
REPEAT(string1, integer1)
REGEXP_REPLACE(string1, string2, string3)
OVERLAY(string1 PLACING string2 FROM integer1 FOR integer2)
SUBSTRING(string1 FROM integer1 FOR integer2)
REPLACE(string1, string2, string3)
REGEXP_EXTRACT(string1, string2[, integer1])
INITCAP(string1)
CONCAT(string1, string2, ...)
CONCAT_WS(string1, string2, string3, ...)
LPAD(string1, integer1, string2)
RPAD(string1, integer1, string2)
FROM_BASE64(string1)
TO_BASE64(string1)
ASCII(string1)
CHR(integer1)
DECODE(binary1, string1)
ENCODE(string1, string2)
INSTR(string1, string2)
LEFT(string1, integer1)
RIGHT(string1, integer1)
LOCATE(string1, string2[, integer1])
PARSE_URL(string1, string2[, string3])
REGEXP(string1, string2)
REVERSE(string1)
SPLIT_INDEX(string1, string2, integer1)
STR_TO_MAP(string1[, string2, string3])
SUBSTR(string1[, integer1[, integer2]])
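-- Illustrative sketch: a few of the string functions above combined in one query.
-- Assumes the Orders table used throughout this note, with a STRING column named product.
SELECT
  UPPER(product)                    AS upper_product,
  CHAR_LENGTH(product)              AS product_len,
  CONCAT_WS('-', 'flink', 'sql')    AS joined,
  SUBSTRING(product FROM 1 FOR 3)   AS prefix3
FROM Orders;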
-- Temporal Functions
-- ---------------------
DATE string1
DATE '2020-08-09'
TIME string1
TIME '20:19:18'
TIMESTAMP string1
TIMESTAMP '2020-08-09 20:19:18'
TIMESTAMP '2020-08-09T20:19:18.001'
INTERVAL string1 range1
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
LOCALTIME
LOCALTIMESTAMP
EXTRACT(timeintervalunit FROM temporal)
YEAR(date1)
QUARTER(date1)
MONTH(date1)
WEEK(date1)
DAYOFYEAR(date1)
DAYOFMONTH(date1)
DAYOFWEEK(date1)
HOUR(timestamp1)
MINUTE(timestamp1)
SECOND(timestamp1)
FLOOR(timepoint TO timeintervalunit)
CEIL(timepoint TO timeintervalunit)
(timepoint1, temporal1) OVERLAPS (timepoint2, temporal2)
DATE_FORMAT(timestamp1, string1)
TIMESTAMPADD(timeintervalunit, interval1, timepoint)
TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
CONVERT_TZ(string1, string2, string3)
FROM_UNIXTIME(numeric1[, string1])
UNIX_TIMESTAMP()
UNIX_TIMESTAMP(string1[, string2])
TO_DATE(string1[, string2])
TO_TIMESTAMP(string1[, string2])
NOW()

-- Conditional Functions
-- ---------------------
CASE value
  WHEN value1_1, value1_2 THEN result1
  WHEN value2_1, value2_2 THEN result2
  ELSE resultZ
END
CASE
  WHEN condition1 THEN result1
  WHEN condition2 THEN result2
  ELSE resultZ
END
NULLIF(value1, value2)
COALESCE(value1, value2, value3)
IF(condition1, true_value, false_value)
IS_ALPHA(string1)
IS_DECIMAL(string1)
IS_DIGIT(string1)

-- Type Conversion Functions
-- ---------------------
CAST(value1 AS type1)

-- Collection Functions
-- ---------------------
CARDINALITY(array1)
array1[integer1]
array1[1]
ELEMENT(array1)
CARDINALITY(map1)
map1[value1]
map1['key']

-- Value Construction Functions
-- ---------------------
ROW(value1, value2)
(value1, value2)
ARRAY[value1, value2]
MAP[value1, value2, value3, value4]

-- Grouping Functions
-- ---------------------
GROUP_ID()
GROUPING(expression1, expression2)
GROUPING_ID(expression1, expression2)

-- Hash Functions
-- ---------------------
MD5(string1)
SHA1(string1)
SHA224(string1)
SHA256(string1)
SHA384(string1)
SHA512(string1)
SHA2(string1, hashLength)

-- Aggregate Functions
-- ---------------------
COUNT(ALL expression)
COUNT(DISTINCT expression1, expression2)
COUNT(*)
COUNT(1)
AVG(expression)
MAX(expression)
MIN(expression)
STDDEV_POP(expression)
STDDEV_SAMP(expression)
VAR_POP(expression)
VAR_SAMP(expression)
COLLECT(expression)
VARIANCE(expression)
RANK()
DENSE_RANK()
ROW_NUMBER()
LEAD(expression, offset, default)
LAG(expression, offset, default)
FIRST_VALUE(expression)
LAST_VALUE(expression)
LISTAGG(expression, separator)
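-- Illustrative sketch: conditional logic, casting, and aggregates combined.
-- The amount column is hypothetical and follows the Orders examples below.
SELECT
  CASE WHEN amount > 100 THEN 'large' ELSE 'small' END AS bucket,
  COUNT(*)                                             AS cnt,
  CAST(AVG(amount) AS DECIMAL(10, 2))                  AS avg_amount
FROM Orders
GROUP BY CASE WHEN amount > 100 THEN 'large' ELSE 'small' END;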
-- =====================
-- Queries
-- =====================

-- Scan, Projection, and Filter
-- ---------------------
SELECT * FROM Orders
SELECT `a`, `c` AS d FROM Orders
SELECT * FROM Orders WHERE b = 'red'
SELECT * FROM Orders WHERE `a` % 2 = 0
SELECT PRETTY_PRINT(`user`) FROM Orders

-- Aggregations
-- ---------------------
SELECT `a`, SUM(b) AS d
FROM Orders
GROUP BY `a`

SELECT `user`, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), `user`

SELECT COUNT(amount) OVER (
  PARTITION BY `user`
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders

SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
  PARTITION BY `user`
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

SELECT DISTINCT users FROM Orders

SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((`user`), (product))

SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50

SELECT MyAggregate(amount)
FROM Orders
GROUP BY users

-- Joins
-- ---------------------
SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId
  AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) AS t(tag)

SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) AS t(tag) ON TRUE

SELECT o_amount, r_rate
FROM Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

SELECT o.amount, o.currency, r.rate, o.amount * r.rate
FROM Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

-- Set Operations
-- ---------------------
SELECT *
FROM (
  (SELECT `user` FROM Orders WHERE `a` % 2 = 0)
  UNION
  (SELECT `user` FROM Orders WHERE b = 0)
)

SELECT *
FROM (
  (SELECT `user` FROM Orders WHERE `a` % 2 = 0)
  UNION ALL
  (SELECT `user` FROM Orders WHERE b = 0)
)

SELECT *
FROM (
  (SELECT `user` FROM Orders WHERE `a` % 2 = 0)
  INTERSECT
  (SELECT `user` FROM Orders WHERE b = 0)
)

SELECT *
FROM (
  (SELECT `user` FROM Orders WHERE `a` % 2 = 0)
  EXCEPT
  (SELECT `user` FROM Orders WHERE b = 0)
)

SELECT `user`, amount
FROM Orders
WHERE product IN (
  SELECT product FROM NewProducts
)

SELECT `user`, amount
FROM Orders
WHERE EXISTS (
  SELECT product FROM NewProducts
)

-- OrderBy & Limit
-- ---------------------
SELECT *
FROM Orders
ORDER BY orderTime

SELECT *
FROM Orders
ORDER BY orderTime
LIMIT 3

-- Windows (group window functions and their auxiliary functions)
-- ---------------------
TUMBLE(time_attr, interval1)
HOP(time_attr, interval1, interval1)
SESSION(time_attr, interval1)
TUMBLE_START(time_attr, interval1)
HOP_START(time_attr, interval1, interval1)
SESSION_START(time_attr, interval1)
TUMBLE_END(time_attr, interval1)
HOP_END(time_attr, interval1, interval1)
SESSION_END(time_attr, interval1)
TUMBLE_ROWTIME(time_attr, interval1)
HOP_ROWTIME(time_attr, interval1, interval1)
SESSION_ROWTIME(time_attr, interval1)
TUMBLE_PROCTIME(time_attr, interval1)
HOP_PROCTIME(time_attr, interval1, interval1)
SESSION_PROCTIME(time_attr, interval1)
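-- Illustrative sketch: how the group window functions above are typically used together.
-- Assumes Orders has a rowtime attribute named rowtime and a numeric amount column.
SELECT
  `user`,
  TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS window_start,
  TUMBLE_END(rowtime, INTERVAL '1' HOUR)   AS window_end,
  SUM(amount)                              AS total_amount
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), `user`;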
-- Pattern Matching (MATCH_RECOGNIZE)
-- ---------------------
SELECT T.aid, T.bid, T.cid
FROM MyTable
  MATCH_RECOGNIZE (
    PARTITION BY userid
    ORDER BY proctime
    MEASURES
      `A`.id AS aid,
      B.id AS bid,
      `C`.id AS cid
    PATTERN (`A` B `C`)
    DEFINE
      `A` AS `name` = 'a',
      B AS `name` = 'b',
      `C` AS `name` = 'c'
  ) AS T

SELECT *
FROM Ticker
  MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY rowtime
    MEASURES
      START_ROW.rowtime AS start_tstamp,
      LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
      LAST(PRICE_UP.rowtime) AS end_tstamp
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST PRICE_UP
    PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
    DEFINE
      PRICE_DOWN AS
        (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
        PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
      PRICE_UP AS
        PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
  ) MR;

SELECT *
FROM Ticker
  MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY rowtime
    MEASURES
      FIRST(`A`.rowtime) AS start_tstamp,
      LAST(`A`.rowtime) AS end_tstamp,
      AVG(`A`.price) AS avgPrice
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (`A`+ B)
    DEFINE
      `A` AS AVG(`A`.price) < 15
  ) MR;

SELECT *
FROM Ticker
  MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY rowtime
    MEASURES
      `C`.price AS lastPrice
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (`A` B* `C`)
    DEFINE
      `A` AS `A`.price > 10,
      B AS B.price < 15,
      `C` AS `C`.price > 12
  )

SELECT *
FROM Ticker
  MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY rowtime
    MEASURES
      `C`.rowtime AS dropTime,
      `A`.price - `C`.price AS dropDiff
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (`A` B* `C`) WITHIN INTERVAL '1' HOUR
    DEFINE
      B AS B.price > `A`.price - 10,
      `C` AS `C`.price < `A`.price - 10
  )

SELECT *
FROM Ticker
  MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY rowtime
    MEASURES
      FIRST(`A`.price) AS startPrice,
      LAST(`A`.price) AS topPrice,
      B.price AS lastPrice
    ONE ROW PER MATCH
    PATTERN (`A`+ B)
    DEFINE
      `A` AS LAST(`A`.price, 1) IS NULL OR `A`.price > LAST(`A`.price, 1),
      B AS B.price < LAST(`A`.price)
  )

SELECT *
FROM Ticker
  MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY rowtime
    MEASURES
      SUM(`A`.price) AS sumPrice,
      FIRST(rowtime) AS startTime,
      LAST(rowtime) AS endTime
    ONE ROW PER MATCH
    [AFTER MATCH STRATEGY]
    PATTERN (`A`+ C)
    DEFINE
      `A` AS SUM(`A`.price) < 30
  )

-- =====================
-- CREATE Statements
-- =====================

-- CREATE TABLE
-- ---------------------
CREATE TABLE Orders (
  `user` BIGINT,
  product STRING,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic1',
  'properties.bootstrap.servers' = 'broker1',
  'properties.group.id' = 'group1',
  'format' = 'json',
  -- the scan.startup.* options below are alternatives; use the one matching the chosen startup mode
  'scan.startup.mode' = 'earliest-offset',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'scan.startup.timestamp-millis' = '0',
  'sink.partitioner' = 'fixed'
);

CREATE TABLE kafkaTable (
  user_id BIGINT CONSTRAINT primary_key PRIMARY KEY NOT ENFORCED COMMENT 'primary key',
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE MyUserTable (
  id BIGINT,
  `name` STRING,
  age INT,
  status BOOLEAN,
  CONSTRAINT primary_key PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'table1',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'user1',
  'password' = 'password1',
  'scan.partition.column' = 'column1',
  'scan.partition.num' = '5',
  'scan.partition.lower-bound' = '0',
  'scan.partition.upper-bound' = '10000',
  'scan.fetch-size' = '1000',
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '3600',
  'lookup.max-retries' = '3',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '1s',
  'sink.max-retries' = '3'
);

CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://host_name:9092;http://host_name:9093',
  'index' = 'myIndex',
  'document-type' = 'json',
  'document-id.key-delimiter' = '_',
  'failure-handler' = 'fail',
  'sink.flush-on-checkpoint' = 'true',
  'sink.bulk-flush.max-size' = '2mb',
  'sink.bulk-flush.interval' = '1s',
  'sink.bulk-flush.backoff.strategy' = 'DISABLED',
  'sink.bulk-flush.backoff.max-retries' = '8',
  'sink.bulk-flush.backoff.delay' = '50ms',
  'connection.max-retry-timeout' = '1000',
  'connection.path-prefix' = '/v1',
  'format' = 'json'
);
CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://xxx',
  'format' = 'orc',
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '30 min',
  'sink.rolling-policy.check-interval' = '1 min',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '0 s',
  'partition.time-extractor.kind' = 'default',
  'partition.time-extractor.class' = 'PartitionTimeExtractor',
  'partition.time-extractor.timestamp-pattern' = '$year-$month-$day $hour:00:00',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'sink.partition-commit.policy.class' = 'PartitionCommitPolicy',
  'sink.partition-commit.success-file.name' = '_SUCCESS'
);

CREATE TABLE hTable (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q2 STRING, q3 BIGINT>,
  family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'htable',
  'zookeeper.quorum' = 'xxxx',
  'zookeeper.znode.parent' = '/hbase',
  'null-string-literal' = 'null',
  'sink.buffer-flush.max-size' = '2mb',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '1s'
);

CREATE TABLE datagen (
  f_sequence INT,
  f_random INT,
  f_random_str STRING,
  ts AS LOCALTIMESTAMP COMMENT 'computed column',
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10000',
  'fields.f_sequence.kind' = 'sequence',
  'fields.f_sequence.start' = '1',
  'fields.f_sequence.end' = '1000',
  'fields.f_random.min' = '1',
  'fields.f_random.max' = '1000',
  'fields.f_random_str.length' = '10'
);

CREATE TABLE print_table (
  f0 INT,
  f1 INT,
  f2 STRING,
  f3 DOUBLE
) WITH (
  'connector' = 'print',
  'print-identifier' = 'prefix',
  'standard-error' = 'false'
);

CREATE TABLE print_table WITH ('connector' = 'print')
LIKE source_table (EXCLUDING ALL);

CREATE TABLE blackhole_table (
  f0 INT,
  f1 INT,
  f2 STRING,
  f3 DOUBLE
) WITH (
  'connector' = 'blackhole'
);

CREATE TABLE blackhole_table WITH ('connector' = 'blackhole')
LIKE source_table (EXCLUDING ALL);

-- LIKE options
-- merging strategies: INCLUDING | EXCLUDING | OVERWRITING
-- features: ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS
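-- Illustrative sketch: combining the LIKE options above. derived_orders is a hypothetical
-- table derived from the Kafka Orders table defined earlier; everything not mentioned is
-- inherited via the default INCLUDING ALL strategy.
CREATE TABLE derived_orders (
  WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH (
  'scan.startup.mode' = 'latest-offset'
) LIKE Orders (
  -- keep Orders' connector options, but let the WITH clause above win on conflicts
  OVERWRITING OPTIONS
  -- replace the inherited watermark with the one declared here
  OVERWRITING WATERMARKS
);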
-- CREATE CATALOG
-- ---------------------
CREATE CATALOG catalog_name1 WITH ('key1' = 'val1', 'key2' = 'val2');

-- CREATE DATABASE
-- ---------------------
CREATE DATABASE IF NOT EXISTS db_name COMMENT 'database_comment' WITH ('key1' = 'val1', 'key2' = 'val2');

-- CREATE VIEW
-- ---------------------
CREATE TEMPORARY VIEW IF NOT EXISTS db_name.view_name (col1, col2)
  COMMENT 'create view'
  AS SELECT col1, col2 FROM table1;

-- CREATE FUNCTION
-- ---------------------
CREATE TEMPORARY SYSTEM FUNCTION IF NOT EXISTS db_name.function_name AS identifier1 LANGUAGE JAVA;

-- =====================
-- DROP Statements
-- =====================
DROP TABLE IF EXISTS db_name.table_name1;
DROP DATABASE IF EXISTS db_name RESTRICT;
DROP VIEW view_name;
DROP FUNCTION function_name;

-- =====================
-- ALTER Statements
-- =====================
ALTER TABLE db_name.table_name1 RENAME TO new_table_name;
ALTER TABLE db_name.table_name1 SET ('key1' = 'val1', 'key2' = 'val2');
ALTER DATABASE db_name SET ('key1' = 'val1', 'key2' = 'val2');
ALTER FUNCTION function_name AS identifier LANGUAGE PYTHON;

-- =====================
-- INSERT Statements
-- =====================
-- Append rows to the static partition (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT `user`, cnt FROM page_view_source;

-- Append rows to partition (date, country), where date is the static partition '2019-8-30'
-- and country is a dynamic partition whose value is determined by each row
INSERT INTO country_page_view PARTITION (date='2019-8-30')
  SELECT `user`, cnt, country FROM page_view_source;

-- Overwrite rows in the static partition (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT `user`, cnt FROM page_view_source;

-- Overwrite rows in partition (date, country), where date is the static partition '2019-8-30'
-- and country is a dynamic partition whose value is determined by each row
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  SELECT `user`, cnt, country FROM page_view_source;

INSERT INTO students VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);

-- =====================
-- SQL Hints
-- =====================
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);

-- override table options in the query source
SELECT id, name FROM kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

-- override table options in a join
SELECT *
FROM kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
  JOIN kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
  ON t1.id = t2.id;

-- override table options for the INSERT target table
INSERT INTO kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ SELECT * FROM kafka_table2;

-- =====================
-- DESCRIBE Statements
-- =====================
DESCRIBE db_name.table_name1;

-- =====================
-- EXPLAIN Statements
-- =====================
EXPLAIN PLAN FOR SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%';

-- =====================
-- USE Statements
-- =====================
USE CATALOG catalog_name1;
USE database_name;

-- =====================
-- SHOW Statements
-- =====================
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
SHOW VIEWS;
SHOW FUNCTIONS;
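-- Illustrative sketch: a minimal end-to-end pipeline assembled from the pieces above,
-- using the datagen source and print sink connectors. Table and column names are hypothetical.
CREATE TABLE clicks_source (
  user_id INT,
  click_time AS LOCALTIMESTAMP,
  WATERMARK FOR click_time AS click_time
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '10'
);

CREATE TABLE clicks_per_minute (
  window_start TIMESTAMP(3),
  user_id INT,
  clicks BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO clicks_per_minute
SELECT
  TUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,
  user_id,
  COUNT(*) AS clicks
FROM clicks_source
GROUP BY TUMBLE(click_time, INTERVAL '1' MINUTE), user_id;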