Changes to JSON deserialization
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
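For context, these three options belong in the WITH clause of a CREATE TABLE statement. The following is a minimal sketch, assuming a hypothetical table ods_k with made-up columns; the connector options (topic, servers, and so on) are left out because they are not given here, so the statement is incomplete on purpose. The helper class JsonDdlSketch is illustrative only and is not part of Flink. As far as I know, Flink's JSON format rejects a configuration in which both json.fail-on-missing-field and json.ignore-parse-errors are true, so the false/true pairing above is a valid one.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative helper (not part of Flink): renders a CREATE TABLE statement with a WITH clause. */
final class JsonDdlSketch {

    /** Renders options as 'key' = 'value' pairs, one per line, in insertion order. */
    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n", "WITH (\n", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            joiner.add("  '" + e.getKey() + "' = '" + e.getValue() + "'");
        }
        return joiner.toString();
    }

    /** The three JSON options from the notes above. */
    static Map<String, String> jsonOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("format", "json");
        options.put("json.fail-on-missing-field", "false");
        options.put("json.ignore-parse-errors", "true");
        return options;
    }

    /** Table name and columns are hypothetical; connector options are omitted on purpose. */
    static String createTableDdl() {
        return "CREATE TABLE ods_k (\n  id BIGINT,\n  name STRING\n) " + withClause(jsonOptions());
    }

    public static void main(String[] args) {
        // In a real job this string (plus connector options) would go to tEnv.executeSql(...).
        System.out.println(createTableDdl());
    }
}
```

Building the WITH clause from an ordered map keeps the option list in one place, which makes it easier to compare against the option names of the Flink version in use.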
DDL redundancy trimming
This issue does not exist in FlinkStreamSQL. Before Flink 1.11, the connector options in DDL were redundant.
SQL execution API upgrade
In Flink 1.11, SQL statements no longer need env.execute()
tEnv.executeSql("INSERT xxxx") submits the job immediately
StatementSet stmtSet = tEnv.createStatementSet();
// only single INSERT query can be accepted by `addInsertSql` method
stmtSet.addInsertSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
stmtSet.addInsertSql(
"INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
// execute all statements together
TableResult tableResult2 = stmtSet.execute();
Native asynchronous dimension-table JOIN
INSERT INTO ads_m
SELECT id, ct AS name
FROM ods_k, LATERAL TABLE(lookup_redis(name));
INSERT INTO ads_m
SELECT id, ct AS name
FROM ods_k LEFT JOIN LATERAL TABLE(lookup_redis(name)) ON TRUE;
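The idea behind an asynchronous dimension-table lookup can be sketched in plain Java, without Flink on the classpath. The class below is a hypothetical stand-in for lookup_redis: an in-memory map plays the role of Redis, and eval completes a CompletableFuture instead of returning rows directly, which is roughly the shape Flink's AsyncTableFunction expects. All names here are assumptions made for illustration.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for lookup_redis; an in-memory map plays the role of Redis. */
final class AsyncLookupSketch {

    private final Map<String, String> store = new ConcurrentHashMap<>();

    AsyncLookupSketch put(String key, String value) {
        store.put(key, value);
        return this;
    }

    /** Completes the future on another thread; the result is empty when the key is unknown. */
    void eval(CompletableFuture<Collection<String>> result, String key) {
        CompletableFuture
            .supplyAsync(() -> store.get(key))
            .whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                } else if (value == null) {
                    result.complete(Collections.emptyList());
                } else {
                    result.complete(Collections.singletonList(value));
                }
            });
    }

    /** Convenience wrapper that creates the future, starts the lookup, and returns it. */
    CompletableFuture<Collection<String>> lookup(String key) {
        CompletableFuture<Collection<String>> result = new CompletableFuture<>();
        eval(result, key);
        return result;
    }

    public static void main(String[] args) {
        AsyncLookupSketch dim = new AsyncLookupSketch().put("k1", "Rubber");
        System.out.println(dim.lookup("k1").join());
        System.out.println(dim.lookup("missing").join());
    }
}
```

An empty result corresponds to a key with no match: the comma (inner) join form drops that row, while the LEFT JOIN form keeps it with NULL for the looked-up columns.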
JDBC
The driver is user-selectable
A single class for JDBC
https://issues.apache.org/
When a DDL statement is executed, the table is first stored in the tables field of the GenericInMemoryCatalog class.
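As a rough mental model of that registration step, the sketch below keeps table definitions in a map keyed by database and table name. It is a toy written for these notes, not Flink's GenericInMemoryCatalog; the class ToyCatalog, its methods, and the string-valued table definition are all assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of an in-memory catalog: created tables live only in a map for the session. */
final class ToyCatalog {

    // Key is "database.table"; the value is raw DDL text standing in for a table definition.
    private final Map<String, String> tables = new LinkedHashMap<>();

    void createTable(String database, String table, String ddl, boolean ignoreIfExists) {
        String path = database + "." + table;
        if (tables.containsKey(path)) {
            if (ignoreIfExists) {
                return; // mirrors CREATE TABLE IF NOT EXISTS: keep the existing entry
            }
            throw new IllegalStateException("Table already exists: " + path);
        }
        tables.put(path, ddl);
    }

    boolean tableExists(String database, String table) {
        return tables.containsKey(database + "." + table);
    }

    public static void main(String[] args) {
        ToyCatalog catalog = new ToyCatalog();
        catalog.createTable("default_database", "ods_k", "CREATE TABLE ods_k (...)", false);
        System.out.println(catalog.tableExists("default_database", "ods_k"));
    }
}
```

Because the map lives in memory, anything registered this way is gone once the process exits, which is the main practical consequence of an in-memory catalog.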
TableFactory
KafkaDynamicTableFactoryBase
TableFactoryService