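Flink has supported the Queryable State service since version 1.2.0. Queryable State lets external systems, such as business applications, query the internal state of a running Flink job.
Queryable State Service Architecture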

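- QueryableStateClient: the client. It runs in the external system, submits query requests and receives the final results.
- QueryableStateClientProxy: the client proxy. It runs on every TaskManager, receives requests from the client, finds the TaskManager responsible for the requested key, forwards the request to the query server there, and returns the final result to the client.
- QueryableStateServer: the query server. It runs on every TaskManager, handles requests from the client proxy and returns the results.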
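How the proxy can find "the TaskManager responsible for a key" is pictured by the plain-Java model below. It is a simplified sketch, not Flink code. It assumes that the key space is split into `maxParallelism` key groups, each owned by one parallel instance of the stateful operator. It uses `Math.floorMod(key.hashCode(), maxParallelism)` as a stand-in for Flink's own hash (Flink additionally applies a murmur hash to `hashCode()`), so the concrete key-group numbers differ from a real cluster. The server addresses are hypothetical.

```java
import java.util.List;

/**
 * Simplified model of key-based routing from a proxy to the state server that owns the key.
 * Assumption: Math.floorMod(hashCode) replaces Flink's murmur-hash-based key-group function.
 */
public class KeyGroupRoutingSketch {

    /** Maps a key to one of maxParallelism key groups (simplified hash). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the parallel operator instance that owns it. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Returns the (hypothetical) state server address that owns the given key. */
    static String serverFor(Object key, int maxParallelism, List<String> serverAddresses) {
        int keyGroup = keyGroupFor(key, maxParallelism);
        int index = operatorIndexFor(keyGroup, maxParallelism, serverAddresses.size());
        return serverAddresses.get(index);
    }

    public static void main(String[] args) {
        // One hypothetical address per parallel operator instance (parallelism = 4)
        List<String> servers = List.of("tm-0:9067", "tm-1:9067", "tm-2:9067", "tm-3:9067");
        for (String key : List.of("click", "view", "buy")) {
            System.out.println(key + " -> " + serverFor(key, 128, servers));
        }
    }
}
```

Because the mapping depends only on the key and the parallelism settings, every proxy computes the same owner for a given key, which is what lets any proxy forward the request to the right server.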
Activating the Queryable State Service
Flink on YARN mode
On a Flink on YARN cluster, enable the service as follows.
1. Add the dependency

```shell
cp ${FLINK_HOME}/opt/flink-queryable-state-runtime_2.11-1.13.0.jar ${FLINK_HOME}/lib/
```
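2. Enable the Queryable State service

Set the following in ${FLINK_HOME}/conf/flink-conf.yaml:

```yaml
queryable-state.enable: true
```

3. Check the TaskManager log: the service is running once the TaskManager log contains lines like the following.

```
Started Queryable State Server @ /x.x.x.x:9067
Started Queryable State Proxy Server @ /x.x.x.x:9069
```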
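To check the ports from another machine without reading the log, a plain TCP connect is enough. The sketch below assumes the default ports from the log lines (9067 for the Queryable State Server, 9069 for the proxy); `PortCheck` and `isListening` are illustrative names, and the TaskManager host is passed as an argument. A successful connect only shows that something is listening on the port, not that it is Flink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Minimal reachability check for the Queryable State ports (default ports assumed). */
public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("proxy  9069: " + isListening(host, 9069, 1000));
        System.out.println("server 9067: " + isListening(host, 9067, 1000));
    }
}
```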
Flink local mode
In Flink local mode, enable the service as follows.
1. Add the dependency

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
```
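2. Enable the Queryable State service

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
// Pin the REST / Web UI port ("rest.port") to 8081
config.setInteger(RestOptions.PORT, 8081);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
// Enable the Queryable State service
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
```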
3. Check the TaskManager log: the service is running once the log contains lines like the following.

```
Started Queryable State Server @ /127.0.0.1:9067
Started Queryable State Proxy Server @ /127.0.0.1:9069
```
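Making State Queryable
The settings above activate the Queryable State service on the Flink cluster. In addition, the job code must expose the specific state that should be queryable. There are two ways to do this:
- Turn a KeyedStream into a QueryableStateStream with asQueryableState(...). The state of every key in the KeyedStream then becomes queryable; the one-argument variant keeps the latest value of each key in a ValueState.
- Call setQueryable(String queryableStateName) on a StateDescriptor. The keyed state created from that descriptor then becomes queryable under the given name.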
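In both cases the client sees keyed state, which from the outside behaves like a map from key to the currently stored value. The following plain-Java model (no Flink classes; class and method names are illustrative) mimics the one-argument `asQueryableState`, where every record overwrites the value stored for its key. One difference: this model returns `Optional.empty()` for a key that was never written, whereas Flink's client typically reports such a miss as an exceptionally completed future.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of a queryable ValueState: one value per key, overwritten by each update. */
public class LatestValuePerKeyModel<K, V> {

    private final Map<K, V> state = new HashMap<>();

    /** Like asQueryableState: each incoming element replaces the value stored for its key. */
    public void onElement(K key, V value) {
        state.put(key, value);
    }

    /** Like a point query from a QueryableStateClient: look up a single key. */
    public Optional<V> query(K key) {
        return Optional.ofNullable(state.get(key));
    }

    public static void main(String[] args) {
        LatestValuePerKeyModel<String, Long> model = new LatestValuePerKeyModel<>();
        model.onElement("click", 3L);
        model.onElement("click", 7L);
        model.onElement("view", 1L);
        System.out.println(model.query("click")); // Optional[7]
        System.out.println(model.query("buy"));   // Optional.empty
    }
}
```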
The two examples below show how to use each approach.
Main dependencies
Server side

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
```

Client side (the version is aligned with the 1.13.0 server here)

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-client-java</artifactId>
    <version>1.13.0</version>
</dependency>
```
Making State Queryable with a QueryableStateStream
Server side (Flink job)
```java
package com.lingyao.state;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Author : wangxiaohui
 * @Date: 2021-09-29 11:26
 * Queryable State Stream
 */
public class QueryableStateByQueryableStateStream {

    public static void main(String[] args) throws Exception {
        // Parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final String host = parameterTool.get("host", "localhost");
        final int port = parameterTool.getInt("port", 1234);
        final int parallelism = parameterTool.getInt("parallelism", 4);

        // Local environment with the Web UI and the Queryable State service enabled
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.getConfig().setAutoWatermarkInterval(0);
        env.setParallelism(parallelism);

        // Source: each input line has the format event,pv
        SingleOutputStreamOperator<Tuple2<String, Long>> source =
                env.socketTextStream(host, port)
                        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                            @Override
                            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                                String[] splits = value.trim().split(",");
                                out.collect(new Tuple2<>(splits[0], Long.valueOf(splits[1])));
                            }
                        });

        // Window aggregation: the maximum pv of each event over the last 5 seconds
        SingleOutputStreamOperator<Tuple2<String, Long>> result =
                source.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Long> value) throws Exception {
                                return value.f0;
                            }
                        })
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .max(1);

        // Print the results
        result.print();

        // Make the state of the results queryable.
        // asQueryableState returns a QueryableStateStream, which behaves like a sink:
        // it cannot be transformed further; it only receives records and updates the state.
        result.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .asQueryableState("lastFiveSecondsMaxPV");

        env.execute();
    }
}
```
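To see what the queryable state holds after each window fires, the window logic can be replayed in plain Java. This is a model of `TumblingProcessingTimeWindows.of(Time.seconds(5))` followed by `max(1)`, not Flink code. The timestamps are supplied by the caller (assumed to be non-negative epoch milliseconds), whereas Flink uses the TaskManager's clock for processing time; window starts are aligned to multiples of the window size, as with Flink's default zero offset.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of a 5-second tumbling window with a per-key max of pv. */
public class TumblingMaxModel {

    /** One input line "event,pv" plus the (caller-supplied) time it arrived. */
    static final class Event {
        final long timestampMs;
        final String key;
        final long pv;

        Event(long timestampMs, String key, long pv) {
            this.timestampMs = timestampMs;
            this.key = key;
            this.pv = pv;
        }
    }

    /** Start of the tumbling window that contains ts (zero offset, ts >= 0). */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** Returns windowStart -> (key -> max pv seen in that window). */
    static Map<Long, Map<String, Long>> maxPerWindow(List<Event> events, long sizeMs) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Event e : events) {
            result.computeIfAbsent(windowStart(e.timestampMs, sizeMs), w -> new TreeMap<>())
                    .merge(e.key, e.pv, Long::max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1_000, "click", 3),
                new Event(2_000, "click", 9),
                new Event(4_000, "view", 2),
                new Event(6_000, "click", 4));
        System.out.println(maxPerWindow(events, 5_000)); // {0={click=9, view=2}, 5000={click=4}}
    }
}
```

Since the queryable state keeps only the latest value per key, a client polling "click" in this example would first see 9 (after the first window) and then 4, not a running maximum across windows.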
Client side
```java
package com.lingyao.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @Author : wangxiaohui
 * @Date: 2021-09-29 11:29
 */
public class QueryClient {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            throw new IllegalArgumentException(
                    "Missing Flink Job ID ...\n"
                            + "Usage: ./QueryClient <jobID> [proxyHost] [proxyPort]");
        }

        final JobID jobId = JobID.fromHexString(args[0]);
        // Host and port of a QueryableStateClientProxy (default port 9069), not of the JobManager
        final String proxyHost = args.length > 1 ? args[1] : "localhost";
        final int proxyPort = args.length > 2 ? Integer.parseInt(args[2]) : 9069;

        QueryableStateClient client = new QueryableStateClient(proxyHost, proxyPort);

        /*
         State descriptor.
         "lastFiveSecondsMaxPV": the state name. On the server side, asQueryableState generates
         this name from a UUID, so any value works here.
         TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}): the type of the state value;
         it must match the server side.
        */
        ValueStateDescriptor<Tuple2<String, Long>> stateDescriptor =
                new ValueStateDescriptor<>(
                        "lastFiveSecondsMaxPV",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));

        final String key = "click";

        /*
         Poll the state.
         "lastFiveSecondsMaxPV" is the queryableStateName; it must match the name used on the server side.
        */
        while (true) {
            CompletableFuture<ValueState<Tuple2<String, Long>>> completableFuture =
                    client.getKvState(
                            jobId,
                            "lastFiveSecondsMaxPV",
                            key,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            stateDescriptor);
            try {
                System.out.println(completableFuture.get().value());
            } catch (ExecutionException e) {
                // For example, the key has not been written yet: log and keep polling
                System.out.println("Query failed: " + e.getCause());
            }
            Thread.sleep(1000);
        }
    }
}
```
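The polling loop in this client blocks on `get()` without a timeout. A bounded variant is sketched below in plain Java. The `Supplier` stands in for a call such as `() -> client.getKvState(...)`; the attempt count, timeout and back-off values are illustrative assumptions, and `RetryingQuery` is a hypothetical helper, not part of the Flink API.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class RetryingQuery {

    /**
     * Runs the query up to {@code attempts} times, waiting at most {@code timeoutMs} per attempt
     * and {@code backoffMs} between failed attempts. Returns Optional.empty() if every attempt
     * failed (or if the query completed with a null value).
     */
    static <T> Optional<T> queryWithRetry(Supplier<CompletableFuture<T>> query,
                                          int attempts, long timeoutMs, long backoffMs)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            try {
                return Optional.ofNullable(query.get().get(timeoutMs, TimeUnit.MILLISECONDS));
            } catch (ExecutionException | TimeoutException e) {
                // e.g. the key has not been written yet, or no answer arrived in time
                if (i < attempts - 1) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Fake query: fails twice, then succeeds
        Supplier<CompletableFuture<String>> flaky = () -> {
            calls[0]++;
            if (calls[0] < 3) {
                return CompletableFuture.failedFuture(new IllegalStateException("unknown key"));
            }
            return CompletableFuture.completedFuture("click -> 9");
        };
        System.out.println(queryWithRetry(flaky, 5, 1000, 10)); // Optional[click -> 9]
    }
}
```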
Making State Queryable with StateDescriptor.setQueryable
Server side (Flink job)
```java
package com.lingyao.state;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * @Author : wangxiaohui
 * @Date: 2021-09-29 14:36
 */
public class QueryableStateByStateDescriptor {

    public static void main(String[] args) throws Exception {
        // Parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final String host = parameterTool.get("host", "localhost");
        final int port = parameterTool.getInt("port", 1234);
        final int parallelism = parameterTool.getInt("parallelism", 4);

        // Local environment with the Web UI and the Queryable State service enabled
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setParallelism(parallelism);
        env.getConfig().setAutoWatermarkInterval(0);

        // Source: one record is reported each time an action happens, format: action,money
        SingleOutputStreamOperator<Tuple2<String, Long>> source =
                env.socketTextStream(host, port)
                        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                            @Override
                            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                                String[] splits = value.trim().split(",");
                                out.collect(new Tuple2<>(splits[0], Long.valueOf(splits[1])));
                            }
                        });

        // Window aggregation: count(money) and sum(money) of each action over the last 5 seconds
        SingleOutputStreamOperator<Tuple5<String, String, String, Long, Long>> result =
                source.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Long> value) throws Exception {
                                return value.f0;
                            }
                        })
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .process(new CustomCountSumWindowFunction());

        result.print();

        env.execute();
    }

    /** Custom window function that computes the count and sum of each key's window. */
    static class CustomCountSumWindowFunction
            extends ProcessWindowFunction<
                    Tuple2<String, Long>, Tuple5<String, String, String, Long, Long>, String, TimeWindow> {

        // Formats window boundaries in UTC (java.time is used because Joda-Time is not among the dependencies)
        private static final DateTimeFormatter FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

        // ValueState holding the latest result of the current key
        private transient ValueState<Tuple5<String, String, String, Long, Long>> sumCountValueState;

        /** Called once per parallel instance of this function, before any window is processed. */
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple5<String, String, String, Long, Long>> valueStateDescriptor =
                    new ValueStateDescriptor<>(
                            "lastFiveSecondsCountSumValueState",
                            Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG));
            // ValueStateDescriptor.setQueryable exposes this state and makes it queryable
            valueStateDescriptor.setQueryable("lastFiveSecondsCountSum");
            sumCountValueState = getRuntimeContext().getState(valueStateDescriptor);
        }

        /**
         * Called once for every window.
         *
         * @param key      the key of the current window
         * @param context  the window context
         * @param elements all elements of the current window
         * @param out      collector for the output records
         */
        @Override
        public void process(
                String key,
                Context context,
                Iterable<Tuple2<String, Long>> elements,
                Collector<Tuple5<String, String, String, Long, Long>> out)
                throws Exception {
            // Compute the count and the sum
            long currentCount = 0L;
            long currentSum = 0L;
            for (Tuple2<String, Long> value : elements) {
                currentCount += 1;
                currentSum += value.f1;
            }

            // Window start and end time
            TimeWindow window = context.window();
            String windowStartTime = FORMATTER.format(Instant.ofEpochMilli(window.getStart()));
            String windowEndTime = FORMATTER.format(Instant.ofEpochMilli(window.getEnd()));

            // Current value
            Tuple5<String, String, String, Long, Long> currentValue =
                    new Tuple5<>(key, windowStartTime, windowEndTime, currentCount, currentSum);

            // Update the state
            sumCountValueState.update(currentValue);

            // Emit the result
            out.collect(currentValue);
        }
    }
}
```
Client side
```java
package com.lingyao.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @Author : wangxiaohui
 * @Date: 2021-09-29 14:42
 */
public class QueryClient2 {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            throw new IllegalArgumentException(
                    "Missing Flink Job ID ...\n"
                            + "Usage: ./QueryClient2 <jobID> [proxyHost] [proxyPort]");
        }

        final JobID jobId = JobID.fromHexString(args[0]);
        // Host and port of a QueryableStateClientProxy (default port 9069), not of the JobManager
        final String proxyHost = args.length > 1 ? args[1] : "localhost";
        final int proxyPort = args.length > 2 ? Integer.parseInt(args[2]) : 9069;

        QueryableStateClient client = new QueryableStateClient(proxyHost, proxyPort);

        // State descriptor: the value type must match the server side
        ValueStateDescriptor<Tuple5<String, String, String, Long, Long>> valueStateDescriptor =
                new ValueStateDescriptor<>(
                        "lastFiveSecondsCountSumValueState",
                        Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG));

        final String key = "click";

        /*
         Poll the state.
         "lastFiveSecondsCountSum" is the queryableStateName; it must match the name passed to
         setQueryable on the server side.
        */
        while (true) {
            CompletableFuture<ValueState<Tuple5<String, String, String, Long, Long>>> completableFuture =
                    client.getKvState(
                            jobId,
                            "lastFiveSecondsCountSum",
                            key,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            valueStateDescriptor);
            try {
                Tuple5<String, String, String, Long, Long> value = completableFuture.get().value();
                System.out.println(
                        "Key: " + value.f0
                                + " ,WindowStart: " + value.f1
                                + " ,WindowEnd: " + value.f2
                                + " ,Count: " + value.f3
                                + " ,Sum: " + value.f4);
            } catch (ExecutionException e) {
                // For example, no window has fired for this key yet: log and keep polling
                System.out.println("Query failed: " + e.getCause());
            }
            Thread.sleep(1000);
        }
    }
}
```
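The clients above poll a single key. To read several keys in one round, the futures can be issued together and joined with `CompletableFuture.allOf`. In this plain-Java sketch, `lookup` is a hypothetical adapter standing in for a function that calls `client.getKvState(...)` for one key and unwraps the returned `ValueState`. Failed lookups (for example unknown keys) are mapped to `null` so that one miss does not fail the whole batch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class MultiKeyQuery {

    /** Issues all lookups at once and waits for them; a failed lookup yields a null value. */
    static <V> Map<String, V> queryAll(List<String> keys,
                                       Function<String, CompletableFuture<V>> lookup) {
        Map<String, CompletableFuture<V>> futures = new LinkedHashMap<>();
        for (String key : keys) {
            futures.put(key, lookup.apply(key).exceptionally(t -> null));
        }
        // Wait until every lookup has completed (normally or via the null fallback)
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();

        Map<String, V> results = new LinkedHashMap<>();
        futures.forEach((key, future) -> results.put(key, future.join()));
        return results;
    }

    public static void main(String[] args) {
        // Fake lookup: only "click" is known
        Function<String, CompletableFuture<Long>> fake = key -> {
            if (key.equals("click")) {
                return CompletableFuture.completedFuture(9L);
            }
            return CompletableFuture.failedFuture(new IllegalStateException("unknown key"));
        };
        System.out.println(queryAll(List.of("click", "view"), fake)); // {click=9, view=null}
    }
}
```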
TODO
Persist state to an external system.
Modify the state in a checkpoint and start the application from the modified state.
Original article: https://blog.csdn.net/wangpei1949/article/details/100608828
