类 org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl 中的方法 convertToRexNodes 会将 SQL 表达式转化成关系表达式,然后获取其中的 RexNode。如果在转化的时候,字段名是保留字但没有使用反引号,则在 parse 阶段就会报错。由于 convertToRexNodes 方法传入的参数也可能是表达式,所以不能在 convertToRexNodes 中直接加反引号,而应该在调用这个方法的代码中给字段名加反引号。代码如下:
// Class org.apache.flink.table.planner.plan.schema.CatalogSourceTable
/**
 * Builds the relational plan for this catalog table in up to three steps:
 * a table scan over the physical (non-computed) fields, a projection that
 * adds the computed columns (only when computed columns exist), and a
 * watermark assigner (only in streaming mode when the schema declares a
 * watermark).
 *
 * @param context the context used to obtain the planner cluster
 * @return the root [[RelNode]] of the built plan
 * @throws TableException if a watermark is declared in DDL while the table source
 *                        also exposes a rowtime attribute, or if the rowtime
 *                        attribute is a nested field
 */
override def toRel(context: RelOptTable.ToRelContext): RelNode = {
  // Quotes a plain column name with backticks so that names which collide with
  // reserved SQL keywords still parse as identifiers. An embedded backtick is
  // escaped by doubling it.
  def quoteIdentifier(identifier: String): String =
    "`" + identifier.replace("`", "``") + "`"

  val cluster = context.getCluster
  val tableSourceTable = new TableSourceTable[T](
    relOptSchema,
    schemaTable.getTableIdentifier,
    rowType,
    statistic,
    tableSource,
    schemaTable.isStreamingMode,
    catalogTable)

  // 1. push table scan
  // Get row type of physical fields.
  val physicalFields = getRowType
    .getFieldList
    .filter(f => !columnExprs.contains(f.getName))
    .map(f => f.getIndex)
    .toArray
  // Copy this table with physical scan row type.
  val newRelTable = tableSourceTable.copy(tableSource, physicalFields)
  val scan = LogicalTableScan.create(cluster, newRelTable)
  val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
  relBuilder.push(scan)

  val toRexFactory = cluster
    .getPlanner
    .getContext
    .unwrap(classOf[FlinkContext])
    .getSqlExprToRexConverterFactory

  // 2. push computed column project
  val fieldNames = rowType.getFieldNames.asScala
  if (columnExprs.nonEmpty) {
    val fieldExprs = fieldNames
      .map { name =>
        if (columnExprs.contains(name)) {
          // Computed column: already a SQL expression, pass it through untouched.
          columnExprs(name)
        } else {
          // Physical column: the bare name is spliced into a SQL query by
          // convertToRexNodes, so it must be quoted in case it is a reserved word.
          quoteIdentifier(name)
        }
      }.toArray
    val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
    relBuilder.projectNamed(rexNodes.toList, fieldNames, true)
  }

  // 3. push watermark assigner
  val watermarkSpec = catalogTable
    .getSchema
    // we only support single watermark currently.
    .getWatermarkSpecs.asScala.headOption
  if (schemaTable.isStreamingMode) {
    watermarkSpec.foreach { spec =>
      if (TableSourceValidation.hasRowtimeAttribute(tableSource)) {
        throw new TableException(
          "If watermark is specified in DDL, the underlying TableSource of connector" +
            " shouldn't return an non-empty list of RowtimeAttributeDescriptor" +
            " via DefinedRowtimeAttributes interface.")
      }
      val rowtime = spec.getRowtimeAttribute
      if (rowtime.contains(".")) {
        throw new TableException(
          s"Nested field '$rowtime' as rowtime attribute is not supported right now.")
      }
      val rowtimeIndex = fieldNames.indexOf(rowtime)
      val watermarkRexNode = toRexFactory
        .create(rowType)
        .convertToRexNode(spec.getWatermarkExpr)
      relBuilder.watermark(rowtimeIndex, watermarkRexNode)
    }
  }

  // 4. returns the final RelNode
  relBuilder.build()
}
// 类 org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl@Overridepublic RexNode[] convertToRexNodes(String[] exprs) {String query = String.format(QUERY_FORMAT, String.join(",", exprs));SqlNode parsed = planner.parser().parse(query);SqlNode validated = planner.validate(parsed);RelNode rel = planner.rel(validated).rel;// The plan should in the following tree// LogicalProject// +- TableScanif (rel instanceof LogicalProject&& rel.getInput(0) != null&& rel.getInput(0) instanceof TableScan) {return ((LogicalProject) rel).getProjects().toArray(new RexNode[0]);} else {throw new IllegalStateException("The root RelNode should be LogicalProject, but is " + rel.toString());}}
