• 时间属性概览">时间属性概览
    • 查询计划转换">查询计划转换
    • 物化时间属性">物化时间属性
    • 生成物理执行计划">生成物理执行计划
  • 小结">小结
  • 参考">参考

    在流计算中,时间属性承担了一个极其重要的作用,所有基于时间的操作,例如窗口操作,都需要正确获取时间信息。我们曾经在Flink 源码阅读笔记(12)- 时间、定时器和窗口这篇文章中分析过 Flink 内部时间属性、水位线等机制的具体实现。在这篇文章中,我们将介绍在 SQL 和 Table API 中时间属性相关的一些细节。

    时间属性概览

    在 Flink SQL 中,表可以提供逻辑上的时间属性用于获取时间信息,时间属性可以是处理时间也可以是事件时间。在声明一张表的时候,时间属性可以在表的 schema 中定义。有些特定的操作,如窗口关联和窗口聚合操作必须基于时间属性字段,因而时间属性可以被看作一种特殊的字段类型;但是时间属性可以当作常规的时间戳字段来使用,一旦需要在计算中使用到时间属性,就需要“物化”(materialized)时间属性,时间属性字段就会被转换成一个常规的时间戳类型。被物化后的时间属性不再与 Flink 的时间系统和水位线相关联,因而也就不可以再应用在基于时间的操作中。
    在 Flink SQL 的类型系统中,时间属性和常规的时间戳类型共用同样的逻辑类型 TimestampType,但是通过 TimestampKind 进行区分:

    | ``` public enum TimestampKind { REGULAR, //常规的时间戳类型 ROWTIME, //事件时间 PROCTIME //处理时间 }

    1. |
    2. | --- |
    3. 由于 Flink SQL 使用 Calcite 完成查询计划的优化,Flink 的所有逻辑类型在 Calcite 中都有对应的 `RelDataType`,并且为时间属性单独创建了一种新的 `RelDataType`,即 `TimeIndicatorRelDataType`:
    4. |

    class TimeIndicatorRelDataType( typeSystem: RelDataTypeSystem, originalType: BasicSqlType, val isEventTime: Boolean) //通过 isEventTime 区分是事件时间还是处理时间 extends BasicSqlType( typeSystem, originalType.getSqlTypeName, originalType.getPrecision) { }

    1. |
    2. | --- |
    3. <a name="8bf990b7"></a>
    4. ## [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#%E5%AE%9A%E4%B9%89%E8%A1%A8%E7%9A%84%E6%97%B6%E9%97%B4%E5%B1%9E%E6%80%A7)定义表的时间属性
    5. 有两种方式来定义一张表的时间属性,一种方式是在将 `DataStream` 转换成 `Table` 的过程中,另一种方式是直接在 `TableSource` 的具体实现中定义。
    6. <a name="3eb28836"></a>
    7. ### [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#datastream-%E8%BD%AC%E6%8D%A2%E4%B8%BA-table)DataStream 转换为 Table
    8. 在将 `DataStream` 转换成一个 Table 的过程中,可以用特殊的表达式来声明时间属性对应的列:
    9. |

    //处理时间 val table = tEnv.fromDataStream(stream, ‘UserActionTimestamp, ‘Username, ‘Data, ‘UserActionTime.proctime) //事件时间 val table = tEnv.fromDataStream(stream, ‘Username, ‘Data, ‘UserActionTime.rowtime)

    1. |
    2. | --- |
    3. 其中 `'field.proctime` 或这 `'field.rowtime` 即声明时间属性,表达式会被转换为 `PROCTIME` `ROWTIME` 这两个内置函数的调用 `UnresolvedCallExpression`
    4. |

    // Time definition /**

    1. * Declares a field as the rowtime attribute for indicating, accessing, and working in
    2. * Flink's event time.
    3. */

    def rowtime: Expression = unresolvedCall(ROWTIME, expr) /**

    1. * Declares a field as the proctime attribute for indicating, accessing, and working in
    2. * Flink's processing time.
    3. */

    def proctime: Expression = unresolvedCall(PROCTIME, expr)

    1. |
    2. | --- |
    3. Scala 环境下则通过 `ExpressionParser` 完成表达式的解析。
    4. <a name="78503a48"></a>
    5. ### [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#%E4%BD%BF%E7%94%A8-tablesource)使用 TableSource
    6. 如果要在 `TableSource` 中定义时间属性,则需要 `TableSource` 实现 `DefinedProctimeAttribute` 或者 `DefinedRowAttribute` 接口,并且引用的时间属性必须出现在 `TableSchema` 中,类型为 timestamp 类型。如果要同时使用处理时间和事件时间,对应的 `TableSource` 需要同时实现这两个接口:
    7. |

    public interface DefinedProctimeAttribute { @Nullable String getProctimeAttribute(); } public interface DefinedRowtimeAttributes { List getRowtimeAttributeDescriptors(); }

    1. |
    2. | --- |
    3. 其中 `RowtimeAttributeDescriptor` 是对事件时间的描述,包括如何提取事件时间,以及 watermark 的生成策略等:
    4. |

    public final class RowtimeAttributeDescriptor { private final String attributeName; //时间属性名称 private final TimestampExtractor timestampExtractor; //如何提取事件时间 private final WatermarkStrategy watermarkStrategy; //如何生成 watermark }

    1. |
    2. | --- |
    3. 尽管返回值是 `List<RowtimeAttributeDescriptor>`,但目前 Flink SQL 只支持单个事件时间属性。
    4. <a name="cb8a21ae"></a>
    5. ## [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#sql-%E5%BC%95%E6%93%8E%E4%B8%AD%E6%97%B6%E9%97%B4%E5%B1%9E%E6%80%A7%E7%9A%84%E8%BD%AC%E6%8D%A2)SQL 引擎中时间属性的转换
    6. <a name="552a14e1"></a>
    7. ### [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#%E6%9B%B4%E6%96%B0-tableschema)更新 TableSchema
    8. 在将 `DataStream` 转换成一个 `Table` 的过程中,首先需要生成表结构。`Table` 的底层对应的是一个 `QueryOperation`,在这里就是 `ScalaDataStreamQueryOperation` (或者 `JavaDataStreamQueryOperation`,对应 Java API)。`QueryOperation` 提供了 `TableSchema` 和字段映射关系:
    9. |

    public class ScalaDataStreamQueryOperation implements QueryOperation { private final DataStream dataStream; private final int[] fieldIndices; //字段索引映射关系 private final TableSchema tableSchema; //表结构 }

    1. |
    2. | --- |
    3. 获得 `TableSchema` 的逻辑主要被封装在 `FieldInfoUtils.getFieldsInfo` 方法中,主要是通过解析 `Expression` 获得表结构中每一列对应的字段在 `DataStream` 中元素的索引,并得到对应字段的类型:
    4. |

    /**

    • Utility methods for extracting names and indices of fields from different {@link TypeInformation}s. */ public class FieldInfoUtils { private static class ExprToFieldInfo extends ApiExpressionDefaultVisitor {
      1. @Override
      2. public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) {
      3. return createFieldInfo(unresolvedReference, null);
      4. }
      5. @Override
      6. public FieldInfo visit(UnresolvedCallExpression unresolvedCall) {
      7. if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
      8. return visitAlias(unresolvedCall);
      9. } else if (isRowTimeExpression(unresolvedCall)) {
      10. return createRowtimeFieldInfo(unresolvedCall, null);
      11. } else if (isProcTimeExpression(unresolvedCall)) {
      12. return createProctimeFieldInfo(unresolvedCall, null);
      13. }
      14. return defaultMethod(unresolvedCall);
      15. }
      } private static boolean isRowTimeExpression(Expression origExpr) {
      1. return origExpr instanceof UnresolvedCallExpression &&
      2. ((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.ROWTIME;
      } private static boolean isProcTimeExpression(Expression origExpr) {
      1. return origExpr instanceof UnresolvedCallExpression &&
      2. ((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.PROCTIME;
      } private static FieldInfo createTimeAttributeField(
      1. UnresolvedReferenceExpression reference,
      2. TimestampKind kind, //这里的Kind是TimestampKind.PROCTIME或TimestampKind.ROWTIME
      3. @Nullable String alias) {
      4. final int idx;
      5. //对于时间属性,没有对应的索引,用特殊的标识
      6. if (kind == TimestampKind.PROCTIME) {
      7. idx = TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER;
      8. } else {
      9. idx = TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER;
      10. }
      11. String originalName = reference.getName();
      12. return new FieldInfo(
      13. alias != null ? alias : originalName,
      14. idx,
      15. createTimeIndicatorType(kind));
      } } ``` | | —- |

    由于时间属性对应的表达式是内置函数的调用,因而可以通过判断对应的函数定义识别出来。
    时间属性并不对应 DataStream 中元素真实的字段,因此会用特殊的标识来作为索引:

    | ``` public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo { public static final int ROWTIME_STREAM_MARKER = -1; public static final int PROCTIME_STREAM_MARKER = -2; }

    1. |
    2. | --- |
    3. 如果通过 `TableSource` 注册一张表,首先会通过 `TableSourceValidation.validateTableSource()` 验证表结构、时间属性等信息,然后会被封装为 `ConnectorCatalogTable`,在这里会完成 `TableSchema` 的更新:
    4. |

    public class ConnectorCatalogTable extends AbstractCatalogTable { public static ConnectorCatalogTable source(TableSource source, boolean isBatch) { //更新 TableSchema final TableSchema tableSchema = calculateSourceSchema(source, isBatch); return new ConnectorCatalogTable<>(source, null, tableSchema, isBatch); } private static TableSchema calculateSourceSchema(TableSource source, boolean isBatch) { TableSchema tableSchema = source.getTableSchema(); if (isBatch) { return tableSchema; } DataType[] types = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount()); String[] fieldNames = tableSchema.getFieldNames(); //检查是否实现了 DefinedRowtimeAttributes 接口 if (source instanceof DefinedRowtimeAttributes) { updateRowtimeIndicators((DefinedRowtimeAttributes) source, fieldNames, types); } //检查是否实现了 DefinedProctimeAttribute 接口 if (source instanceof DefinedProctimeAttribute) { updateProctimeIndicator((DefinedProctimeAttribute) source, fieldNames, types); } return TableSchema.builder().fields(fieldNames, types).build(); } private static void updateRowtimeIndicators( DefinedRowtimeAttributes source, String[] fieldNames, DataType[] types) { List rowtimeAttributes = source.getRowtimeAttributeDescriptors() .stream() .map(RowtimeAttributeDescriptor::getAttributeName) .collect(Collectors.toList()); for (int i = 0; i < fieldNames.length; i++) { if (rowtimeAttributes.contains(fieldNames[i])) { // bridged to timestamp for compatible flink-planner types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3)) .bridgedTo(java.sql.Timestamp.class); } } } }

    1. |
    2. | --- |
    3. 经过更新后的 `TableSchema`,时间属性列的 `LogicalType` 就是用特殊 `TimestampKind` 表征的 `TimestampType`
    4. <a name="6b9dfd02"></a>
    5. ### [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#%E8%BD%AC%E6%8D%A2%E5%88%B0-calcite)转换到 Calcite
    6. `DatabaseCalciteSchema` 中, Flink SQL 中注册的表被转换成 `Calcite` 中使用的表:
    7. |

    class DatabaseCalciteSchema extends FlinkSchema { @Override public Table getTable(String tableName) { ObjectPath tablePath = new ObjectPath(databaseName, tableName); try { if (!catalog.tableExists(tablePath)) { return null; } CatalogBaseTable table = catalog.getTable(tablePath); //将 Flink Catalog 中注册的表转换为 Calcite 中的 Table if (table instanceof QueryOperationCatalogView) { QueryOperationCatalogView view = (QueryOperationCatalogView) table; QueryOperation operation = view.getQueryOperation(); if (operation instanceof DataStreamQueryOperation) { List qualifiedName = Arrays.asList(catalogName, databaseName, tableName); ((DataStreamQueryOperation) operation).setQualifiedName(qualifiedName); } else if (operation instanceof RichTableSourceQueryOperation) { List qualifiedName = Arrays.asList(catalogName, databaseName, tableName); ((RichTableSourceQueryOperation) operation).setQualifiedName(qualifiedName); } return QueryOperationCatalogViewTable.createCalciteTable(view); } else if (table instanceof ConnectorCatalogTable) { return convertConnectorTable((ConnectorCatalogTable<?, ?>) table, tablePath); } else if (table instanceof CatalogTable) { return convertCatalogTable(tablePath, (CatalogTable) table); } else { throw new TableException(“Unsupported table type: “ + table); } } catch (TableNotExistException | CatalogException e) { // TableNotExistException should never happen, because we are checking it exists // via catalog.tableExists throw new TableException(format( “A failure occurred when accessing table. Table path [%s, %s, %s]”, catalogName, databaseName, tableName), e); } } }

    1. |
    2. | --- |
    3. Flink SQL 中定义的表结构 `TableSchema` 也会经过 `FlinkTypeFactory` 转换,`LogicalType`也会被转换成 Calcite 内部使用的 `RelDataType`:
    4. |

    /**

    • Flink specific type factory that represents the interface between Flink’s [[LogicalType]]
    • and Calcite’s [[RelDataType]]. */ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) { def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = { …… val relType = t.getTypeRoot match { case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
      1. val timestampType = t.asInstanceOf[TimestampType]
      2. timestampType.getKind match {
      3. //时间戳类型识别出常规的时间戳和时间属性
      4. case TimestampKind.PROCTIME => createProctimeIndicatorType(true)
      5. case TimestampKind.ROWTIME => createRowtimeIndicatorType(true)
      6. case TimestampKind.REGULAR => createSqlType(TIMESTAMP)
      7. }
      case _ =>
      1. seenTypes.get(t) match {
      2. case Some(retType: RelDataType) => retType
      3. case None =>
      4. val refType = newRelDataType()
      5. seenTypes.put(t, refType)
      6. refType
      7. }
      } ….. } } ``` | | —- |

    经过这一步转换,时间属性被转换成 TimeIndicatorRelDataType 类型。

    查询计划转换

    在 Calcite 优化查询计划是,会识别特殊语句中的时间属性,并转换成对应的 RelNode。例如,在 FlinkLogicalJoin 中,如果关联条件中包含了时间窗口,就会被转换为 StreamExecWindowJoin;而不包含时间窗口的 FlinkLogicalJoin 则会被转换为 StreamExecJoin。其区别就在于对时间属性的识别:

    | ``` /**

    • Rule that converts non-SEMI/ANTI [[FlinkLogicalJoin]] with window bounds in join condition
    • to [[StreamExecWindowJoin]]. */ class StreamExecWindowJoinRule extends ConverterRule( classOf[FlinkLogicalJoin], FlinkConventions.LOGICAL, FlinkConventions.STREAMPHYSICAL, “StreamExecWindowJoinRule”) { override def matches(call: RelOptRuleCall): Boolean = { …… val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(join) //提取时间窗口的边界 val (windowBounds, ) = WindowJoinUtil.extractWindowBoundsFromPredicate( join.getCondition, join.getLeft.getRowType.getFieldCount, joinRowType, join.getCluster.getRexBuilder, tableConfig) if (windowBounds.isDefined) { //如果识别到时间窗口,该规则匹配 if (windowBounds.get.isEventTime) {
      1. true
      } else {
      1. // Check that no event-time attributes are in the input because the processing time window
      2. // join does not correctly hold back watermarks.
      3. // We rely on projection pushdown to remove unused attributes before the join.
      4. !joinRowType.getFieldList.exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
      } } else { // the given join does not have valid window bounds. We cannot translate it. false } } } ``` | | —- |

    窗口聚合也是同理,只是规则匹配中是对 HOP, TUMBLE 等窗口函数的识别。

    物化时间属性

    时间属性是一个逻辑上的列,因为它并不真实对应底层 DataStream 元素中具体的字段。因此,如果在计算操作中要用到时间属性列的值,就需要“物化”(materialized)时间属性。这一部分的逻辑主要是在 RelTimeIndicatorConverter 中。
    对于 PROCTIME 时间属性,就是将其转换为对内置函数 FlinkSqlOperatorTable.PROCTIME_MATERIALIZE 的调用;而对于 ROWTIME 时间属性,则是对其进行一次强制的类型转换,转换为常规的时间戳。

    | ``` /**

    • Helper class for shared logic of materializing time attributes in [[RelNode]] and [[RexNode]]. */ class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) { def materialize(expr: RexNode): RexNode = { if (isTimeIndicatorType(expr.getType)) { if (isRowtimeIndicatorType(expr.getType)) {
      1. // cast rowtime indicator to regular timestamp
      2. rexBuilder.makeAbstractCast(timestamp(expr.getType.isNullable), expr)
      } else {
      1. // generate proctime access
      2. rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr)
      } } else { expr } } } ``` | | —- |

    生成物理执行计划

    SQL 的查询计划最终需要被转换为 Flink 的算子,即生成物理执行计划。对于一个查询计划来说,首先需要进行转换的就是 Scan 操作,在里将底层的 POJO、Tuple 等对象映射为 Table 中使用的 Row。在这里就需要考虑到时间属性的映射。
    首先,我们来看下在 StreamExecDataStreamScan 中是如何完成字段映射的转换的:

    | ``` class StreamExecDataStreamScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, outputRowType: RelDataType) extends TableScan(cluster, traitSet, table) { override protected def translateToPlanInternal( planner: StreamPlanner): Transformation[BaseRow] = { val config = planner.getTableConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream val transform = inputDataStream.getTransformation val rowtimeExpr = getRowtimeExpression(planner.getRelBuilder) // when there is row time extraction expression, we need internal conversion // when the physical type of the input date stream is not BaseRow, we need internal conversion. if (rowtimeExpr.isDefined || ScanUtil.needsConversion(dataStreamTable.dataType)) { // extract time if the index is -1 or -2. val (extractElement, resetElement) = if (ScanUtil.hasTimeAttributeField(dataStreamTable.fieldIndexes)) { (s”ctx.$ELEMENT = $ELEMENT;”, s”ctx.$ELEMENT = null;”) } else { (“”, “”) } val ctx = CodeGeneratorContext(config).setOperatorBaseClass( classOf[AbstractProcessStreamOperator[BaseRow]]) //生成字段映射的代码 ScanUtil.convertToInternalRow( ctx, transform, dataStreamTable.fieldIndexes, //字段索引 dataStreamTable.dataType, getRowType, getTable.getQualifiedName, config, rowtimeExpr, //RowTime 表达式 beforeConvert = extractElement, afterConvert = resetElement) } else { transform.asInstanceOf[Transformation[BaseRow]] } } //获取 RowTime 时间属性的表达式 private def getRowtimeExpression(relBuilder: FlinkRelBuilder): Option[RexNode] = { val fieldIdxs = dataStreamTable.fieldIndexes if (!fieldIdxs.contains(ROWTIMESTREAM_MARKER)) { None } else { //根据字段映射中的特殊marker来查找 rowtime 字段 val rowtimeField = dataStreamTable.fieldNames(fieldIdxs.indexOf(ROWTIME_STREAM_MARKER)) // get expression to extract timestamp fromDataTypeToLogicalType(dataStreamTable.dataType) match { //如果这个datastream本身已经完成了 rowtime 时间属性的提取 case dataType: RowType if dataType.getFieldNames.contains(rowtimeField) && TypeCheckUtils.isRowTime(dataType.getTypeAt(dataType.getFieldIndex(rowtimeField))) => // if rowtimeField already existed in the data stream, use the default rowtime None case => // 用内置函数 StreamRecordTimestampSqlFunction 来提取 rowtime 时间属性 // extract timestamp from StreamRecord Some( relBuilder.cast( relBuilder.call(new StreamRecordTimestampSqlFunction), relBuilder.getTypeFactory.createFieldTypeFromLogicalType( new TimestampType(true, TimestampKind.ROWTIME, 3)).getSqlTypeName)) } } } }

    1. |
    2. | --- |
    3. 在进行字段映射的时候,一个关键的处理就是时间属性要怎么映射。ROWTIME 时间属性被转换为 `StreamRecordTimestampSqlFunction` 的调用。在前面物化时间属性的阶段,我们已经看到,PROCTIME 时间属性已经被转换为内置函数 `FlinkSqlOperatorTable.PROCTIME_MATERIALIZE` 的调用,而对于 ROWTIME 时间属性,只是进行了一次强制的类型转换。这主要是因为,PROCTIME 在流处理中,在不同的算子中,每一次调用都应该获取当前的系统时间;而对于 ROWTIME 而言,它的取值是固定的,因此只需要在最开始完成一次转换即可。RPOCTIME ROWTIME 的取值最终都被转换成对内置函数的调用。<br />这个函数调用的代码生成就比较简单了,RPOCTIME 转换成 `context.timerService().currentProcessingTime()`ROWTIME 转换成 `context.timestamp()`,具体的代码生成可以参考 `GenerateUtils`。<br />由于 PROCTIME 并不需要在 Scan 节点进行物化,因此在这里直接用 null 值替代,在后续需要的时候重新进行计算。<br />而对于从 `TableSource` 注册而来的表,由于它不像 `DataStream` 那样,在定义 ROWTIME 时间属性之前已经完成了 timestamp watermark 的指定。因此在转换 `TableSource` 的过程中,如果定义了 ROWTIME 时间属性,除了需要提取 ROWTIME 时间属性的值以外,还需要指定 watermark。<br />`RowtimeAttributeDescriptor` 是对 ROWTIME 时间属性的描述,包括如何提取事件时间的 `TimestampExtractor`
    4. |

    /**

    • The {@link FieldComputer} interface returns an expression to compute the field of the table
    • schema of a {@link TableSource} from one or more fields of the {@link TableSource}’s return type. *
    • @param The result type of the provided expression. / public interface FieldComputer { String[] getArgumentFields(); TypeInformation getReturnType(); void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes); /*
      • Returns the {@link Expression} that computes the value of the field. *
      • @param fieldAccesses Field access expressions for the argument fields.
      • @return The expression to extract the timestamp from the {@link TableSource} return type. */ Expression getExpression(ResolvedFieldReference[] fieldAccesses); } //Provides an expression to extract the timestamp for a rowtime attribute. public abstract class TimestampExtractor implements FieldComputer, Serializable, Descriptor { @Override public TypeInformation getReturnType() { return Types.LONG; //返回类型是 Long } } ``` | | —- |

    通常提取 ROWTIME 的方式是根据已有的字段来生成,即:

    | ``` public final class ExistingField extends TimestampExtractor { private String field;

    1. /**
    2. * Returns an {@link Expression} that casts a {@link Long}, {@link Timestamp}, or
    3. * timestamp formatted {@link String} field (e.g., "2018-05-28 12:34:56.000")
    4. * into a rowtime attribute.
    5. */
    6. @Override
    7. public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
    8. ResolvedFieldReference fieldAccess = fieldAccesses[0];
    9. DataType type = fromLegacyInfoToDataType(fieldAccess.resultType());
    10. //字段引用的表达式
    11. FieldReferenceExpression fieldReferenceExpr = new FieldReferenceExpression(
    12. fieldAccess.name(),
    13. type,
    14. 0,
    15. fieldAccess.fieldIndex());
    16. //支持的输入字段类型,包括 BIGINT、TIMESTAMP_WITHOUT_TIME_ZONE 和 VARCHAR
    17. switch (type.getLogicalType().getTypeRoot()) {
    18. case BIGINT:
    19. case TIMESTAMP_WITHOUT_TIME_ZONE:
    20. //直接引用相应的字段即可
    21. return fieldReferenceExpr;
    22. case VARCHAR:
    23. //进行一次类型转换
    24. DataType outputType = TIMESTAMP(3).bridgedTo(Timestamp.class);
    25. return new CallExpression(
    26. CAST,
    27. Arrays.asList(fieldReferenceExpr, typeLiteral(outputType)),
    28. outputType);
    29. default:
    30. throw new RuntimeException("Unsupport type: " + type);
    31. }
    32. }

    }

    1. |
    2. | --- |
    3. `TimestampExtractor` 提供的提取时间属性的 `Expression`, 要先被转换为 `RexNode` 之后才可以被用在代码生成中,通过 `TableSourceUtil.getRowtimeExtractionExpression` 完成。如同前面提到对 `StreamRecordTimestampSqlFunction` 调用,这两种获得时间属性的的方式本质上是一样的,只是调用的函数不一致。<br />在完成了字段映射和时间属性的提取后,还需要指定 watermark,这正是 `RowtimeAttributeDescriptor` `WatermarkStrategy` 的用处:
    4. |

    class StreamExecTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, relOptTable: FlinkRelOptTable) extends PhysicalTableSourceScan(cluster, traitSet, relOptTable) { override protected def translateToPlanInternal( planner: StreamPlanner): Transformation[BaseRow] = { val config = planner.getTableConfig val inputTransform = getSourceTransformation(planner.getExecEnv) //1. 字段映射和时间提取 …… //2. 指定watermark val withWatermarks = if (rowtimeDesc.isDefined) { val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName) val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy watermarkStrategy match { case p: PeriodicWatermarkAssigner => val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p) ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) case p: PunctuatedWatermarkAssigner => val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, producedDataType) ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) case _: PreserveWatermarks => // The watermarks have already been provided by the underlying DataStream. ingestedTable } } else { // No need to generate watermarks if no rowtime attribute is specified. ingestedTable } withWatermarks.getTransformation } } ``` | | —- |

    到这里,就完成了 ROWTIME 时间属性的提取的 watermark 的生成。 PROCTIME 在使用时按需要进行物化。

    小结

    时间属性是广泛应用在窗口操作中,是流式 SQL 处理中非常重要的概念。本文对 Flink SQL 中时间属性的使用方法和具体实现进行了介绍。

    参考

    -EOF-