CREATE TABLE ods_k (
id INT,
name STRING,
PROCTIME AS PROCTIME()
) WITH (
'connector' = 'kafka-x',
'topic' = 'luna',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'luna_g',
'format' = 'json',
'json.timestamp-format.standard' = 'SQL'
);
CREATE TABLE lookup_m
(
id int,
name varchar,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://172.16.100.186:3306/dev',
'table-name' = 'ada_dim',
'username' = 'dev',
'password' = 'Abc12345',
'lookup.cache-type' = 'lru'
);
CREATE TABLE sink_s (
id int,
name varchar
) WITH (
'connector' = 'print'
);
INSERT INOT sink_p
SELECT l.id, l.name
FROM ods_k s
LEFT JOIN lookup_m FOR SYSTEM_TIME AS OF s.PROCTIME AS l
ON s.id = l.id;
三大关键点
Flink启动时:
- 解析DDL/解析配置
Flink运行时:
- 数据类型转换(与Flink内置类型的转换)
- 读写数据(建立连接,读写)
那些代码是Client执行的
运行时哪些代码是TaskManager中线程执行的
更进一步哪些代码在JobManager里执行