Queryable State

注: 可查询状态的API当前处于发展状态,所以不保证 接口的 稳定性。即将到来的版本中API有可能会发生重大变化。

简而言之,此功能允许用户从外部查询flink托管的分区的状态(see 使用状态)。在某些场景,可查询状态取消了需要和外部系统进行分布式交互的依赖,例如在实践中经常是瓶颈的键值存储。

注意: 可查询状态并发访问键值状态(keyed state)比同步访问更有可能阻碍其操作。因为某些状态存储使用的是java的堆空间,例如:内存状态存储, Fs状态存储,所以内存不直接使用复制当检索值,而是引用存储值,可能会导致读-修改-写模式是不安全的, 且并发修改将会导致可查询状态服务失败。对于这些问题RocksDB状态存储是安全的。

使用可查询状态

为了使用可查询状态,通过设置配置参数 query.server.enabletrue(当前的默认值),是全局 激活可查询状态服务的第一步。然后某些的状态需要可查询可以通过

  • QueryableStateStream, 是一个方便的对象,其行为像一个接收器,可对流入的值进行状态查询,或
  • StateDescriptor#setQueryable(String queryableStateName),使某个算子的键值状态可查询。

下面的段落解释这两种方法的使用。

可查询状态流

键值流 可以通过下面的方法可以将他的值作为可查询状态:

  1. // ValueState
  2. QueryableStateStream asQueryableState(
  3. String queryableStateName,
  4. ValueStateDescriptor stateDescriptor)
  5. // Shortcut for explicit ValueStateDescriptor variant
  6. QueryableStateStream asQueryableState(String queryableStateName)
  7. // FoldingState
  8. QueryableStateStream asQueryableState(
  9. String queryableStateName,
  10. FoldingStateDescriptor stateDescriptor)
  11. // ReducingState
  12. QueryableStateStream asQueryableState(
  13. String queryableStateName,
  14. ReducingStateDescriptor stateDescriptor)
注: 不存在 ListState sink 由于不断增长的列表可能不可以清理, 将会占用大量内存。

调用这些方法将会返回QueryableStateStream,无法再做其他操作, 且当前只保存可查询状态的名称以及 值和键序列化的可查询状态流。它与水槽(sinlk)相当,不能再进一步转变。

在内部一个 QueryableStateStream 被翻译成一个使用所有流入记录来更新可查询状态的算子的实例。

下面的项目,所有键值流的记录将会被用来更新状态实例,通过ValueState#update(value)或者 AppendingState#add(value), 依赖于所选状态的转化: {% highlight java %} stream.keyBy(0).asQueryableState(“query-name”) {% endhighlight %} 这样做就像Scala API的“flatMapWithState”。

托管的键值状态

一个算子的被管理键值状态 (see 使用托管的键值状态) 可以通过使适当的状态描述符可查询来进行查询,例如下面的例子通过StateDescriptor#setQueryable(String queryableStateName): {% highlight java %} ValueStateDescriptor> descriptor = new ValueStateDescriptor<>( “average”, // the state name TypeInformation.of(new TypeHint>() {}), // type information Tuple2.of(0L, 0L)); // default value of the state, if nothing was set descriptor.setQueryable(“query-name”); // queryable state name {% endhighlight %}

注: queryableStateName参数可以任意选择,且只能 用于查询。它不一定与状态自己的名字相同。

查询状态

QueryableStateClient帮助器类可用于针对内部服务状态的“KvState”实例的查询。它需要设置一个有效的 JobManager 地址和端口, 创建方式如下:

  1. final Configuration config = new Configuration();
  2. config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
  3. config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
  4. QueryableStateClient client = new QueryableStateClient(config);

查询的方法是:

  1. Future<byte[]> getKvState(
  2. JobID jobID,
  3. String queryableStateName,
  4. int keyHashCode,
  5. byte[] serializedKeyAndNamespace)

对这个方法的调用返回一个Future,最终持有ID为“jobID”的job,它的“queryableStateName”标识的可查询状态实例拥 有的序列化状态值。keyHashCode是由Object.hashCode()'返回的键的哈希码,而serializedKeyAndNamespace` 是序列化的键和命名空间。

