无论表达式输入是有界批量输入还是无界流输入,Table API和 SQL 查询都具有相同的语义。在许多情况下,对流输入的连续查询能够计算与离线计算结果相同的准确结果。然而,这在一般情况下是不可能的,因为连续查询必须限制它们维护的状态的大小,以避免耗尽存储并且能够在很长一段时间内处理无界流数据。因此,连续查询可能只能提供近似结果,具体取决于输入数据的特征和查询本身。(这里需要说明的是,状态的本质其实还是存储,所以对于状态的维护,需要不断的清理)

Flink 的 Table API 和 SQ L接口提供参数来调整连续查询的准确性和资源消耗。参数通过 QueryConfig 对象指定。QueryConfig 可以从 TableEnvironment 获得,并在转换 Table 时传回,即,当它转换为DataStream通过TableSink发出时

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
  3. // obtain query configuration from TableEnvironment
  4. StreamQueryConfig qConfig = tableEnv.queryConfig();
  5. // set query parameters
  6. qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
  7. // define query
  8. Table result = ...
  9. // create TableSink
  10. TableSink<Row> sink = ...
  11. // register TableSink
  12. tableEnv.registerTableSink(
  13. "outputTable", // table name
  14. new String[]{...}, // field names
  15. new TypeInformation[]{...}, // field types
  16. sink); // table sink
  17. // emit result Table via a TableSink
  18. result.insertInto("outputTable", qConfig);
  19. // convert result Table into a DataStream<Row>
  20. DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. val tableEnv = TableEnvironment.getTableEnvironment(env)
  3. // obtain query configuration from TableEnvironment val qConfig: StreamQueryConfig = tableEnv.queryConfig
  4. // set query parameters qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
  5. // define query val result: Table = ???
  6. // create TableSink val sink: TableSink[Row] = ???
  7. // register TableSink tableEnv.registerTableSink(
  8. "outputTable", // table name
  9. Array[String](docs_1.7_...), // field names
  10. Array[TypeInformation[_]](...), // field types
  11. sink) // table sink
  12. // emit result Table via a TableSink result.insertInto("outputTable", qConfig)
  13. // convert result Table into a DataStream[Row] val stream: DataStream[Row] = result.toAppendStream[Row](docs_1.7_qConfig)

在下文中,我们将描述 QueryConfig 的参数以及它们如何影响查询的准确性和资源消耗。

空闲状态保留时间

许多查询聚合或连接一个或多个键属性上的记录。在流上执行此类查询时,连续查询需要收集记录或维护每个 key 的部分结果。如果输入流的 key 域正在变化,即,active key 随时间变化,则随着越来越多的不同 key,连续查询累积越来越多的状态。但是,经常在一段时间后 key 变为非活动状态,并且它们的相应状态变得陈旧且无用。

例如,以下查询计算每个会话(session)的单击次数。

  1. SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

sessionId 属性用作分组 key,连续查询维护其观察到的每个 sessionId 的计数。sessionId 属性随着时间的推移而变化(进化),并且 sessionId 值仅在会话(session)结束之前有效,即,在有限的时间段内。但是,连续查询无法知道 sessionId 的此属性,并期望每个 sessionId 值都可以在任何时间点发生。它维护每个观察到的 sessionId 值的计数。因此,随着观察到越来越多的sessionId值,查询的总状态大小不断增长。

空闲状态保留时间参数(Idle State Retention Time) 定义了在删除 key 之前保留 key 状态多长时间而不进行更新。对于前面的示例查询,只要在配置的时间段内没有更新 sessionId,就会删除它的计数。

通过删除 key 的状态,连续查询完全忘记它之前已经看过这个key。如果处理了具有其状态已被删除的 key 的记录,则该记录将被视为具有相应 key 的第一个记录。对于上面的示例,这意味着 sessionId 的计数将再次从 0 开始。

配置空闲状态保留时间有两个参数:

  • minimum idle state retention time 定义了非活动key的状态在被删除之前至少保持多长时间。
  • maximum idle state retention time 义了非活动key的状态在被删除之前最多保留多长时间。

参数规定如下:

  1. StreamQueryConfig qConfig = ...
  2. // set idle state retention time: min = 12 hours, max = 24 hours
  3. qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
  1. val qConfig: StreamQueryConfig = ???
  2. // set idle state retention time: min = 12 hours, max = 24 hours qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

清理状态需要额外的簿记(bookkeeping),这对于 minTimemaxTime 的较大差异而言变得更实用(便宜)。minTimemaxTime 之间的差异必须至少为5分钟。