Note: this article is based on Flink 1.9.0, the latest stable release at the time of writing.

Introduction

The Queryable State API exposes Flink-managed keyed state to the outside world and lets users query a job's state from outside Flink. In some scenarios this removes the need for distributed operations/transactions against external systems such as key-value stores, which are often the bottleneck in practice. It is also handy for debugging running jobs.
Note: the client APIs for queryable state are still evolving, and no guarantees are made about the stability of the provided interfaces; breaking client-side API changes are possible in upcoming Flink releases. When a state object is queried, it is accessed from a concurrent thread without any synchronization or copying. This is a deliberate design choice, because either would add latency to the job, which we want to avoid. State backends that keep state on the Java heap, such as MemoryStateBackend or FsStateBackend, do not work on copies when retrieving values but return direct references to the stored values, so read-modify-write patterns are unsafe and may cause the queryable state server to fail due to concurrent modifications. The RocksDBStateBackend avoids these problems.
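To sketch that last point: switching the job to the RocksDBStateBackend avoids handing out references into the heap, because queried values are deserialized from RocksDB. A minimal sketch, assuming the flink-statebackend-rocksdb_2.11 dependency is on the classpath and using an illustrative local checkpoint path:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With RocksDB, queried values are deserialized copies rather than direct
        // references to heap objects, so concurrent queries cannot observe or corrupt
        // in-flight modifications made by the job.
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));

        // ... build the job as usual and call env.execute(...)
    }
}
```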

Theoretical Background

A stateful Flink job is composed of operators: typically one or more source operators, some operators that do the actual processing, and one or more sink operators. Each operator runs in parallel across tasks and can use different kinds of state. An operator can have zero, one, or more "operator states", which are organized as lists scoped to the operator's tasks. If the operator is applied to a keyed stream, it can also have zero, one, or more "keyed states", which are scoped to the key extracted from each processed record. You can think of keyed state as a distributed key-value map.
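The examples later in this article all use keyed state (ValueState) inside rich functions. For the other kind, operator state, here is a minimal illustrative sketch of a function that keeps a per-task counter in list-style operator state via the standard CheckpointedFunction interface; the class and state names are made up for illustration:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

/** Illustrative pass-through function that counts records per parallel task in operator state. */
public class CountingFunction implements FlatMapFunction<Long, Long>, CheckpointedFunction {

    private transient ListState<Long> countPerTask; // operator state: a list scoped to this task
    private long localCount;

    @Override
    public void flatMap(Long value, Collector<Long> out) {
        localCount++;
        out.collect(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // write the current local count into the checkpointed list
        countPerTask.clear();
        countPerTask.add(localCount);
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        countPerTask = ctx.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count-per-task", Types.LONG));
        // on restore (or rescaling), the list entries are redistributed across tasks
        for (Long c : countPerTask.get()) {
            localCount += c;
        }
    }
}
```

Keyed state, in contrast, is always accessed in the context of the current record's key, which is exactly the access pattern the queryable state client reproduces from outside the job.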
The figure below shows a Flink job, MyApp, consisting of three operators: Src, Proc, and Snk. Src has one operator state, Proc has one operator state and two keyed states, and Snk is stateless.
(Figure: application-my-app-state-processor-api.png — the MyApp job and its operator and keyed states)

A savepoint or checkpoint of MyApp consists of the data of all states, organized so that the state of every task can be restored. As a mental model, we can map the state data of each task onto data sets or tables; in fact, we can think of a savepoint as a database. Each operator (identified by its UID) represents a namespace. Each operator state of an operator is mapped to a dedicated table in that namespace with a single column holding the state data of all tasks. All keyed states of an operator are mapped to a single table consisting of one column for the key and one column per keyed state. The figure below shows how a savepoint of MyApp maps to such a database.

(Figure: database-my-app-state-processor-api.png — how MyApp's savepoint maps to a database)

Service Architecture

(Figure: queryable State.png — the three components of the queryable state service)

Queryable state consists of the following three components (a port-configuration sketch follows this list):

  • QueryableStateClient: the client. Runs outside the Flink cluster, submits query requests, and receives the final results.
  • QueryableStateClientProxy: the client proxy. Runs on each TaskManager. It accepts client requests, locates the TaskManager that holds the state for the requested key, forwards the request to that TaskManager's query service, and returns the final result to the client.
  • QueryableStateServer: the query service. Runs on each TaskManager, serving requests that come from the client proxies.
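By default the state server listens on port 9067 and the client proxy on 9069 (these are the ports you will see in the logs below). If they need to be changed, the corresponding QueryableStateOptions can be set on the configuration. A minimal sketch for a local environment, assuming the SERVER_PORT_RANGE and PROXY_PORT_RANGE options; the values shown are just the defaults spelled out explicitly:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class QueryableStatePorts {

    public static void main(String[] args) {
        Configuration config = new Configuration();

        // start the queryable state proxy and server on each TaskManager
        config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);

        // port (ranges) of the two server-side components; 9067/9069 are the defaults
        config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
        config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        // ... define the job on env and execute it
    }
}
```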

Activating the Queryable State Service

Queryable state is not enabled by default in the Flink distribution, so it has to be configured explicitly.

Flink on Yarn/Flink Standalone

  1. Add the dependency

```bash
cp ${FLINK_HOME}/opt/flink-queryable-state-runtime_2.11-1.9.0.jar ${FLINK_HOME}/lib/
```

  2. Enable the Queryable State service

Set queryable-state.enable: true in ${FLINK_HOME}/conf/flink-conf.yaml.

  3. Verify that the service started

Check the TaskManager logs; if they contain lines like the following, queryable state has been enabled successfully:

```
Started Queryable State Server @ /x.x.x.x:9067.
Started Queryable State Proxy Server @ /x.x.x.x:9069
```

After Flink has started, you can also view the TaskManager logs in the Flink web UI. (screenshot: TaskManager log in the web UI)

Flink in IntelliJ IDEA

  1. Add the dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
```

  2. Obtain a StreamExecutionEnvironment with the Queryable State service enabled

```java
Configuration config = new Configuration();
config.setInteger(ConfigOptions.key("rest.port").defaultValue(8081), 8081);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
// enable the Queryable State service
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
...
env.execute("JobName");
```

  3. Verify that the service started

When a Flink job runs inside IDEA it starts a mini cluster. Turn on INFO logging and look for the following lines:

```
Started Queryable State Server @ /127.0.0.1:9067.
Started Queryable State Proxy Server @ /127.0.0.1:9069
```

If these lines appear, queryable state has been enabled successfully for the job.

Making State Queryable

With the setup above, the queryable state service has been activated on the Flink cluster. In addition, the concrete state to be queried still has to be exposed in the job code. There are two ways to do this (a short sketch of both follows this list):

  • Convert a DataStream into a QueryableStateStream: converting a KeyedStream into a QueryableStateStream makes the state of every key in the stream queryable.
  • Call setQueryable(String queryableStateName) on a StateDescriptor: this makes the state declared through that descriptor queryable.
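A minimal sketch of the two approaches side by side; the state and queryable names here are illustrative, and the full runnable jobs follow in the next sections:

```java
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ExposeStateSketch {

    // (1) Stream-style: the keyed stream itself becomes queryable state; every
    //     incoming record overwrites the value stored for its key.
    static void exposeViaStream(KeyedStream<Tuple2<String, Long>, String> keyed) {
        keyed.asQueryableState("illustrative-query-name");
    }

    // (2) Descriptor-style: declare state as usual and mark it queryable; this is
    //     typically done in a RichFunction's open() before getRuntimeContext().getState(...).
    static ValueStateDescriptor<Tuple2<String, Long>> exposeViaDescriptor() {
        ValueStateDescriptor<Tuple2<String, Long>> descriptor =
                new ValueStateDescriptor<Tuple2<String, Long>>(
                        "illustrative-state-name",
                        Types.TUPLE(Types.STRING, Types.LONG));
        descriptor.setQueryable("illustrative-query-name");
        return descriptor;
    }
}
```

In both cases, the string passed to asQueryableState / setQueryable is the name the client later uses in getKvState.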

Dependencies

  • Server-side dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```

  • Client-side dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-client-java</artifactId>
    <version>1.9.0</version>
</dependency>
```

QueryableStateStream

  • Server side (Flink job)

```java
package com.flink.state.queryable;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author : 恋晨
 * Date : 2019/10/9 2:40 PM
 * Purpose: example Flink job with queryable state,
 * exposed by converting a DataStream into a QueryableStateStream.
 */
public class WordCountOnSetStream {

    public static void main(String[] args) throws Exception {
        final String hostname = "localhost";
        final int port = 9000;

        Configuration config = new Configuration();
        config.setInteger(ConfigOptions.key("rest.port").defaultValue(8081), 8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        // enable the Queryable State service
        config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setParallelism(1);
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);

        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        DataStream<Tuple2<String, Long>> windowCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                        out.collect(new Tuple2<String, Long>(value, 1L));
                    }
                })
                .keyBy(0)
                .sum(1);

        windowCounts.print();
        // expose the running count of every key under the name "QueryableState_WordCount"
        windowCounts.keyBy(0).asQueryableState("QueryableState_WordCount");

        env.execute("Socket Window WordCount");
    }
}
```
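A variant worth noting before running the job: asQueryableState also accepts an explicit ValueStateDescriptor, which lets you control the stored type information and serializer instead of relying on the stream's inferred type. A minimal sketch; the descriptor name "wordCounts" is illustrative:

```java
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class QueryableStateStreamVariant {

    static void expose(DataStream<Tuple2<String, Long>> windowCounts) {
        ValueStateDescriptor<Tuple2<String, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "wordCounts", // illustrative descriptor name
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));

        // same queryable name as in the job above; the descriptor controls serialization
        windowCounts
                .keyBy(0)
                .asQueryableState("QueryableState_WordCount", descriptor);
    }
}
```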

Run the job and open the Flink web UI; there you can find the job's jobID (which the query client needs), its DAG, and other information.
(screenshot: job overview in the web UI)

  • Client side

```java
package com.flink.state.queryable;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

/**
 * @author : 恋晨
 * Date : 2019/10/9 4:05 PM
 * Purpose: example client that queries the job's state
 */
public class QueryClient {

    public static void main(String[] args) throws Exception {
        final JobID jobID = JobID.fromHexString("5dc4c2765c46664e0c121855baf2dda8");
        final String hostname = "localhost";
        final int port = 9069;

        QueryableStateClient client = new QueryableStateClient(hostname, port);

        ValueStateDescriptor<Tuple2<String, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "QueryableState_WordCount",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));

        final String key = "hello";
        CompletableFuture<ValueState<Tuple2<String, Long>>> completableFuture =
                client.getKvState(
                        jobID,
                        "QueryableState_WordCount", // the name passed to asQueryableState
                        key,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        descriptor
                );

        System.out.println(completableFuture.get().value());
    }
}
```
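The call to completableFuture.get() above blocks once and exits. A more typical pattern is to poll the state periodically and handle failed queries (the future completes exceptionally, for instance, when the key has not been processed by the job yet). A minimal sketch of such a loop; the method name, the one-second interval and the printed format are illustrative, and the client, job ID, key and descriptor are assumed to be set up exactly as in the example above:

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class PollingQueryClient {

    /** Queries the state once per second and prints the current value for the key. */
    static void pollForever(QueryableStateClient client,
                            JobID jobID,
                            String key,
                            ValueStateDescriptor<Tuple2<String, Long>> descriptor)
            throws InterruptedException {
        while (true) {
            CompletableFuture<ValueState<Tuple2<String, Long>>> future =
                    client.getKvState(
                            jobID,
                            "QueryableState_WordCount",
                            key,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            descriptor);

            future.thenAccept(state -> {
                try {
                    System.out.println(key + " -> " + state.value());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).exceptionally(t -> {
                // usually means the key has not been seen by the job yet
                System.out.println("query failed: " + t);
                return null;
            });

            Thread.sleep(1000L);
        }
    }
}
```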

setQueryable

  • Server side (Flink job)

```java
package com.flink.state.queryable;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author : 恋晨
 * Date : 2019/10/10 10:49 AM
 * Purpose: example Flink job with queryable state,
 * exposed via StateDescriptor.setQueryable.
 */
public class WordCountSetQueryable {

    final static Logger log = LoggerFactory.getLogger("WordCountSetQueryable");

    public static void main(String[] args) throws Exception {
        final String hostname = "localhost";
        final int port = 9009;

        Configuration config = new Configuration();
        config.setInteger(ConfigOptions.key("rest.port").defaultValue(8081), 8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        // enable the Queryable State service
        config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setParallelism(1);
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);

        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        DataStream<Tuple2<String, Long>> windowCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                        out.collect(new Tuple2<String, Long>(value, 1L));
                    }
                })
                .keyBy(0)
                .flatMap(new RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {

                    public transient ValueState<Tuple2<String, Long>> countState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Tuple2<String, Long>> valueStateDescriptor =
                                new ValueStateDescriptor<Tuple2<String, Long>>(
                                        "QueryableState_WordCount_state",
                                        Types.TUPLE(Types.STRING, Types.LONG),
                                        new Tuple2<>(null, 0L)
                                );
                        // expose this state via ValueStateDescriptor.setQueryable
                        valueStateDescriptor.setQueryable("QueryableState_WordCount");
                        countState = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
                        Tuple2<String, Long> currentState = countState.value();
                        if (currentState.f0 == null) {
                            currentState.f0 = value.f0;
                        }
                        currentState.f1 += value.f1;
                        countState.update(currentState);
                        log.info(currentState.toString());
                        out.collect(currentState);
                    }
                });

        windowCounts.print();
        env.execute("Socket Window WordCount");
    }
}
```
  • Client side

```java
package com.flink.state.queryable;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

/**
 * @author : 恋晨
 * Date : 2019/10/9 4:05 PM
 * Purpose: example client that queries the job's state
 */
public class QueryClient {

    public static void main(String[] args) throws Exception {
        final JobID jobID = JobID.fromHexString("722bf03f719a2e81587cc1cc3c684499");
        final String hostname = "localhost";
        final int port = 9069;

        QueryableStateClient client = new QueryableStateClient(hostname, port);

        ValueStateDescriptor<Tuple2<String, Long>> valueStateDescriptor =
                new ValueStateDescriptor<Tuple2<String, Long>>(
                        "QueryableState_WordCount_state",
                        Types.TUPLE(Types.STRING, Types.LONG)
                );

        final String key = "hello";
        CompletableFuture<ValueState<Tuple2<String, Long>>> completableFuture =
                client.getKvState(
                        jobID,
                        "QueryableState_WordCount", // the name passed to setQueryable, not the descriptor name
                        key,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        valueStateDescriptor
                );

        System.out.println(completableFuture.get().value());
    }
}
```
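One practical note on the client side: if the queried key has not been typed into nc yet, or the queryable state name is wrong, the returned future completes exceptionally and get() throws an ExecutionException. A hedged sketch of handling this and of releasing the client afterwards (shutdownAndWait() is assumed to be the shutdown method available on QueryableStateClient):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryClientErrorHandling {

    /** Prints the queried value, or a readable error if the query failed. */
    static void printResult(QueryableStateClient client,
                            CompletableFuture<ValueState<Tuple2<String, Long>>> future)
            throws Exception {
        try {
            System.out.println(future.get().value());
        } catch (ExecutionException e) {
            // the future completes exceptionally if, for example, the queried key has
            // not been seen by the job yet or the queryable state name does not exist
            System.err.println("state query failed: " + e.getCause());
        } finally {
            client.shutdownAndWait(); // assumed shutdown call; frees the client's network resources
        }
    }
}
```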

Test Run

  1. Package the program above and upload the jar (the author uploaded it to Google Cloud);
  2. Run nc -l 9000 and type in some words;
  3. Submit the job:

```bash
bin/flink run -c com.flink.state.queryable.WordCountOnSetStream QueryableState.jar
```

    The web UI then shows the submitted job. (screenshot: job overview)

  4. Use telnet to verify that port 9069 is reachable;

  5. Start the query client from the local IDE and query the state data.