注: 客户端是异步的,可以由多个线程共享。可通过QueryableStateClient#shutdown() 关闭来释放资源,当不需要的时候。

目前的实现仍然是比较低级别的,因为它只适用于此用于提供键/命名空间和返回结果的序列化数据。用户(或一些后续实用程序) 需要为此设置序列化程序。这样做的好处是查询服务不需要担心任何类加载问题等。

有一些序列化帮助方法用于键/命名空间和值序列化在KvStateRequestSerializer里。

案例

下面的案例继承了 CountWindowAverage, 案例 (see 使用托管的键值状态) 如何使它可查询,且如何查询这个值:

  1. public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
  2. private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
  3. @Override
  4. public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
  5. Tuple2<Long, Long> currentSum = sum.value();
  6. currentSum.f0 += 1;
  7. currentSum.f1 += input.f1;
  8. sum.update(currentSum);
  9. if (currentSum.f0 >= 2) {
  10. out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
  11. sum.clear();
  12. }
  13. }
  14. @Override
  15. public void open(Configuration config) {
  16. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  17. new ValueStateDescriptor<>(
  18. "average", // 状态名字
  19. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 类型信息
  20. Tuple2.of(0L, 0L)); // 如果没有,将此值设为默认值。
  21. descriptor.setQueryable("query-name");
  22. sum = getRuntimeContext().getState(descriptor);
  23. }
  24. }

一旦在job中使用,您可以检索作业ID,然后从该算子中查询任何键的当前状态:

  1. final Configuration config = new Configuration();
  2. config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
  3. config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
  4. QueryableStateClient client = new QueryableStateClient(config);
  5. final TypeSerializer<Long> keySerializer =
  6. TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
  7. final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
  8. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
  9. final byte[] serializedKey =
  10. KvStateRequestSerializer.serializeKeyAndNamespace(
  11. key, keySerializer,
  12. VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
  13. Future<byte[]> serializedResult =
  14. client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
  15. //现在等待结果并返回它
  16. final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
  17. byte[] serializedValue = Await.result(serializedResult, duration);
  18. Tuple2<Long, Long> value =
  19. KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);

Scala用户注意事项

创建“TypeSerializer”实例时,请使用可用的Scala扩展。 添加以下导入:

  1. import org.apache.flink.streaming.api.scala._

现在你可以通过下面的方式创建 type serializers:

  1. val keySerializer = createTypeInformation[Long]
  2. .createSerializer(new ExecutionConfig)

如果你不这样做,则可能会遇到Flink作业和客户端代码中使用的序列化器之间的不匹配,因为类似于scala.Long 的类型在运行时无法获取。

配置

以下配置参数会影响可查询状态服务器和客户端的行为。它们定义在“QueryableStateOptions”中。

服务端

  • query.server.enable: 标记是否开启可查询状态服务端
  • query.server.port: 端口绑定到内部的KvStateServer(0 => 选择随机可用端口)
  • query.server.network-threads: KvStateServer(0 => #slots)的网络(事件循环)线程数
  • query.server.query-threads: KvStateServerHandler(0 => #slots)的异步查询线程数。

客户端 (QueryableStateClient)

  • query.client.network-threads: KvStateClient(0 =>可用内核数)的网络(事件循环)线程数
  • query.client.lookup.num-retries: 位置查找失败次数。
  • query.client.lookup.retry-delay: 位置查找失败重试延时间隔(毫秒)

限制

  • 可查询状态的生命周期受限于job的生命周期,例如, 任务启动时注册可查询状态,并在清理时注销它。在将来的 版本中,最好是将其解耦,以便在任务完成后允许查询,并通过状态加速恢复通过状态复制。
  • 有关KvState的通知可以通过一个简单的说明来实现。 未来这个应该需要改善,以便实现更强大的询问和确认。
  • 服务器和客户端跟踪查询的统计信息。 因为它们不会在任何地方暴露出来,所以默认是禁用的。 一旦可以通过度 量系统更好的发布这些数字,我们应该启用统计。