The method convertToRexNodes in the class org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl converts SQL expressions into a relational expression and then extracts the RexNodes from it. If a field name is a reserved word and is not wrapped in backticks, parsing fails. Because the arguments passed to convertToRexNodes may themselves be whole expressions, we cannot simply add backticks inside convertToRexNodes; instead, the callers of this method should wrap plain field names in backticks. The code is shown below.
// Class org.apache.flink.table.planner.plan.schema.CatalogSourceTable
override def toRel(context: RelOptTable.ToRelContext): RelNode = {
  val cluster = context.getCluster
  val tableSourceTable = new TableSourceTable[T](
    relOptSchema,
    schemaTable.getTableIdentifier,
    rowType,
    statistic,
    tableSource,
    schemaTable.isStreamingMode,
    catalogTable)
  // 1. push table scan
  // Get row type of physical fields.
  val physicalFields = getRowType
    .getFieldList
    .filter(f => !columnExprs.contains(f.getName))
    .map(f => f.getIndex)
    .toArray
  // Copy this table with physical scan row type.
  val newRelTable = tableSourceTable.copy(tableSource, physicalFields)
  val scan = LogicalTableScan.create(cluster, newRelTable)
  val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
  relBuilder.push(scan)
  val toRexFactory = cluster
    .getPlanner
    .getContext
    .unwrap(classOf[FlinkContext])
    .getSqlExprToRexConverterFactory
  // 2. push computed column project
  val fieldNames = rowType.getFieldNames.asScala
  if (columnExprs.nonEmpty) {
    val fieldExprs = fieldNames
      .map { name =>
        if (columnExprs.contains(name)) {
          columnExprs(name)
        } else {
          name // this name should be wrapped in backticks, because it may be a reserved word
        }
      }.toArray
    val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
    relBuilder.projectNamed(rexNodes.toList, fieldNames, true)
  }
  // 3. push watermark assigner
  val watermarkSpec = catalogTable
    .getSchema
    // we only support single watermark currently
    .getWatermarkSpecs.asScala.headOption
  if (schemaTable.isStreamingMode && watermarkSpec.nonEmpty) {
    if (TableSourceValidation.hasRowtimeAttribute(tableSource)) {
      throw new TableException(
        "If watermark is specified in DDL, the underlying TableSource of connector" +
          " shouldn't return an non-empty list of RowtimeAttributeDescriptor" +
          " via DefinedRowtimeAttributes interface.")
    }
    val rowtime = watermarkSpec.get.getRowtimeAttribute
    if (rowtime.contains(".")) {
      throw new TableException(
        s"Nested field '$rowtime' as rowtime attribute is not supported right now.")
    }
    val rowtimeIndex = fieldNames.indexOf(rowtime)
    val watermarkRexNode = toRexFactory
      .create(rowType)
      .convertToRexNode(watermarkSpec.get.getWatermarkExpr)
    relBuilder.watermark(rowtimeIndex, watermarkRexNode)
  }
  // 4. returns the final RelNode
  relBuilder.build()
}
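
As a rough sketch of the fix suggested above (illustrative only, not the actual upstream patch; escaping of backticks inside names is ignored for brevity), the idea is to quote plain physical field names before they reach convertToRexNodes, while passing computed-column expressions through untouched because they may be arbitrary SQL:

// Sketch only: quoteFieldExprs and the sample data are made up for illustration.
object QuoteFieldNamesSketch {
  def quoteFieldExprs(
      fieldNames: Seq[String],
      columnExprs: Map[String, String]): Array[String] = {
    fieldNames.map { name =>
      if (columnExprs.contains(name)) {
        columnExprs(name) // computed-column expression, e.g. "TO_TIMESTAMP(`timestamp`)"
      } else {
        s"`$name`"        // plain physical column; backticks protect reserved words
      }
    }.toArray
  }

  def main(args: Array[String]): Unit = {
    val exprs = quoteFieldExprs(
      Seq("timestamp", "ts"),
      Map("ts" -> "TO_TIMESTAMP(`timestamp`)"))
    // prints: `timestamp`, TO_TIMESTAMP(`timestamp`)
    println(exprs.mkString(", "))
  }
}
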
// Class org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl
@Override
public RexNode[] convertToRexNodes(String[] exprs) {
    String query = String.format(QUERY_FORMAT, String.join(",", exprs));
    SqlNode parsed = planner.parser().parse(query);
    SqlNode validated = planner.validate(parsed);
    RelNode rel = planner.rel(validated).rel;
    // The plan should be in the following tree
    // LogicalProject
    // +- TableScan
    if (rel instanceof LogicalProject
            && rel.getInput(0) != null
            && rel.getInput(0) instanceof TableScan) {
        return ((LogicalProject) rel).getProjects().toArray(new RexNode[0]);
    } else {
        throw new IllegalStateException(
            "The root RelNode should be LogicalProject, but is " + rel.toString());
    }
}
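
To make the failure mode concrete: convertToRexNodes stitches all expressions into one SELECT statement via QUERY_FORMAT and hands it to the parser, so a single unquoted reserved word breaks the whole query. The snippet below is a self-contained sketch; the QUERY_FORMAT value and temporary table name are assumptions, not values copied from Flink. It also shows why the quoting cannot live inside convertToRexNodes itself: blindly quoting every entry would wrap whole expressions in backticks and produce invalid SQL.

// Sketch only: QUERY_FORMAT and the temporary table name are assumed values.
object GeneratedQuerySketch {
  val QUERY_FORMAT = "SELECT %s FROM __temp_table__"

  def main(args: Array[String]): Unit = {
    val exprs = Array("timestamp", "TO_TIMESTAMP(`timestamp`) AS ts")

    // "SELECT timestamp,TO_TIMESTAMP(`timestamp`) AS ts FROM __temp_table__"
    // fails at parse time because timestamp is a reserved word.
    println(QUERY_FORMAT.format(exprs.mkString(",")))

    // Quoting every entry inside convertToRexNodes would also wrap whole
    // expressions, producing invalid SQL such as
    // "SELECT `timestamp`,`TO_TIMESTAMP(`timestamp`) AS ts` FROM __temp_table__"
    println(QUERY_FORMAT.format(exprs.map(e => s"`$e`").mkString(",")))
  }
}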