概述

由于要在Flink SQL上二次开发,所以得研究下Flink SQL的SQL编译模块。
SQL解析主流程、Flink对Calcite进行的扩展(例如FlinkSqlParserImpl类就是一个Calcite扩展)。
主要有两个类 Parser和Planner,TableEnvironment的初始化会这两个类的工厂。

Flink SQL解析器的创建

Parser的创建

ParserImpl类

作用:Flink抽象出的解析器,主要方法是parse()。
Calcite解析成SqlNode

SqlToOperationConverter用SqlNode和Planner转换算子。

FlinkSqlParserImpl
SqlParser是Flink自定义的config参数传进来。

  1. SqlParser parser = SqlParser.create(sql, config);
  2. return parser.parseStmt();

巧妙的运用Supplier,这个其实是工厂方法,每次调用parse时重新创建parser和planner。这么设计的目的是catalog在每次调用时会变化,所以才动态创建对象。

  1. @Override
  2. public List<Operation> parse(String statement) {
  3. CalciteParser parser = calciteParserSupplier.get();
  4. FlinkPlannerImpl planner = validatorSupplier.get();
  5. // parse the sql query
  6. SqlNode parsed = parser.parse(statement);
  7. Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
  8. .orElseThrow(() -> new TableException("Unsupported query: " + statement));
  9. return Collections.singletonList(operation);
  10. }

自定义的Calcite解析逻辑
在这个目录下:flink/flink-table/flink-sql-parser

参考资料

https://matt33.com/2019/03/07/apache-calcite-process-flow/
https://github.com/quxiucheng/apache-calcite-tutorial
https://miaowenting.site/2019/11/10/Flink-SQL-with-Calcite/