Flink has supported the Queryable State service since version 1.2.0. Queryable State allows external systems (such as business applications) to query the internal state of a running Flink job.

Queryable State Service Architecture

[Figure: Queryable State service architecture]

  1. QueryableStateClient: the client. Runs in the external system. Submits query requests and receives the final results.
  2. QueryableStateClientProxy: the client proxy. Runs on each TaskManager. Receives client requests, locates the TaskManager that holds the requested key, forwards the request to the corresponding query server, and returns the final result to the client.
  3. QueryableStateServer: the query server. Runs on each TaskManager. Serves requests coming from the client proxies and returns the results.
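
From the client's point of view only the client proxy is visible: the client connects to the proxy port of any TaskManager, and the proxy routes the request to the server that owns the key. Below is a minimal sketch of this query path; the host, port, job ID, state name, and key are placeholders for illustration, not values from this article.

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.queryablestate.client.QueryableStateClient;

    public class QueryPathSketch {
        public static void main(String[] args) throws Exception {
            // 1. QueryableStateClient: connect to the Queryable State proxy of any TaskManager
            //    (default proxy port: 9069).
            QueryableStateClient client = new QueryableStateClient("some-taskmanager-host", 9069);

            // Descriptor of the state to read; name and value type must match the job's queryable state.
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("someQueryableStateName", Types.LONG);

            // 2./3. The proxy locates the TaskManager that holds the key and forwards the request
            //       to its QueryableStateServer; the result travels back along the same path.
            CompletableFuture<ValueState<Long>> future =
                    client.getKvState(
                            JobID.fromHexString("00000000000000000000000000000000"),
                            "someQueryableStateName",
                            "someKey",
                            BasicTypeInfo.STRING_TYPE_INFO,
                            descriptor);
            System.out.println(future.get().value());
        }
    }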

Activating the Queryable State Service

To use queryable state on a Flink cluster, do the following:

  1. Copy flink-queryable-state-runtime_2.11-1.11.2.jar (already present in the environment) from the opt/ directory of the Flink distribution to the lib/ directory;

  2. Set the parameter queryable-state.enable to true. For details and other options, see the Configuration documentation.

To verify that queryable state is enabled on the cluster, check whether the log of any TaskManager contains the line "Started the Queryable State Proxy Server @ …".

Flink on YARN Mode

In Flink on YARN cluster mode, the service can be enabled as follows.

  1. Add the dependency:

    cp ${FLINK_HOME}/opt/flink-queryable-state-runtime_2.11-1.11.2.jar ${FLINK_HOME}/lib/

  2. Enable the Queryable State service:
    Set queryable-state.enable: true in ${FLINK_HOME}/conf/flink-conf.yaml.

  3. Check the TaskManager logs: the service is running once lines like the following appear.

    Started Queryable State Server @ /x.x.x.x:9067
    Started Queryable State Proxy Server @ /x.x.x.x:9069

Flink Local Mode

In Flink local mode, the service can be enabled as follows.

  1. Add the dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-queryable-state-runtime_2.11</artifactId>
        <version>1.11.2</version>
    </dependency>

  2. Enable the Queryable State service:

    Configuration config = new Configuration();
    config.setInteger(ConfigOptions.key("rest.port").defaultValue(8081), 8081);
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    // Enable the Queryable State service
    config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

  3. Check the TaskManager logs: the service is running once lines like the following appear.

    Started Queryable State Server @ /127.0.0.1:9067
    Started Queryable State Proxy Server @ /127.0.0.1:9069

Making State Queryable

With the setup above, the Queryable State service is active on the Flink cluster. In addition, the job code must explicitly expose the state that should be queryable. There are two ways to do this:

  1. Convert a DataStream into a QueryableStateStream: converting a KeyedStream with asQueryableState makes the state of every key in that KeyedStream queryable.

  2. Call setQueryable(String queryableStateName) on a StateDescriptor, which makes the keyed state represented by that descriptor queryable.

The two examples below demonstrate both approaches.

Main Dependencies

Server side

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-queryable-state-runtime_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>

Client side

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-queryable-state-client-java</artifactId>
        <version>1.9.0</version>
    </dependency>

Making State Queryable via QueryableStateStream

Server side (Flink Job)

    package com.bigdata.flink;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.QueryableStateOptions;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class QueryableStateByQueryableStateStream {
        public static void main(String[] args) throws Exception {
            // Parse parameters
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            final String host = parameterTool.get("host", "localhost");
            final int port = parameterTool.getInt("port", 1234);
            final int parallelism = parameterTool.getInt("parallelism", 4);

            // Configure the environment
            Configuration config = new Configuration();
            config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
            config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
            env.setParallelism(parallelism);

            // Input source: each line has the format event,pv
            SingleOutputStreamOperator<Tuple2<String, Long>> source =
                    env.socketTextStream(host, port)
                            .flatMap(
                                    new FlatMapFunction<String, Tuple2<String, Long>>() {
                                        @Override
                                        public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                                            String[] splits = value.trim().split(",");
                                            out.collect(new Tuple2<>(splits[0], Long.valueOf(splits[1])));
                                        }
                                    });

            // Window aggregation: maximum pv per event within the last 5 seconds
            SingleOutputStreamOperator<Tuple2<String, Long>> result =
                    source
                            .keyBy(
                                    new KeySelector<Tuple2<String, Long>, String>() {
                                        @Override
                                        public String getKey(Tuple2<String, Long> value) throws Exception {
                                            return value.f0;
                                        }
                                    })
                            .timeWindow(Time.seconds(5))
                            .max(1);

            // Print the result
            result.print();

            // Make the result's state queryable.
            // asQueryableState returns a QueryableStateStream, which acts like a sink:
            // it cannot be transformed any further; it only consumes incoming records and updates the state.
            result
                    .keyBy(
                            new KeySelector<Tuple2<String, Long>, String>() {
                                @Override
                                public String getKey(Tuple2<String, Long> value) throws Exception {
                                    return value.f0;
                                }
                            })
                    .asQueryableState("lastFiveSecondsMaxPV");

            env.execute();
        }
    }
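
The query client below needs the JobID of this running job. It can be read from the web UI at http://localhost:8081, or, as a sketch (assuming Flink 1.10 or later, where JobClient is available), the job can be submitted asynchronously instead of calling env.execute() and the ID printed:

    // Illustrative replacement for env.execute(), not part of the original example.
    // Requires: import org.apache.flink.core.execution.JobClient;
    JobClient jobClient = env.executeAsync("QueryableStateByQueryableStateStream");
    System.out.println("JobID: " + jobClient.getJobID());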

Client side

    package com.bigdata.java;

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.queryablestate.client.QueryableStateClient;

    public class QueryClient {
        public static void main(String[] args) throws Exception {
            if (args.length == 0) {
                throw new IllegalArgumentException(
                        "Missing Flink Job ID ...\n"
                                + "Usage: ./QueryClient <jobID> [proxyHost] [proxyPort]");
            }
            final JobID jobId = JobID.fromHexString(args[0]);
            // Host and port of a TaskManager's Queryable State proxy (default proxy port: 9069)
            final String proxyHost = args.length > 1 ? args[1] : "localhost";
            final int proxyPort = args.length > 2 ? Integer.parseInt(args[2]) : 9069;
            QueryableStateClient client = new QueryableStateClient(proxyHost, proxyPort);

            /*
             * State descriptor:
             * - The descriptor name is arbitrary here, because the server-side descriptor created by
             *   asQueryableState uses a generated (UUID) name.
             * - The value type, TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}), must match
             *   the value type on the server side.
             */
            ValueStateDescriptor<Tuple2<String, Long>> stateDescriptor =
                    new ValueStateDescriptor<>(
                            "lastFiveSecondsMaxPV",
                            TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));

            final String key = "click";

            /*
             * Poll the state in a loop.
             * The queryableStateName ("lastFiveSecondsMaxPV") must match the name passed to
             * asQueryableState on the server side.
             */
            while (true) {
                CompletableFuture<ValueState<Tuple2<String, Long>>> completableFuture =
                        client.getKvState(
                                jobId,
                                "lastFiveSecondsMaxPV",
                                key,
                                BasicTypeInfo.STRING_TYPE_INFO,
                                stateDescriptor);
                System.out.println(completableFuture.get().value());
                Thread.sleep(1000);
            }
        }
    }
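
Note that completableFuture.get() may complete exceptionally, for example when the queried key has no state yet (typically before the first window has fired) or while the job is still starting up; the same applies to the second client further below. A more tolerant polling loop might look like the following sketch (same job, state name, and key as above; the class name TolerantQueryClient is made up for illustration):

    package com.bigdata.java;

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.queryablestate.client.QueryableStateClient;

    public class TolerantQueryClient {
        public static void main(String[] args) throws Exception {
            final JobID jobId = JobID.fromHexString(args[0]);
            QueryableStateClient client = new QueryableStateClient("localhost", 9069);
            ValueStateDescriptor<Tuple2<String, Long>> stateDescriptor =
                    new ValueStateDescriptor<>(
                            "lastFiveSecondsMaxPV",
                            TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));
            while (true) {
                CompletableFuture<ValueState<Tuple2<String, Long>>> future =
                        client.getKvState(
                                jobId,
                                "lastFiveSecondsMaxPV",
                                "click",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                stateDescriptor);
                try {
                    System.out.println(future.get().value());
                } catch (Exception e) {
                    // Usually means there is no state for this key yet; retry on the next iteration.
                    System.out.println("State not available yet: " + e.getMessage());
                }
                Thread.sleep(1000);
            }
        }
    }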

Making State Queryable via StateDescriptor.setQueryable

Server side (Flink Job)

    package com.bigdata.flink;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.QueryableStateOptions;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public class QueryableStateByStateDescriptor {
        public static void main(String[] args) throws Exception {
            // Parse parameters
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            final String host = parameterTool.get("host", "localhost");
            final int port = parameterTool.getInt("port", 1234);
            final int parallelism = parameterTool.getInt("parallelism", 4);

            // Configure the environment
            Configuration config = new Configuration();
            config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
            config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
            env.setParallelism(parallelism);

            // Input source: one record is reported each time an action occurs, with the format action,money
            SingleOutputStreamOperator<Tuple2<String, Long>> source =
                    env.socketTextStream(host, port)
                            .flatMap(
                                    new FlatMapFunction<String, Tuple2<String, Long>>() {
                                        @Override
                                        public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                                            String[] splits = value.trim().split(",");
                                            out.collect(new Tuple2<>(splits[0], Long.valueOf(splits[1])));
                                        }
                                    });

            // Window aggregation: count(money) and sum(money) per action within the last 5 seconds
            SingleOutputStreamOperator<Tuple5<String, String, String, Long, Long>> result =
                    source
                            .keyBy(
                                    new KeySelector<Tuple2<String, Long>, String>() {
                                        @Override
                                        public String getKey(Tuple2<String, Long> value) throws Exception {
                                            return value.f0;
                                        }
                                    })
                            .timeWindow(Time.seconds(5))
                            .process(new CustomCountSumWindowFunction());

            result.print();
            env.execute();
        }

        /** Custom window function that computes the count and sum for each key's window. */
        static class CustomCountSumWindowFunction
                extends ProcessWindowFunction<
                        Tuple2<String, Long>, Tuple5<String, String, String, Long, Long>, String, TimeWindow> {

            // ValueState holding the latest result per key
            private transient ValueState<Tuple5<String, String, String, Long, Long>> sumCountValueState;

            /**
             * Called only once, when the CustomCountSumWindowFunction instance is opened.
             *
             * @param parameters configuration
             * @throws Exception if the state cannot be registered
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Tuple5<String, String, String, Long, Long>> valueStateDescriptor =
                        new ValueStateDescriptor<>(
                                "lastFiveSecondsCountSumValueState",
                                Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG));
                // Expose this state for external queries via ValueStateDescriptor.setQueryable
                valueStateDescriptor.setQueryable("lastFiveSecondsCountSum");
                sumCountValueState = getRuntimeContext().getState(valueStateDescriptor);
            }

            /**
             * Called once per window.
             *
             * @param key the key of the current window
             * @param context the window context
             * @param elements all elements in the current window
             * @param out collector for output records
             * @throws Exception if the state cannot be accessed
             */
            @Override
            public void process(
                    String key,
                    Context context,
                    Iterable<Tuple2<String, Long>> elements,
                    Collector<Tuple5<String, String, String, Long, Long>> out)
                    throws Exception {
                // Compute count and sum
                long currentCount = 0L;
                long currentSum = 0L;
                for (Tuple2<String, Long> value : elements) {
                    currentCount += 1;
                    currentSum += value.f1;
                }
                // Window start and end time
                TimeWindow window = context.window();
                String windowStartTime =
                        new DateTime(window.getStart(), DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss");
                String windowEndTime =
                        new DateTime(window.getEnd(), DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss");
                // Build the current value
                Tuple5<String, String, String, Long, Long> currentValue =
                        new Tuple5<>(key, windowStartTime, windowEndTime, currentCount, currentSum);
                // Update the state
                sumCountValueState.update(currentValue);
                // Emit the result
                out.collect(currentValue);
            }
        }
    }

Client side

    package com.bigdata.java;

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.queryablestate.client.QueryableStateClient;

    public class QueryClient2 {
        public static void main(String[] args) throws Exception {
            if (args.length == 0) {
                throw new IllegalArgumentException(
                        "Missing Flink Job ID ...\n"
                                + "Usage: ./QueryClient2 <jobID> [proxyHost] [proxyPort]");
            }
            final JobID jobId = JobID.fromHexString(args[0]);
            // Host and port of a TaskManager's Queryable State proxy (default proxy port: 9069)
            final String proxyHost = args.length > 1 ? args[1] : "localhost";
            final int proxyPort = args.length > 2 ? Integer.parseInt(args[2]) : 9069;
            QueryableStateClient client = new QueryableStateClient(proxyHost, proxyPort);

            // State descriptor: name and value type must match the descriptor on the server side
            ValueStateDescriptor<Tuple5<String, String, String, Long, Long>> valueStateDescriptor =
                    new ValueStateDescriptor<>(
                            "lastFiveSecondsCountSumValueState",
                            Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG));

            final String key = "buy";

            /*
             * Poll the state in a loop.
             * The queryableStateName ("lastFiveSecondsCountSum") must match the name passed to
             * setQueryable on the server side.
             */
            while (true) {
                CompletableFuture<ValueState<Tuple5<String, String, String, Long, Long>>> completableFuture =
                        client.getKvState(
                                jobId,
                                "lastFiveSecondsCountSum",
                                key,
                                BasicTypeInfo.STRING_TYPE_INFO,
                                valueStateDescriptor);
                Tuple5<String, String, String, Long, Long> value = completableFuture.get().value();
                System.out.println(
                        "Key: " + value.f0
                                + " ,WindowStart: " + value.f1
                                + " ,WindowEnd: " + value.f2
                                + " ,Count: " + value.f3
                                + " ,Sum: " + value.f4);
                Thread.sleep(1000);
            }
        }
    }