SqlNode 转化成对应的 Operation(RelNode)这一步是在类 org.apache.flink.table.planner.operations.SqlToOperationConverter 中完成的,代码如下:
// org.apache.flink.table.planner.operations.SqlToOperationConverter
/**
 * Entry point that turns a parsed DDL/DML/query {@code SqlNode} into an {@code Operation}.
 *
 * <p>The node is first validated through {@code flinkPlanner.validate(sqlNode)}. The validated
 * node is then dispatched on its runtime type to the matching {@code convertXxx} method of a
 * freshly created {@code SqlToOperationConverter}. Nodes that match none of the supported
 * statement classes, but whose kind belongs to {@code SqlKind.QUERY}, are handled by
 * {@code convertSqlQuery}.
 *
 * @param flinkPlanner FlinkPlannerImpl used to validate the node and passed to the converter
 * @param catalogManager CatalogManager passed to the converter
 * @param sqlNode SqlNode to validate and convert
 * @return the converted operation, or {@code Optional.empty()} when the validated node is
 *     neither one of the supported statement types nor a query
 */
public static Optional<Operation> convert(
        FlinkPlannerImpl flinkPlanner,
        CatalogManager catalogManager,
        SqlNode sqlNode) {
    // Validation runs before the converter is created, and before any dispatch.
    final SqlNode node = flinkPlanner.validate(sqlNode);
    final SqlToOperationConverter delegate =
            new SqlToOperationConverter(flinkPlanner, catalogManager);

    // The checks below are evaluated in order; the first matching branch wins.
    final Operation operation;
    if (node instanceof SqlCreateTable) {
        operation = delegate.convertCreateTable((SqlCreateTable) node);
    } else if (node instanceof SqlDropTable) {
        operation = delegate.convertDropTable((SqlDropTable) node);
    } else if (node instanceof SqlAlterTable) {
        operation = delegate.convertAlterTable((SqlAlterTable) node);
    } else if (node instanceof SqlCreateFunction) {
        operation = delegate.convertCreateFunction((SqlCreateFunction) node);
    } else if (node instanceof SqlAlterFunction) {
        operation = delegate.convertAlterFunction((SqlAlterFunction) node);
    } else if (node instanceof SqlDropFunction) {
        operation = delegate.convertDropFunction((SqlDropFunction) node);
    } else if (node instanceof RichSqlInsert) {
        operation = delegate.convertSqlInsert((RichSqlInsert) node);
    } else if (node instanceof SqlUseCatalog) {
        operation = delegate.convertUseCatalog((SqlUseCatalog) node);
    } else if (node instanceof SqlUseDatabase) {
        operation = delegate.convertUseDatabase((SqlUseDatabase) node);
    } else if (node instanceof SqlCreateDatabase) {
        operation = delegate.convertCreateDatabase((SqlCreateDatabase) node);
    } else if (node instanceof SqlDropDatabase) {
        operation = delegate.convertDropDatabase((SqlDropDatabase) node);
    } else if (node instanceof SqlAlterDatabase) {
        operation = delegate.convertAlterDatabase((SqlAlterDatabase) node);
    } else if (node.getKind().belongsTo(SqlKind.QUERY)) {
        operation = delegate.convertSqlQuery(node);
    } else {
        // Not a supported statement and not a query: nothing to convert.
        return Optional.empty();
    }
    return Optional.of(operation);
}