org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl中的方法convertToRexNodes会将SQL表达式转化成关系表达式,然后获取到其中的的RexNode,如果在转化的时候,字段是保留字但没有使用反引号,则在parse的时候就会报错,convertToRexNodes方法传入参数也可能是表达式,所以不能在convertToRexNodes中直接加反引号,而应该在调用这个方法的代码中给字段加反引号。代码如下

    1. // 类 org.apache.flink.table.planner.plan.schema.CatalogSourceTable
    2. override def toRel(context: RelOptTable.ToRelContext): RelNode = {
    3. val cluster = context.getCluster
    4. val tableSourceTable = new TableSourceTable[T](
    5. relOptSchema,
    6. schemaTable.getTableIdentifier,
    7. rowType,
    8. statistic,
    9. tableSource,
    10. schemaTable.isStreamingMode,
    11. catalogTable)
    12. // 1. push table scan
    13. // Get row type of physical fields.
    14. val physicalFields = getRowType
    15. .getFieldList
    16. .filter(f => !columnExprs.contains(f.getName))
    17. .map(f => f.getIndex)
    18. .toArray
    19. // Copy this table with physical scan row type.
    20. val newRelTable = tableSourceTable.copy(tableSource, physicalFields)
    21. val scan = LogicalTableScan.create(cluster, newRelTable)
    22. val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
    23. relBuilder.push(scan)
    24. val toRexFactory = cluster
    25. .getPlanner
    26. .getContext
    27. .unwrap(classOf[FlinkContext])
    28. .getSqlExprToRexConverterFactory
    29. // 2. push computed column project
    30. val fieldNames = rowType.getFieldNames.asScala
    31. if (columnExprs.nonEmpty) {
    32. val fieldExprs = fieldNames
    33. .map { name =>
    34. if (columnExprs.contains(name)) {
    35. columnExprs(name)
    36. } else {
    37. name // 应该把这个name加上反引号,因为name可以是保留字
    38. }
    39. }.toArray
    40. val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
    41. relBuilder.projectNamed(rexNodes.toList, fieldNames, true)
    42. }
    43. // 3. push watermark assigner
    44. val watermarkSpec = catalogTable
    45. .getSchema
    46. // we only support single watermark currently
    47. .getWatermarkSpecs.asScala.headOption
    48. if (schemaTable.isStreamingMode && watermarkSpec.nonEmpty) {
    49. if (TableSourceValidation.hasRowtimeAttribute(tableSource)) {
    50. throw new TableException(
    51. "If watermark is specified in DDL, the underlying TableSource of connector" +
    52. " shouldn't return an non-empty list of RowtimeAttributeDescriptor" +
    53. " via DefinedRowtimeAttributes interface.")
    54. }
    55. val rowtime = watermarkSpec.get.getRowtimeAttribute
    56. if (rowtime.contains(".")) {
    57. throw new TableException(
    58. s"Nested field '$rowtime' as rowtime attribute is not supported right now.")
    59. }
    60. val rowtimeIndex = fieldNames.indexOf(rowtime)
    61. val watermarkRexNode = toRexFactory
    62. .create(rowType)
    63. .convertToRexNode(watermarkSpec.get.getWatermarkExpr)
    64. relBuilder.watermark(rowtimeIndex, watermarkRexNode)
    65. }
    66. // 4. returns the final RelNode
    67. relBuilder.build()
    68. }
    1. // 类 org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl
    2. @Override
    3. public RexNode[] convertToRexNodes(String[] exprs) {
    4. String query = String.format(QUERY_FORMAT, String.join(",", exprs));
    5. SqlNode parsed = planner.parser().parse(query);
    6. SqlNode validated = planner.validate(parsed);
    7. RelNode rel = planner.rel(validated).rel;
    8. // The plan should in the following tree
    9. // LogicalProject
    10. // +- TableScan
    11. if (rel instanceof LogicalProject
    12. && rel.getInput(0) != null
    13. && rel.getInput(0) instanceof TableScan) {
    14. return ((LogicalProject) rel).getProjects().toArray(new RexNode[0]);
    15. } else {
    16. throw new IllegalStateException("The root RelNode should be LogicalProject, but is " + rel.toString());
    17. }
    18. }