概述
由于要在Flink SQL上二次开发,所以得研究下Flink SQL的SQL编译模块。
SQL解析主流程、Flink对Calcite进行的扩展(例如FlinkSqlParserImpl类就是一个Calcite扩展)。
主要有两个类 Parser和Planner,TableEnvironment的初始化会这两个类的工厂。
Flink SQL解析器的创建
ParserImpl类
作用:Flink抽象出的解析器,主要方法是parse()。
Calcite解析成SqlNode
SqlToOperationConverter用SqlNode和Planner转换算子。
FlinkSqlParserImpl
SqlParser是Flink自定义的config参数传进来。
SqlParser parser = SqlParser.create(sql, config);
return parser.parseStmt();
巧妙的运用Supplier,这个其实是工厂方法,每次调用parse时重新创建parser和planner。这么设计的目的是catalog在每次调用时会变化,所以才动态创建对象。
@Override
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
自定义的Calcite解析逻辑
在这个目录下:flink/flink-table/flink-sql-parser
参考资料
https://matt33.com/2019/03/07/apache-calcite-process-flow/
https://github.com/quxiucheng/apache-calcite-tutorial
https://miaowenting.site/2019/11/10/Flink-SQL-with-Calcite/