Flink has provided the Queryable State service since version 1.2.0. Queryable State allows external systems (such as business applications) to query the internal state of a running Flink job.

Queryable State Service Architecture

(Figure: Queryable State service architecture)

  1. QueryableStateClient: the client. It runs in the external system, submits query requests, and receives the final results.
  2. QueryableStateClientProxy: the client proxy. It runs on every TaskManager. It receives client requests, locates the TaskManager that holds the requested key, forwards the request to that TaskManager's query service, and returns the final result to the client.
  3. QueryableStateServer: the query service. It runs on every TaskManager. It handles requests from client proxies and returns the results.
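
In code terms, an external process only ever talks to the client proxy; the proxy and the state server are internal to the TaskManagers. The minimal sketch below (host, port, job ID, and state name are placeholders, not values from this article) shows where each component sits in a single query; the full, runnable client examples follow later.

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class QueryPathSketch {
    public static void main(String[] args) throws Exception {
        // 1. QueryableStateClient runs in the external system and connects to the
        //    QueryableStateClientProxy on a TaskManager (default proxy port: 9069).
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        // 2. The proxy locates the TaskManager holding the requested key and forwards
        //    the request to that TaskManager's QueryableStateServer.
        CompletableFuture<ValueState<Long>> resultFuture =
                client.getKvState(
                        JobID.fromHexString("00000000000000000000000000000000"), // placeholder job ID
                        "someQueryableStateName",                                // placeholder state name
                        "someKey",                                               // key to look up
                        BasicTypeInfo.STRING_TYPE_INFO,
                        new ValueStateDescriptor<>("state", Types.LONG));

        // 3. The state server reads the local state; the result travels back through
        //    the proxy to the client.
        System.out.println(resultFuture.get().value());
    }
}
```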


Enabling the Queryable State Service

Flink on YARN Mode

In Flink on YARN cluster mode, the service can be enabled with the following steps.

  1. Add the dependency: copy the queryable state runtime JAR into Flink's lib directory.

     cp ${FLINK_HOME}/opt/flink-queryable-state-runtime_2.11-1.13.0.jar ${FLINK_HOME}/lib/

  2. Enable the Queryable State service: set queryable-state.enable: true in ${FLINK_HOME}/conf/flink-conf.yaml.

  3. Check the TaskManager logs: the service is up once log lines like the following appear.

     Started Queryable State Server @ /x.x.x.x:9067
     Started Queryable State Proxy Server @ /x.x.x.x:9069

Flink Local Mode

In Flink local mode, the service can be enabled with the following steps.

  1. Add the dependency.

     ```xml
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-queryable-state-runtime_2.11</artifactId>
         <version>1.13.0</version>
     </dependency>
     ```

  2. Enable the Queryable State service in code.

     ```java
     Configuration config = new Configuration();
     config.setInteger(ConfigOptions.key("rest.port").intType().defaultValue(8081), 8081);
     config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
     // Enable the Queryable State service
     config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
     ```

  3. Check the TaskManager logs: the service is up once log lines like the following appear.

     Started Queryable State Server @ /127.0.0.1:9067
     Started Queryable State Proxy Server @ /127.0.0.1:9069

Making State Queryable

With the settings above, the Queryable State service is enabled on the Flink cluster. In addition, the job code must explicitly expose the state that should be queryable. There are two ways to do this:

  1. Convert a DataStream into a QueryableStateStream: converting a KeyedStream with asQueryableState makes the state of every key in that KeyedStream queryable.
  2. Call setQueryable(String queryableStateName) on a StateDescriptor: this makes the keyed state described by that descriptor queryable.

The two examples below demonstrate both approaches.

Main Dependencies

Server side

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
```

Client side

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-client-java</artifactId>
    <version>1.13.0</version>
</dependency>
```

Making State Queryable via QueryableStateStream

Server side (Flink job)

```java
package com.lingyao.state;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Author : wangxiaohui
 * @Date: 2021-09-29 11:26
 * Queryable State Stream
 */
public class QueryableStateByQueryableStateStream {
    public static void main(String[] args) throws Exception {
        // Parse parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final String host = parameterTool.get("host", "localhost");
        final int port = parameterTool.getInt("port", 1234);
        final int parallelism = parameterTool.getInt("parallelism", 4);

        // Configure the environment
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.getConfig().setAutoWatermarkInterval(0);
        env.setParallelism(parallelism);

        // Input source: each line has the format event,pv
        SingleOutputStreamOperator<Tuple2<String, Long>> source =
                env.socketTextStream(host, port)
                        .flatMap(
                                new FlatMapFunction<String, Tuple2<String, Long>>() {
                                    @Override
                                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                                        String[] splits = value.trim().split(",");
                                        out.collect(new Tuple2<>(splits[0], Long.valueOf(splits[1])));
                                    }
                                });

        // Window aggregation: maximum pv per event over the last 5 seconds
        SingleOutputStreamOperator<Tuple2<String, Long>> result =
                source
                        .keyBy(
                                new KeySelector<Tuple2<String, Long>, String>() {
                                    @Override
                                    public String getKey(Tuple2<String, Long> value) throws Exception {
                                        return value.f0;
                                    }
                                })
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .max(1);

        // Print the result
        result.print();

        // Make the result state queryable.
        // asQueryableState returns a QueryableStateStream, which acts like a sink:
        // it cannot be transformed further; it receives the incoming records and updates the state.
        result
                .keyBy(
                        new KeySelector<Tuple2<String, Long>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Long> value) throws Exception {
                                return value.f0;
                            }
                        })
                .asQueryableState("lastFiveSecondsMaxPV");

        env.execute();
    }
}
```
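
The query client below needs the JobID of the running job. With the local web UI enabled it can be read from http://localhost:8081. Alternatively, a small sketch (Flink 1.13, where executeAsync is available; the job name below is made up): submit the job with executeAsync() instead of the final env.execute() and print the JobID.

```java
// Sketch: replaces the final env.execute() above.
// Requires: import org.apache.flink.core.execution.JobClient;
JobClient jobClient = env.executeAsync("queryable-state-stream-demo");
System.out.println("JobID for the query client: " + jobClient.getJobID());
// Keep the JVM alive: the local mini cluster runs inside this process.
Thread.currentThread().join();
```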

Client side

```java
package com.lingyao.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

/**
 * @Author : wangxiaohui
 * @Date: 2021-09-29 11:29
 */
public class QueryClient {
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            throw new IllegalArgumentException(
                    "Missing Flink Job ID ...\n"
                            + "Usage: ./QueryClient <jobID> [jobManagerHost] [jobManagerPort]");
        }
        final JobID jobId = JobID.fromHexString(args[0]);
        final String jobManagerHost = args.length > 1 ? args[1] : "localhost";
        final int jobManagerPort = args.length > 2 ? Integer.parseInt(args[2]) : 9069;
        QueryableStateClient client = new QueryableStateClient(jobManagerHost, jobManagerPort);

        /*
        State descriptor.
        The descriptor name ("lastFiveSecondsMaxPV") can be anything here: for asQueryableState,
        the server side generates the internal descriptor name via UUID.
        The value type (TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})) must match the server side.
        */
        ValueStateDescriptor<Tuple2<String, Long>> stateDescriptor =
                new ValueStateDescriptor<>(
                        "lastFiveSecondsMaxPV",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));
        final String key = "click";

        /*
        Poll the query in a loop.
        The queryableStateName ("lastFiveSecondsMaxPV") must match the name used on the server side.
        */
        while (true) {
            CompletableFuture<ValueState<Tuple2<String, Long>>> completableFuture =
                    client.getKvState(
                            jobId,
                            "lastFiveSecondsMaxPV",
                            key,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            stateDescriptor);
            System.out.println(completableFuture.get().value());
            Thread.sleep(1000);
        }
    }
}
```
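
The loop above blocks on completableFuture.get(). Since getKvState returns a CompletableFuture, the result can also be consumed asynchronously; below is a minimal non-blocking variant of a single query, reusing client, jobId, key, and stateDescriptor from the example above.

```java
// Sketch: handle one query result asynchronously instead of blocking on get().
CompletableFuture<ValueState<Tuple2<String, Long>>> future =
        client.getKvState(
                jobId,
                "lastFiveSecondsMaxPV",
                key,
                BasicTypeInfo.STRING_TYPE_INFO,
                stateDescriptor);
future.thenAccept(
        state -> {
            try {
                System.out.println("Async result: " + state.value());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
```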

Making State Queryable via StateDescriptor.setQueryable

Server side (Flink job)

```java
package com.lingyao.state;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

/**
 * @Author : wangxiaohui
 * @Date: 2021-09-29 14:36
 */
public class QueryableStateByStateDescriptor {
    public static void main(String[] args) throws Exception {
        // Parse parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final String host = parameterTool.get("host", "localhost");
        final int port = parameterTool.getInt("port", 1234);
        final int parallelism = parameterTool.getInt("parallelism", 4);

        // Configure the environment
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setParallelism(parallelism);
        env.getConfig().setAutoWatermarkInterval(0);

        // Input source: one record is reported whenever an action occurs, format action,money
        SingleOutputStreamOperator<Tuple2<String, Long>> source =
                env.socketTextStream(host, port)
                        .flatMap(
                                new FlatMapFunction<String, Tuple2<String, Long>>() {
                                    @Override
                                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                                        String[] splits = value.trim().split(",");
                                        out.collect(new Tuple2<>(splits[0], Long.valueOf(splits[1])));
                                    }
                                });

        // Window aggregation: count(money) and sum(money) per action over the last 5 seconds
        SingleOutputStreamOperator<Tuple5<String, String, String, Long, Long>> result =
                source
                        .keyBy(
                                new KeySelector<Tuple2<String, Long>, String>() {
                                    @Override
                                    public String getKey(Tuple2<String, Long> value) throws Exception {
                                        return value.f0;
                                    }
                                })
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .process(new CustomCountSumWindowFunction());

        result.print();
        env.execute();
    }

    /** Custom window function that computes the count and sum for each key's window. */
    static class CustomCountSumWindowFunction
            extends ProcessWindowFunction<
                    Tuple2<String, Long>, Tuple5<String, String, String, Long, Long>, String, TimeWindow> {

        // ValueState that holds the latest count/sum per key
        private transient ValueState<Tuple5<String, String, String, Long, Long>> sumCountValueState;

        /**
         * Called once per parallel instance when the function is initialized.
         *
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple5<String, String, String, Long, Long>> valueStateDescriptor =
                    new ValueStateDescriptor<>(
                            "lastFiveSecondsCountSumValueState",
                            Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG));
            // Expose this state via ValueStateDescriptor.setQueryable to make it queryable
            valueStateDescriptor.setQueryable("lastFiveSecondsCountSum");
            sumCountValueState = getRuntimeContext().getState(valueStateDescriptor);
        }

        /**
         * Called once per window firing.
         *
         * @param key the key of the current window
         * @param context the window context
         * @param elements all elements of the current window
         * @param out collector for output records
         * @throws Exception
         */
        @Override
        public void process(
                String key,
                Context context,
                Iterable<Tuple2<String, Long>> elements,
                Collector<Tuple5<String, String, String, Long, Long>> out)
                throws Exception {
            // Compute count and sum
            long currentCount = 0L;
            long currentSum = 0L;
            for (Tuple2<String, Long> value : elements) {
                currentCount += 1;
                currentSum += value.f1;
            }
            // Get the window start and end times
            TimeWindow window = context.window();
            String windowStartTime =
                    new DateTime(window.getStart(), DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss");
            String windowEndTime =
                    new DateTime(window.getEnd(), DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss");
            // Build the current value
            Tuple5<String, String, String, Long, Long> currentValue =
                    new Tuple5<>(key, windowStartTime, windowEndTime, currentCount, currentSum);
            // Update the state
            sumCountValueState.update(currentValue);
            // Emit the result
            out.collect(currentValue);
        }
    }
}
```

Client side

```java
package com.lingyao.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

/**
 * @Author : wangxiaohui
 * @Date: 2021-09-29 14:42
 */
public class QueryClient2 {
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            throw new IllegalArgumentException(
                    "Missing Flink Job ID ...\n"
                            + "Usage: ./QueryClient <jobID> [jobManagerHost] [jobManagerPort]");
        }
        final JobID jobId = JobID.fromHexString(args[0]);
        final String jobManagerHost = args.length > 1 ? args[1] : "localhost";
        final int jobManagerPort = args.length > 2 ? Integer.parseInt(args[2]) : 9069;
        QueryableStateClient client = new QueryableStateClient(jobManagerHost, jobManagerPort);

        /*
        State descriptor: should match the descriptor used on the server side (name and value type).
        */
        ValueStateDescriptor<Tuple5<String, String, String, Long, Long>> valueStateDescriptor =
                new ValueStateDescriptor<>(
                        "lastFiveSecondsCountSumValueState",
                        Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG));
        final String key = "click";

        /*
        Poll the query in a loop.
        The queryableStateName ("lastFiveSecondsCountSum") must match the name used on the server side.
        */
        while (true) {
            CompletableFuture<ValueState<Tuple5<String, String, String, Long, Long>>> completableFuture =
                    client.getKvState(
                            jobId,
                            "lastFiveSecondsCountSum",
                            key,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            valueStateDescriptor);
            Tuple5<String, String, String, Long, Long> value = completableFuture.get().value();
            System.out.println(
                    "Key: "
                            + value.f0
                            + " ,WindowStart: "
                            + value.f1
                            + " ,WindowEnd: "
                            + value.f2
                            + " ,Count: "
                            + value.f3
                            + " ,Sum: "
                            + value.f4);
            Thread.sleep(1000);
        }
    }
}
```
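
Until the first 5-second window has fired for the queried key, there is no state to return and completableFuture.get() completes exceptionally, which terminates the client above. Below is a more tolerant polling loop, a sketch with plain exception handling that reuses client, jobId, key, and valueStateDescriptor from the example.

```java
// Sketch: keep polling even while no state exists yet for the queried key.
while (true) {
    try {
        Tuple5<String, String, String, Long, Long> value =
                client.getKvState(
                                jobId,
                                "lastFiveSecondsCountSum",
                                key,
                                BasicTypeInfo.STRING_TYPE_INFO,
                                valueStateDescriptor)
                        .get()
                        .value();
        System.out.println(value);
    } catch (Exception e) {
        System.out.println("No queryable state for key '" + key + "' yet: " + e.getMessage());
    }
    Thread.sleep(1000);
}
```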

TODO

Persist the state in an external system.
Modify the state in a checkpoint and start the application from the modified state.

Original post: https://blog.csdn.net/wangpei1949/article/details/100608828