Queryable State Beta 可查询状态Beta

Note: 可查询状态的客户端API当前处于不断变化的状态,没有提供关于所提供接口的稳定性的保证。在即将到来的FLink版本中,可能会在客户端上中断API更改。

简而言之,这个特性公开了Flink的托管键控(分区)状态(请参见与State)一起工作到外部世界,并允许用户从外部Flink查询作业的状态)。对于某些场景,Queryable状态消除了对具有外部系统(例如键值存储)的分布式操作/事务的需求,而这往往是实际中的瓶颈。此外,此特性对于调试目的可能特别有用。

注意: 当查询状态对象时,可以从并发线程访问该对象,而不需要进行任何同步或复制。这是一个设计选择,因为上面的任何一个都会导致工作延迟的增加,这是我们想要避免的。由于使用Java堆空间的任何状态后端,例如_MemoryStateBackendFsStateBackend,在检索值时都不会与副本一起工作,而是直接引用存储的值,因此读-修改-写入模式是不安全的,并且可能导致可查询的状态服务器由于并发修改而失败。RocksDBStateBackend 不受这些问题的影响。

Architecture 建筑学

在演示如何使用“可查询状态”之前,简要描述组成该状态的实体非常有用。“可查询状态”功能由三个主要实体组成:

  1. QueryableStateClient(可能)在FLink集群外部运行并提交用户查询,
  2. QueryableStateClientProxy,它运行在每个TaskManager(_在Flink集群内)上,负责接收客户端的查询,代表他从Responsible Task Manager获取所请求的状态,并将其返回给客户端,以及
  3. QueryableStateServer运行在每个TaskManager上,负责为本地存储的状态提供服务。

客户端连接到代理之一,并发送与特定密钥“k”关联的状态请求。正如使用State,键控状态组织在_KEY组中]中所述,每个TaskManager都被分配了许多这些关键组。为了发现哪个TaskManager负责持有k的密钥组,代理将询问 JobManager。根据答案,代理将查询在该 TaskManager 上运行的 QueryableStateServer,以获得与 k关联的状态,并将响应转发回客户端。

Activating Queryable State 激活可查询状态

要在Flink集群上启用可查询状态,只需将flink-queryable-state-runtime_2.11-1.7.1.jar 从您Flink distribution,文件夹中复制到lib/文件夹。否则,无法启用可查询状态功能。

要验证您的群集是否已启用Queryable状态,请检查任何任务管理器的日志:"Started the Queryable State Proxy Server @ ..."

Making State Queryable 使状态Queryable

