Changes to JSON deserialization

  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
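
For context, these options belong in the WITH clause of a table definition. The following is a minimal sketch, assuming a Kafka source; the table name, columns, topic, group id, and broker address are hypothetical placeholders rather than values from the original setup. With 'json.ignore-parse-errors' = 'true', fields and rows that fail to parse are skipped instead of failing the job, and with 'json.fail-on-missing-field' = 'false', a missing field does not raise an error. Flink rejects a definition in which both options are 'true'.

```sql
-- Sketch only: table name, columns, topic, and broker address are assumed.
CREATE TABLE ods_k (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_k',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'demo-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',  -- a missing field does not fail the job
  'json.ignore-parse-errors' = 'true'      -- malformed records are skipped
);
```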

Trimming redundant DDL properties

This problem does not exist in FlinkStreamSQL. Before Flink 1.11, the connector properties in DDL were redundant.
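
To illustrate the redundancy, the sketch below defines the same hypothetical Kafka table twice: first with the property keys used before Flink 1.11, then with the shorter keys introduced in 1.11. Table names, topic, group id, and broker address are assumptions chosen for illustration.

```sql
-- Before Flink 1.11: connector and format keys are prefixed,
-- and the connector version and update mode are spelled out.
CREATE TABLE orders_old (
  product STRING,
  amount  INT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'orders',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'demo-group',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'json',
  'update-mode' = 'append'
);

-- Flink 1.11: a single 'connector' key selects the connector.
CREATE TABLE orders_new (
  product STRING,
  amount  INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'demo-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```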

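SQL execution API upgrade

In Flink 1.11, SQL jobs no longer need env.execute().
Calling tEnv.executeSql("INSERT xxxx") on the TableEnvironment submits the statement immediately.

To submit several INSERT statements together, collect them in a StatementSet: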

    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableResult;

    // tEnv is an existing TableEnvironment
    StatementSet stmtSet = tEnv.createStatementSet();
    // addInsertSql accepts only a single INSERT statement per call
    stmtSet.addInsertSql(
        "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
    stmtSet.addInsertSql(
        "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
    // execute all collected statements together
    TableResult tableResult2 = stmtSet.execute();

Native asynchronous dimension-table JOIN

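  -- Inner join against the table function: rows without a match are dropped
  INSERT INTO ads_m
  SELECT id, ct AS name
  FROM ods_k, LATERAL TABLE(lookup_redis(name));

  -- Left outer join: rows without a match are kept (ON TRUE is required)
  INSERT INTO ads_m
  SELECT id, ct AS name
  FROM ods_k LEFT JOIN LATERAL TABLE(lookup_redis(name)) ON TRUE;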
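
Besides the table-function form, Flink SQL can also express a dimension-table lookup with FOR SYSTEM_TIME AS OF. The following is a minimal sketch, assuming ods_k declares a processing-time attribute named proctime and that dim_user is a hypothetical dimension table with columns name and ct; neither assumption comes from the original notes. Whether the lookup actually runs asynchronously depends on the connector behind the dimension table.

```sql
-- Assumes: ods_k declares `proctime AS PROCTIME()`;
-- dim_user(name, ct) is a hypothetical dimension table.
INSERT INTO ads_m
SELECT o.id, d.ct AS name
FROM ods_k AS o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.name = d.name;
```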


JDBC notes

The driver can be chosen by the user.
JDBC is handled by a single class.
https://issues.apache.org/
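
As a sketch of what a selectable driver looks like in DDL: the Flink 1.11 JDBC connector accepts an optional 'driver' key, and when it is omitted the driver class is derived from the URL. The column types, URL, driver class, and credentials below are placeholders assumed for illustration.

```sql
-- Sketch only: columns, URL, and credentials are assumed.
CREATE TABLE ads_m (
  id   BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/demo',
  'table-name' = 'ads_m',
  'driver' = 'com.mysql.cj.jdbc.Driver',  -- optional; derived from the URL if omitted
  'username' = 'demo_user',
  'password' = 'demo_password'
);
```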

When a DDL statement is executed, the table is first stored in the tables field of the GenericInMemoryCatalog class.

TableFactory notes

KafkaDynamicTableFactoryBase
TableFactoryService