在流计算中,时间属性承担了一个极其重要的作用,所有基于时间的操作,例如窗口操作,都需要正确获取时间信息。我们曾经在Flink 源码阅读笔记(12)- 时间、定时器和窗口这篇文章中分析过 Flink 内部时间属性、水位线等机制的具体实现。在这篇文章中,我们将介绍在 SQL 和 Table API 中时间属性相关的一些细节。
时间属性概览
在 Flink SQL 中,表可以提供逻辑上的时间属性用于获取时间信息,时间属性可以是处理时间也可以是事件时间。在声明一张表的时候,时间属性可以在表的 schema 中定义。有些特定的操作,如窗口关联和窗口聚合操作必须基于时间属性字段,因而时间属性可以被看作一种特殊的字段类型;但是时间属性可以当作常规的时间戳字段来使用,一旦需要在计算中使用到时间属性,就需要“物化”(materialized)时间属性,时间属性字段就会被转换成一个常规的时间戳类型。被物化后的时间属性不再与 Flink 的时间系统和水位线相关联,因而也就不可以再应用在基于时间的操作中。
在 Flink SQL 的类型系统中,时间属性和常规的时间戳类型共用同样的逻辑类型 TimestampType
,但是通过 TimestampKind
进行区分:
| ``` public enum TimestampKind { REGULAR, //常规的时间戳类型 ROWTIME, //事件时间 PROCTIME //处理时间 }
|
| --- |
由于 Flink SQL 使用 Calcite 完成查询计划的优化,Flink 的所有逻辑类型在 Calcite 中都有对应的 `RelDataType`,并且为时间属性单独创建了一种新的 `RelDataType`,即 `TimeIndicatorRelDataType`:
|
class TimeIndicatorRelDataType( typeSystem: RelDataTypeSystem, originalType: BasicSqlType, val isEventTime: Boolean) //通过 isEventTime 区分是事件时间还是处理时间 extends BasicSqlType( typeSystem, originalType.getSqlTypeName, originalType.getPrecision) { }
|
| --- |
<a name="8bf990b7"></a>
## [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#%E5%AE%9A%E4%B9%89%E8%A1%A8%E7%9A%84%E6%97%B6%E9%97%B4%E5%B1%9E%E6%80%A7)定义表的时间属性
有两种方式来定义一张表的时间属性,一种方式是在将 `DataStream` 转换成 `Table` 的过程中,另一种方式是直接在 `TableSource` 的具体实现中定义。
<a name="3eb28836"></a>
### [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#datastream-%E8%BD%AC%E6%8D%A2%E4%B8%BA-table)DataStream 转换为 Table
在将 `DataStream` 转换成一个 Table 的过程中,可以用特殊的表达式来声明时间属性对应的列:
|
//处理时间 val table = tEnv.fromDataStream(stream, ‘UserActionTimestamp, ‘Username, ‘Data, ‘UserActionTime.proctime) //事件时间 val table = tEnv.fromDataStream(stream, ‘Username, ‘Data, ‘UserActionTime.rowtime)
|
| --- |
其中 `'field.proctime` 或这 `'field.rowtime` 即声明时间属性,表达式会被转换为 `PROCTIME` 和 `ROWTIME` 这两个内置函数的调用 `UnresolvedCallExpression`:
|
// Time definition /**
* Declares a field as the rowtime attribute for indicating, accessing, and working in
* Flink's event time.
*/
def rowtime: Expression = unresolvedCall(ROWTIME, expr) /**
* Declares a field as the proctime attribute for indicating, accessing, and working in
* Flink's processing time.
*/
def proctime: Expression = unresolvedCall(PROCTIME, expr)
|
| --- |
非 Scala 环境下则通过 `ExpressionParser` 完成表达式的解析。
<a name="78503a48"></a>
### [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#%E4%BD%BF%E7%94%A8-tablesource)使用 TableSource
如果要在 `TableSource` 中定义时间属性,则需要 `TableSource` 实现 `DefinedProctimeAttribute` 或者 `DefinedRowAttribute` 接口,并且引用的时间属性必须出现在 `TableSchema` 中,类型为 timestamp 类型。如果要同时使用处理时间和事件时间,对应的 `TableSource` 需要同时实现这两个接口:
|
public interface DefinedProctimeAttribute {
@Nullable
String getProctimeAttribute();
}
public interface DefinedRowtimeAttributes {
List
|
| --- |
其中 `RowtimeAttributeDescriptor` 是对事件时间的描述,包括如何提取事件时间,以及 watermark 的生成策略等:
|
public final class RowtimeAttributeDescriptor { private final String attributeName; //时间属性名称 private final TimestampExtractor timestampExtractor; //如何提取事件时间 private final WatermarkStrategy watermarkStrategy; //如何生成 watermark }
|
| --- |
尽管返回值是 `List<RowtimeAttributeDescriptor>`,但目前 Flink SQL 只支持单个事件时间属性。
<a name="cb8a21ae"></a>
## [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#sql-%E5%BC%95%E6%93%8E%E4%B8%AD%E6%97%B6%E9%97%B4%E5%B1%9E%E6%80%A7%E7%9A%84%E8%BD%AC%E6%8D%A2)SQL 引擎中时间属性的转换
<a name="552a14e1"></a>
### [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#%E6%9B%B4%E6%96%B0-tableschema)更新 TableSchema
在将 `DataStream` 转换成一个 `Table` 的过程中,首先需要生成表结构。`Table` 的底层对应的是一个 `QueryOperation`,在这里就是 `ScalaDataStreamQueryOperation` (或者 `JavaDataStreamQueryOperation`,对应 Java API)。`QueryOperation` 提供了 `TableSchema` 和字段映射关系:
|
public class ScalaDataStreamQueryOperation
|
| --- |
获得 `TableSchema` 的逻辑主要被封装在 `FieldInfoUtils.getFieldsInfo` 方法中,主要是通过解析 `Expression` 获得表结构中每一列对应的字段在 `DataStream` 中元素的索引,并得到对应字段的类型:
|
/**
- Utility methods for extracting names and indices of fields from different {@link TypeInformation}s.
*/
public class FieldInfoUtils {
private static class ExprToFieldInfo extends ApiExpressionDefaultVisitor
{
} private static boolean isRowTimeExpression(Expression origExpr) {@Override
public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) {
return createFieldInfo(unresolvedReference, null);
}
@Override
public FieldInfo visit(UnresolvedCallExpression unresolvedCall) {
if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
return visitAlias(unresolvedCall);
} else if (isRowTimeExpression(unresolvedCall)) {
return createRowtimeFieldInfo(unresolvedCall, null);
} else if (isProcTimeExpression(unresolvedCall)) {
return createProctimeFieldInfo(unresolvedCall, null);
}
return defaultMethod(unresolvedCall);
}
} private static boolean isProcTimeExpression(Expression origExpr) {return origExpr instanceof UnresolvedCallExpression &&
((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.ROWTIME;
} private static FieldInfo createTimeAttributeField(return origExpr instanceof UnresolvedCallExpression &&
((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.PROCTIME;
} } ``` | | —- |UnresolvedReferenceExpression reference,
TimestampKind kind, //这里的Kind是TimestampKind.PROCTIME或TimestampKind.ROWTIME
@Nullable String alias) {
final int idx;
//对于时间属性,没有对应的索引,用特殊的标识
if (kind == TimestampKind.PROCTIME) {
idx = TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER;
} else {
idx = TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER;
}
String originalName = reference.getName();
return new FieldInfo(
alias != null ? alias : originalName,
idx,
createTimeIndicatorType(kind));
由于时间属性对应的表达式是内置函数的调用,因而可以通过判断对应的函数定义识别出来。
时间属性并不对应 DataStream
中元素真实的字段,因此会用特殊的标识来作为索引:
| ```
public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo
|
| --- |
如果通过 `TableSource` 注册一张表,首先会通过 `TableSourceValidation.validateTableSource()` 验证表结构、时间属性等信息,然后会被封装为 `ConnectorCatalogTable`,在这里会完成 `TableSchema` 的更新:
|
public class ConnectorCatalogTable
|
| --- |
经过更新后的 `TableSchema`,时间属性列的 `LogicalType` 就是用特殊 `TimestampKind` 表征的 `TimestampType`。
<a name="6b9dfd02"></a>
### [](https://blog.jrwang.me/2019/2019-09-16-flink-sourcecode-sql-time-attribute.md/#%E8%BD%AC%E6%8D%A2%E5%88%B0-calcite)转换到 Calcite
在 `DatabaseCalciteSchema` 中, Flink SQL 中注册的表被转换成 `Calcite` 中使用的表:
|
class DatabaseCalciteSchema extends FlinkSchema {
@Override
public Table getTable(String tableName) {
ObjectPath tablePath = new ObjectPath(databaseName, tableName);
try {
if (!catalog.tableExists(tablePath)) {
return null;
}
CatalogBaseTable table = catalog.getTable(tablePath);
//将 Flink Catalog 中注册的表转换为 Calcite 中的 Table
if (table instanceof QueryOperationCatalogView) {
QueryOperationCatalogView view = (QueryOperationCatalogView) table;
QueryOperation operation = view.getQueryOperation();
if (operation instanceof DataStreamQueryOperation) {
List
|
| --- |
在 Flink SQL 中定义的表结构 `TableSchema` 也会经过 `FlinkTypeFactory` 转换,`LogicalType`也会被转换成 Calcite 内部使用的 `RelDataType`:
|
/**
- Flink specific type factory that represents the interface between Flink’s [[LogicalType]]
- and Calcite’s [[RelDataType]].
*/
class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = {
……
val relType = t.getTypeRoot match {
case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
case _ =>val timestampType = t.asInstanceOf[TimestampType]
timestampType.getKind match {
//时间戳类型识别出常规的时间戳和时间属性
case TimestampKind.PROCTIME => createProctimeIndicatorType(true)
case TimestampKind.ROWTIME => createRowtimeIndicatorType(true)
case TimestampKind.REGULAR => createSqlType(TIMESTAMP)
}
} ….. } } ``` | | —- |seenTypes.get(t) match {
case Some(retType: RelDataType) => retType
case None =>
val refType = newRelDataType()
seenTypes.put(t, refType)
refType
}
经过这一步转换,时间属性被转换成 TimeIndicatorRelDataType
类型。
查询计划转换
在 Calcite 优化查询计划是,会识别特殊语句中的时间属性,并转换成对应的 RelNode。例如,在 FlinkLogicalJoin
中,如果关联条件中包含了时间窗口,就会被转换为 StreamExecWindowJoin
;而不包含时间窗口的 FlinkLogicalJoin
则会被转换为 StreamExecJoin
。其区别就在于对时间属性的识别:
| ``` /**
- Rule that converts non-SEMI/ANTI [[FlinkLogicalJoin]] with window bounds in join condition
- to [[StreamExecWindowJoin]].
*/
class StreamExecWindowJoinRule
extends ConverterRule(
classOf[FlinkLogicalJoin],
FlinkConventions.LOGICAL,
FlinkConventions.STREAMPHYSICAL,
“StreamExecWindowJoinRule”) {
override def matches(call: RelOptRuleCall): Boolean = {
……
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(join)
//提取时间窗口的边界
val (windowBounds, ) = WindowJoinUtil.extractWindowBoundsFromPredicate(
join.getCondition,
join.getLeft.getRowType.getFieldCount,
joinRowType,
join.getCluster.getRexBuilder,
tableConfig)
if (windowBounds.isDefined) {
//如果识别到时间窗口,该规则匹配
if (windowBounds.get.isEventTime) {
} else {true
} } else { // the given join does not have valid window bounds. We cannot translate it. false } } } ``` | | —- |// Check that no event-time attributes are in the input because the processing time window
// join does not correctly hold back watermarks.
// We rely on projection pushdown to remove unused attributes before the join.
!joinRowType.getFieldList.exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
窗口聚合也是同理,只是规则匹配中是对 HOP
, TUMBLE
等窗口函数的识别。
物化时间属性
时间属性是一个逻辑上的列,因为它并不真实对应底层 DataStream
元素中具体的字段。因此,如果在计算操作中要用到时间属性列的值,就需要“物化”(materialized)时间属性。这一部分的逻辑主要是在 RelTimeIndicatorConverter
中。
对于 PROCTIME 时间属性,就是将其转换为对内置函数 FlinkSqlOperatorTable.PROCTIME_MATERIALIZE
的调用;而对于 ROWTIME 时间属性,则是对其进行一次强制的类型转换,转换为常规的时间戳。
| ``` /**
- Helper class for shared logic of materializing time attributes in [[RelNode]] and [[RexNode]].
*/
class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) {
def materialize(expr: RexNode): RexNode = {
if (isTimeIndicatorType(expr.getType)) {
if (isRowtimeIndicatorType(expr.getType)) {
} else {// cast rowtime indicator to regular timestamp
rexBuilder.makeAbstractCast(timestamp(expr.getType.isNullable), expr)
} } else { expr } } } ``` | | —- |// generate proctime access
rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr)
生成物理执行计划
SQL 的查询计划最终需要被转换为 Flink 的算子,即生成物理执行计划。对于一个查询计划来说,首先需要进行转换的就是 Scan 操作,在里将底层的 POJO、Tuple 等对象映射为 Table 中使用的 Row。在这里就需要考虑到时间属性的映射。
首先,我们来看下在 StreamExecDataStreamScan
中是如何完成字段映射的转换的:
| ``` class StreamExecDataStreamScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, outputRowType: RelDataType) extends TableScan(cluster, traitSet, table) { override protected def translateToPlanInternal( planner: StreamPlanner): Transformation[BaseRow] = { val config = planner.getTableConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream val transform = inputDataStream.getTransformation val rowtimeExpr = getRowtimeExpression(planner.getRelBuilder) // when there is row time extraction expression, we need internal conversion // when the physical type of the input date stream is not BaseRow, we need internal conversion. if (rowtimeExpr.isDefined || ScanUtil.needsConversion(dataStreamTable.dataType)) { // extract time if the index is -1 or -2. val (extractElement, resetElement) = if (ScanUtil.hasTimeAttributeField(dataStreamTable.fieldIndexes)) { (s”ctx.$ELEMENT = $ELEMENT;”, s”ctx.$ELEMENT = null;”) } else { (“”, “”) } val ctx = CodeGeneratorContext(config).setOperatorBaseClass( classOf[AbstractProcessStreamOperator[BaseRow]]) //生成字段映射的代码 ScanUtil.convertToInternalRow( ctx, transform, dataStreamTable.fieldIndexes, //字段索引 dataStreamTable.dataType, getRowType, getTable.getQualifiedName, config, rowtimeExpr, //RowTime 表达式 beforeConvert = extractElement, afterConvert = resetElement) } else { transform.asInstanceOf[Transformation[BaseRow]] } } //获取 RowTime 时间属性的表达式 private def getRowtimeExpression(relBuilder: FlinkRelBuilder): Option[RexNode] = { val fieldIdxs = dataStreamTable.fieldIndexes if (!fieldIdxs.contains(ROWTIMESTREAM_MARKER)) { None } else { //根据字段映射中的特殊marker来查找 rowtime 字段 val rowtimeField = dataStreamTable.fieldNames(fieldIdxs.indexOf(ROWTIME_STREAM_MARKER)) // get expression to extract timestamp fromDataTypeToLogicalType(dataStreamTable.dataType) match { //如果这个datastream本身已经完成了 rowtime 时间属性的提取 case dataType: RowType if dataType.getFieldNames.contains(rowtimeField) && TypeCheckUtils.isRowTime(dataType.getTypeAt(dataType.getFieldIndex(rowtimeField))) => // if rowtimeField already existed in the data stream, use the default rowtime None case => // 用内置函数 StreamRecordTimestampSqlFunction 来提取 rowtime 时间属性 // extract timestamp from StreamRecord Some( relBuilder.cast( relBuilder.call(new StreamRecordTimestampSqlFunction), relBuilder.getTypeFactory.createFieldTypeFromLogicalType( new TimestampType(true, TimestampKind.ROWTIME, 3)).getSqlTypeName)) } } } }
|
| --- |
在进行字段映射的时候,一个关键的处理就是时间属性要怎么映射。ROWTIME 时间属性被转换为 `StreamRecordTimestampSqlFunction` 的调用。在前面物化时间属性的阶段,我们已经看到,PROCTIME 时间属性已经被转换为内置函数 `FlinkSqlOperatorTable.PROCTIME_MATERIALIZE` 的调用,而对于 ROWTIME 时间属性,只是进行了一次强制的类型转换。这主要是因为,PROCTIME 在流处理中,在不同的算子中,每一次调用都应该获取当前的系统时间;而对于 ROWTIME 而言,它的取值是固定的,因此只需要在最开始完成一次转换即可。RPOCTIME 和 ROWTIME 的取值最终都被转换成对内置函数的调用。<br />这个函数调用的代码生成就比较简单了,RPOCTIME 转换成 `context.timerService().currentProcessingTime()`,ROWTIME 转换成 `context.timestamp()`,具体的代码生成可以参考 `GenerateUtils`。<br />由于 PROCTIME 并不需要在 Scan 节点进行物化,因此在这里直接用 null 值替代,在后续需要的时候重新进行计算。<br />而对于从 `TableSource` 注册而来的表,由于它不像 `DataStream` 那样,在定义 ROWTIME 时间属性之前已经完成了 timestamp 和 watermark 的指定。因此在转换 `TableSource` 的过程中,如果定义了 ROWTIME 时间属性,除了需要提取 ROWTIME 时间属性的值以外,还需要指定 watermark。<br />`RowtimeAttributeDescriptor` 是对 ROWTIME 时间属性的描述,包括如何提取事件时间的 `TimestampExtractor`:
|
/**
- The {@link FieldComputer} interface returns an expression to compute the field of the table
- schema of a {@link TableSource} from one or more fields of the {@link TableSource}’s return type. *
- @param
The result type of the provided expression. / public interface FieldComputer { String[] getArgumentFields(); TypeInformation getReturnType(); void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes); /* - Returns the {@link Expression} that computes the value of the field. *
- @param fieldAccesses Field access expressions for the argument fields.
- @return The expression to extract the timestamp from the {@link TableSource} return type.
*/
Expression getExpression(ResolvedFieldReference[] fieldAccesses);
}
//Provides an expression to extract the timestamp for a rowtime attribute.
public abstract class TimestampExtractor implements FieldComputer
, Serializable, Descriptor { @Override public TypeInformation getReturnType() { return Types.LONG; //返回类型是 Long } } ``` | | —- |
通常提取 ROWTIME 的方式是根据已有的字段来生成,即:
| ``` public final class ExistingField extends TimestampExtractor { private String field;
/**
* Returns an {@link Expression} that casts a {@link Long}, {@link Timestamp}, or
* timestamp formatted {@link String} field (e.g., "2018-05-28 12:34:56.000")
* into a rowtime attribute.
*/
@Override
public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
ResolvedFieldReference fieldAccess = fieldAccesses[0];
DataType type = fromLegacyInfoToDataType(fieldAccess.resultType());
//字段引用的表达式
FieldReferenceExpression fieldReferenceExpr = new FieldReferenceExpression(
fieldAccess.name(),
type,
0,
fieldAccess.fieldIndex());
//支持的输入字段类型,包括 BIGINT、TIMESTAMP_WITHOUT_TIME_ZONE 和 VARCHAR
switch (type.getLogicalType().getTypeRoot()) {
case BIGINT:
case TIMESTAMP_WITHOUT_TIME_ZONE:
//直接引用相应的字段即可
return fieldReferenceExpr;
case VARCHAR:
//进行一次类型转换
DataType outputType = TIMESTAMP(3).bridgedTo(Timestamp.class);
return new CallExpression(
CAST,
Arrays.asList(fieldReferenceExpr, typeLiteral(outputType)),
outputType);
default:
throw new RuntimeException("Unsupport type: " + type);
}
}
}
|
| --- |
`TimestampExtractor` 提供的提取时间属性的 `Expression`, 要先被转换为 `RexNode` 之后才可以被用在代码生成中,通过 `TableSourceUtil.getRowtimeExtractionExpression` 完成。如同前面提到对 `StreamRecordTimestampSqlFunction` 调用,这两种获得时间属性的的方式本质上是一样的,只是调用的函数不一致。<br />在完成了字段映射和时间属性的提取后,还需要指定 watermark,这正是 `RowtimeAttributeDescriptor` 中 `WatermarkStrategy` 的用处:
|
class StreamExecTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, relOptTable: FlinkRelOptTable) extends PhysicalTableSourceScan(cluster, traitSet, relOptTable) { override protected def translateToPlanInternal( planner: StreamPlanner): Transformation[BaseRow] = { val config = planner.getTableConfig val inputTransform = getSourceTransformation(planner.getExecEnv) //1. 字段映射和时间提取 …… //2. 指定watermark val withWatermarks = if (rowtimeDesc.isDefined) { val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName) val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy watermarkStrategy match { case p: PeriodicWatermarkAssigner => val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p) ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) case p: PunctuatedWatermarkAssigner => val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, producedDataType) ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) case _: PreserveWatermarks => // The watermarks have already been provided by the underlying DataStream. ingestedTable } } else { // No need to generate watermarks if no rowtime attribute is specified. ingestedTable } withWatermarks.getTransformation } } ``` | | —- |
到这里,就完成了 ROWTIME 时间属性的提取的 watermark 的生成。 PROCTIME 在使用时按需要进行物化。
小结
时间属性是广泛应用在窗口操作中,是流式 SQL 处理中非常重要的概念。本文对 Flink SQL 中时间属性的使用方法和具体实现进行了介绍。
参考
-EOF-