现在您已经激活了集群上的Queryable状态,现在是看看如何使用它的时候了。为了使状态对外部世界可见,需要使用以下方法显式地使其可查询:

  • QueryableStateStream是一个方便的对象,充当接收器,并以可查询状态提供传入值,或者
  • stateDescriptor.setQueryable(String queryableStateName)`方法,它使由状态描述符表示的键状态是可查询的。

以下各节介绍了这两种方法的使用情况。

Queryable State Stream 可查询状态流

KeyedStream 上调用.asQueryableState(stateName, stateDescriptor)返回一个 QueryableStateStream ,该 QueryableStateStream 提供其值为Queryable状态。根据状态的类型,asQueryableState() 方法有以下变体:

  1. // ValueState
  2. QueryableStateStream asQueryableState(
  3. String queryableStateName,
  4. ValueStateDescriptor stateDescriptor)
  5. // Shortcut for explicit ValueStateDescriptor variant
  6. QueryableStateStream asQueryableState(String queryableStateName)
  7. // FoldingState
  8. QueryableStateStream asQueryableState(
  9. String queryableStateName,
  10. FoldingStateDescriptor stateDescriptor)
  11. // ReducingState
  12. QueryableStateStream asQueryableState(
  13. String queryableStateName,
  14. ReducingStateDescriptor stateDescriptor)

Note: 没有可查询的ListState 接收器,因为它会导致一个不断增长的列表,该列表可能不会被清理,因此最终会消耗过多的内存。

返回的QueryableStateStream 可视为sink,**不能进一步转换。在内部, QueryableStateStream 被转换为使用所有传入记录来更新可查询状态实例的运算符。 asQueryableState 调用中提供的StateDescriptor类型暗示了更新逻辑。在如下程序中,键控流的所有记录都将用于通过ValueState.update(value)更新状态实例:

  1. stream.keyBy(0).asQueryableState("query-name")

这类似于ScalaAPI的flatMapWithState

Managed Keyed State 受管理的键控状态

操作符的托管键控状态(请参见使用托管键控State)可以通过StateDescriptor.setQueryable(String queryableStateName)使适当的状态描述符可查询),如下例所示:

  1. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  2. new ValueStateDescriptor<>(
  3. "average", // the state name
  4. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
  5. descriptor.setQueryable("query-name"); // queryable state name

Note: queryableStateName参数可以任意选择,仅用于查询。它不一定要和国家的名字相同。

该变型没有限制可以使哪种类型的状态是可查询的。这意味着,这可用于任何ValueState, ReduceState, ListState, MapState, AggregatingState和当前已过时的 FoldingState

Querying State 查询状态

到目前为止,您已经将集群设置为使用Queryable状态运行,并且声明(有些)您的状态为queryable。现在是了解如何查询此状态的时候了。

为此,您可以使用 QueryableStateClient 辅助类。这可以在flink-queryable-state-clientjar中提供,它必须明确包含在项目 pom.xmlflink-core之间的依赖关系中,如下所示:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-core</artifactId>
  4. <version>1.7.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-queryable-state-client-java_2.11</artifactId>
  9. <version>1.7.1</version>
  10. </dependency>

有关此问题的详细信息,您可以检查如何设置FLink程序

QueryableStateClient将将您的查询提交给内部代理,然后内部代理将处理您的查询并返回最终结果。初始化客户机的唯一要求是提供一个有效的TaskManager主机名(请记住,每个任务管理器上都运行着一个可查询的状态代理)和代理侦听的端口。更多关于如何配置Configuration节中的代理和状态服务器端口的信息。

  1. QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);

在客户端准备好的情况下,要查询与类型K的密钥相关联的类型 V的状态,您可以使用以下方法:

  1. CompletableFuture<S> getKvState(
  2. JobID jobId,
  3. String queryableStateName,
  4. K key,
  5. TypeInformation<K> keyTypeInfo,
  6. StateDescriptor<S, V> stateDescriptor)

以上返回CompletableFuture,最终保存了ID为jobID的作业的 queryableStateName 标识的可查询状态实例的状态值。key 是您感兴趣的状态,而keyTypeInfo将告诉flink如何序列化/反序列化它。最后,stateDescriptor包含有关所请求状态的必要信息,即它的类型(Value, Reduce等)以及有关如何序列化/反序列化它的必要信息。

仔细的读者会注意到,返回的未来包含一个类型为 S、a State 对象的值,其中包含实际值。这可以是Flink支持的任何状态类型:ValueState, ReduceState, ListState, MapState, AggregatingState,以及当前废弃的FoldingState

Note: 这些状态对象不允许对包含的状态进行修改。您可以使用它们获得状态的实际值,例如使用valueState.get(),或者迭代包含的 <K, V> 条目,__使用 mapState.entries(),但是不能修改它们。例如,在返回的列表状态上调用‘add()’方法将抛出一个 UnsupportedOperationException. 注意: 客户端是异步的,可以由多个线程共享。它需要在未使用时通过 QueryableStateClient.shutdown()关闭,以便释放资源。

Example 例子

下面的示例扩展了“Counterdinaverage”示例(参见[使用受管理的键状态](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#using-managed-keyed-state)),通过它可以查询并显示如何查询此值:

  1. public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
  2. private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
  3. @Override
  4. public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
  5. Tuple2<Long, Long> currentSum = sum.value();
  6. currentSum.f0 += 1;
  7. currentSum.f1 += input.f1;
  8. sum.update(currentSum);
  9. if (currentSum.f0 >= 2) {
  10. out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
  11. sum.clear();
  12. }
  13. }
  14. @Override
  15. public void open(Configuration config) {
  16. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  17. new ValueStateDescriptor<>(
  18. "average", // the state name
  19. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
  20. descriptor.setQueryable("query-name");
  21. sum = getRuntimeContext().getState(descriptor);
  22. }
  23. }

一旦在作业中使用,就可以检索作业ID,然后从此运算符查询任何键的当前状态:

  1. QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
  2. // the state descriptor of the state to be fetched.
  3. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  4. new ValueStateDescriptor<>(
  5. "average",
  6. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
  7. CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
  8. client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
  9. // now handle the returned value
  10. resultFuture.thenAccept(response -> {
  11. try {
  12. Tuple2<Long, Long> res = response.get();
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. });

Configuration 布局,构造

以下配置参数影响可查询状态服务器和客户端的行为。它们在“QueryableStateOptions”中定义。

State Server 状态服务器

  • query.server.ports: 可查询状态服务器的服务器端口范围。如果超过1个任务管理器在同一台计算机上运行,则这可用于避免端口冲突。指定范围可以是:端口:“9123”、一系列端口:“50100-50200”或范围和/或点列表:“50100-50200,50300-50400,51234”。默认端口为9067。
  • query.server.network-threads: 接收状态服务器传入请求的网络(事件循环)线程数(0=&gt;#时隙)
  • query.server.query-threads: 处理/服务状态服务器传入请求的线程数(0=&gt;#槽)。

Proxy 代理服务器

  • query.proxy.ports: 可查询状态代理的服务器端口范围。如果超过1个任务管理器在同一台计算机上运行,则这可用于避免端口冲突。指定范围可以是:端口:“9123”、一系列端口:“50100-50200”或范围和/或点列表:“50100-50200,50300-50400,51234”。默认端口为9069。
  • query.proxy.network-threads: 接收客户端代理传入请求的网络(事件循环)线程数(0=&gt;#时隙)
  • query.proxy.query-threads: 处理/服务客户端代理传入请求的线程数(0=&gt;#槽)。

Limitations 限制,边界

  • 可查询状态生命周期被绑定到作业的生命周期,_例如_TATES在启动时注册Queryable状态,并在处理时注销它。在未来的版本中,需要将其解耦,以便在任务完成后允许查询,并通过状态复制加快恢复速度。
  • 关于可用的KvState的通知是通过一个简单的Tell进行的。在未来,这一点应该得到改进,以便在请求和确认方面更加健壮。
  • 服务器和客户端跟踪查询的统计信息。它们当前在默认情况下被禁用,因为它们不会在任何地方被曝光。只要有更好的支持通过度量系统发布这些数字,我们就应该启用统计信息。