1. CREATE TABLE ods_k (
  2. id INT,
  3. name STRING,
  4. PROCTIME AS PROCTIME()
  5. ) WITH (
  6. 'connector' = 'kafka-x',
  7. 'topic' = 'luna',
  8. 'properties.bootstrap.servers' = 'localhost:9092',
  9. 'properties.group.id' = 'luna_g',
  10. 'format' = 'json',
  11. 'json.timestamp-format.standard' = 'SQL'
  12. );
  13. CREATE TABLE lookup_m
  14. (
  15. id int,
  16. name varchar,
  17. PRIMARY KEY (id) NOT ENFORCED
  18. ) WITH (
  19. 'connector' = 'mysql-x',
  20. 'url' = 'jdbc:mysql://172.16.100.186:3306/dev',
  21. 'table-name' = 'ada_dim',
  22. 'username' = 'dev',
  23. 'password' = 'Abc12345',
  24. 'lookup.cache-type' = 'lru'
  25. );
  26. CREATE TABLE sink_s (
  27. id int,
  28. name varchar
  29. ) WITH (
  30. 'connector' = 'print'
  31. );
  32. INSERT INOT sink_p
  33. SELECT l.id, l.name
  34. FROM ods_k s
  35. LEFT JOIN lookup_m FOR SYSTEM_TIME AS OF s.PROCTIME AS l
  36. ON s.id = l.id;

三大关键点

Flink启动时:

  1. 解析DDL/解析配置

Flink运行时:

  1. 数据类型转换(与Flink内置类型的转换)
  2. 读写数据(建立连接,读写)

那些代码是Client执行的
运行时哪些代码是TaskManager中线程执行的

更进一步哪些代码在JobManager里执行