References:
- 官网 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/queryable_state.html
- https://blog.csdn.net/wangpei1949/article/details/100608828
Flink has supported the Queryable State service since version 1.2.0. Queryable State lets users query the internal state of a running Flink job from an external system (for example, a business application).
Queryable State service architecture
- QueryableStateClient: the client. Runs in the external system; submits query requests and receives the final results.
- QueryableStateClientProxy: the client proxy. Runs on every TaskManager; receives client requests, locates the TaskManager that holds the requested key, forwards the request to that TaskManager's query server, and returns the final result to the client.
- QueryableStateServer: the query server. Runs on every TaskManager; handles requests coming from the client proxies and returns the results.
Activating the Queryable State service
To use queryable state on a Flink cluster, the following steps are required:
- Copy flink-queryable-state-runtime_2.11-1.11.2.jar (already present in the environment) from the opt/ directory of the Flink distribution into the lib/ directory;
- Set the parameter queryable-state.enable to true. See the Configuration documentation for details and other related options.
To verify that queryable state is active on the cluster, check whether any TaskManager log contains the line "Started the Queryable State Proxy Server @ …".
Flink on YARN mode
In Flink on YARN cluster mode, the service is enabled as follows.
Add the dependency
cp ${FLINK_HOME}/opt/flink-queryable-state-runtime_2.11-1.11.2.jar ${FLINK_HOME}/lib/
Enable the Queryable State service
Set queryable-state.enable: true in ${FLINK_HOME}/conf/flink-conf.yaml. Then check the TaskManager logs: the service has started once the following lines appear.
Started Queryable State Server @ /x.x.x.x:9067.
Started Queryable State Proxy Server @ /x.x.x.x:9069
Flink local mode
In Flink local mode, the service is enabled as follows.
Add the dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-runtime_2.11</artifactId>
<version>1.11.2</version>
</dependency>
Enable the Queryable State service
Configuration config = new Configuration();
config.setInteger(ConfigOptions.key("rest.port").defaultValue(8081),8081);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
// Enable the Queryable State service
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
Check the TaskManager logs: the service has started once the following lines appear.
Started Queryable State Server @ /127.0.0.1:9067
Started Queryable State Proxy Server @ /127.0.0.1:9069
Making state queryable
With the settings above, the queryable state service is active on the Flink cluster. In addition, the concrete state to be queried must be exposed in the job code. There are two ways to do this (a brief sketch of both follows this list; complete examples come later):
- Convert the stream into a QueryableStateStream: calling asQueryableState on a KeyedStream makes the state of every key in that stream queryable.
- Call setQueryable(String queryableStateName) on a StateDescriptor to make the keyed state described by that descriptor queryable.
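As a quick illustration, here is a minimal sketch of both approaches. It is not part of the full examples below; the class and state names (QueryableStateSketch, queryableNameA, queryableNameB, lastPvState) are purely illustrative.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.util.Collector;
public class QueryableStateSketch {
    // Approach 1: asQueryableState turns the KeyedStream into a sink-like QueryableStateStream.
    static void exposeViaQueryableStateStream(KeyedStream<Tuple2<String, Long>, String> keyed) {
        keyed.asQueryableState("queryableNameA"); // external clients query this name
    }
    // Approach 2: mark a keyed state descriptor as queryable inside a rich function (apply it after keyBy).
    static class ExposeViaDescriptor extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
        private transient ValueState<Long> lastPv;
        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("lastPvState", Types.LONG);
            descriptor.setQueryable("queryableNameB"); // external clients query this name
            lastPv = getRuntimeContext().getState(descriptor);
        }
        @Override
        public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
            lastPv.update(value.f1);
            out.collect(value);
        }
    }
}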
Main dependencies
Server side
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-runtime_2.11</artifactId>
<version>1.9.0</version>
</dependency>
Client side
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java</artifactId>
<version>1.9.0</version>
</dependency>
Making state queryable via QueryableStateStream
Server side (Flink job)
package com.bigdata.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class QueryableStateByQueryableStateStream {
public static void main(String[] args) throws Exception {
// Parse parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
final String host = parameterTool.get("host", "localhost");
final int port = parameterTool.getInt("port", 1234);
final int parallelism = parameterTool.getInt("parallelism", 4);
// Configure the environment
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
env.setParallelism(parallelism);
// Input source: each line has the format event,pv
SingleOutputStreamOperator<Tuple2<String, Long>> source =
env.socketTextStream(host, port)
.flatMap(
new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
String[] splits = value.trim().split(",");
out.collect(new Tuple2<>(splits[0], Long.valueOf(splits[1])));
}
});
// Window aggregation: the maximum pv per event over the last 5 seconds
SingleOutputStreamOperator<Tuple2<String, Long>> result =
source
.keyBy(
new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> value) throws Exception {
return value.f0;
}
})
.timeWindow(Time.seconds(5))
.max(1);
// Print the result
result.print();
// Make the result's state queryable
// asQueryableState returns a QueryableStateStream
// A QueryableStateStream acts like a sink and cannot be transformed any further
// It receives the incoming records and updates the state
result
.keyBy(
new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> value) throws Exception {
return value.f0;
}
})
.asQueryableState("lastFiveSecondsMaxPV");
env.execute();
}
}
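The job reads its input via socketTextStream, so something must already be listening on the configured host and port (localhost:1234 by default) when the job starts; running nc -lk 1234 and typing lines such as click,100 is enough. As an alternative, the following small generator is a sketch of a test-data feeder; the class SocketSourceSimulator and its event names are made up for illustration.
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;
public class SocketSourceSimulator {
    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 1234;
        String[] events = {"click", "view", "buy"};
        Random random = new Random();
        // Accept the connection opened by socketTextStream and keep writing "event,pv" lines.
        try (ServerSocket serverSocket = new ServerSocket(port);
             Socket socket = serverSocket.accept();
             PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {
            while (true) {
                String event = events[random.nextInt(events.length)];
                writer.println(event + "," + (1 + random.nextInt(100)));
                Thread.sleep(500);
            }
        }
    }
}
Once data is flowing, the JobID that the client below needs can be copied from the web UI started by createLocalEnvironmentWithWebUI (http://localhost:8081 by default) or from the job submission log.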
Client side
package com.bigdata.java;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.QueryableStateClient;
public class QueryClient {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
throw new IllegalArgumentException(
"Missing Flink Job ID ...\n"
+ "Usage: ./QueryClient <jobID> [jobManagerHost] [jobManagerPort]");
}
final JobID jobId = JobID.fromHexString(args[0]);
final String jobManagerHost = args.length > 1 ? args[1] : "localhost";
final int jobManagerPort = args.length > 2 ? Integer.parseInt(args[2]) : 9069; // 9069 is the default client proxy port
QueryableStateClient client = new QueryableStateClient(jobManagerHost, jobManagerPort);
/*
State descriptor.
The descriptor name is arbitrary on the client side: for asQueryableState the server-side descriptor name is generated from a UUID.
The value type, TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}), must match the server side.
*/
ValueStateDescriptor<Tuple2<String, Long>> stateDescriptor =
new ValueStateDescriptor<>(
"lastFiveSecondsMaxPV",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));
final String key = "click";
/*
Poll the query in a loop.
The queryableStateName "lastFiveSecondsMaxPV" must match the name passed to asQueryableState on the server side.
*/
while (true) {
CompletableFuture<ValueState<Tuple2<String, Long>>> completableFuture =
client.getKvState(
jobId,
"lastFiveSecondsMaxPV",
key,
BasicTypeInfo.STRING_TYPE_INFO,
stateDescriptor);
System.out.println(completableFuture.get().value());
Thread.sleep(1000);
}
}
}
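One practical detail: before the first window fires, or when the queried key has produced no data yet, the returned CompletableFuture completes exceptionally and completableFuture.get() throws. Below is a sketch of a more defensive version of the polling loop in QueryClient above; it uses the same client, jobId, key and stateDescriptor variables and only changes the loop body.
while (true) {
    CompletableFuture<ValueState<Tuple2<String, Long>>> completableFuture =
        client.getKvState(
            jobId,
            "lastFiveSecondsMaxPV",
            key,
            BasicTypeInfo.STRING_TYPE_INFO,
            stateDescriptor);
    try {
        System.out.println(completableFuture.get().value());
    } catch (Exception e) {
        // No state for this key yet (or a transient failure): keep polling instead of exiting.
        System.out.println("No queryable state for key '" + key + "' yet: " + e.getMessage());
    }
    Thread.sleep(1000);
}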
Making state queryable via StateDescriptor.setQueryable
Server side (Flink job)
package com.bigdata.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
public class QueryableStateByStateDescriptor {
public static void main(String[] args) throws Exception {
// Parse parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
final String host = parameterTool.get("host", "localhost");
final int port = parameterTool.getInt("port", 1234);
final int parallelism = parameterTool.getInt("parallelism", 4);
// Configure the environment
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
env.setParallelism(parallelism);
// Input source: one record is reported whenever an action occurs, in the format action,money
SingleOutputStreamOperator<Tuple2<String, Long>> source =
env.socketTextStream(host, port)
.flatMap(
new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
String[] splits = value.trim().split(",");
out.collect(new Tuple2<>(splits[0], Long.valueOf(splits[1])));
}
});
// Window aggregation: for the last 5 seconds, compute count(money) and sum(money) per action
SingleOutputStreamOperator<Tuple5<String, String, String, Long, Long>> result =
source
.keyBy(
new KeySelector<Tuple2<String, Long>, String>() {
@Override
public String getKey(Tuple2<String, Long> value) throws Exception {
return value.f0;
}
})
.timeWindow(Time.seconds(5))
.process(new CustomCountSumWindowFunction());
result.print();
env.execute();
}
/** Custom window function that computes the count and sum for each key's window. */
static class CustomCountSumWindowFunction
extends ProcessWindowFunction<
Tuple2<String, Long>, Tuple5<String, String, String, Long, Long>, String, TimeWindow> {
// A ValueState that holds the latest result per key
private transient ValueState<Tuple5<String, String, String, Long, Long>> sumCountValueState;
/**
* Called once when each parallel instance of CustomCountSumWindowFunction is initialized.
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple5<String, String, String, Long, Long>> valueStateDescriptor =
new ValueStateDescriptor<>(
"lastFiveSecondsCountSumValueState",
Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG));
// Expose this state via ValueStateDescriptor.setQueryable so that it can be queried externally
valueStateDescriptor.setQueryable("lastFiveSecondsCountSum");
sumCountValueState = getRuntimeContext().getState(valueStateDescriptor);
}
/**
* The process method is called once for every window that fires.
*
* @param key the key of the current window
* @param context the window context
* @param elements all elements in the current window
* @param out collector for the output records
* @throws Exception
*/
@Override
public void process(
String key,
Context context,
Iterable<Tuple2<String, Long>> elements,
Collector<Tuple5<String, String, String, Long, Long>> out)
throws Exception {
// Compute the count and the sum
long currentCount = 0L;
long currentSum = 0L;
for (Tuple2<String, Long> value : elements) {
currentCount += 1;
currentSum += value.f1;
}
// Get the window start and end times
TimeWindow window = context.window();
String windowStartTime =
new DateTime(window.getStart(), DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss");
String windowEndTime =
new DateTime(window.getEnd(), DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss");
// Build the current value
Tuple5<String, String, String, Long, Long> currentValue =
new Tuple5<>(key, windowStartTime, windowEndTime, currentCount, currentSum);
// Update the state
sumCountValueState.update(currentValue);
// Emit the result
out.collect(currentValue);
}
}
}
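setQueryable is not limited to a ValueState inside a window function: any keyed state descriptor registered in a rich function can be exposed the same way. As a small variation, the sketch below keeps a queryable running sum of money per key; the class RunningSumFunction and the state names are illustrative, not part of the example above.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/** Keeps a queryable running sum of money per key; apply it after keyBy. */
public class RunningSumFunction extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    private transient ReducingState<Long> totalMoney;
    @Override
    public void open(Configuration parameters) {
        ReducingStateDescriptor<Long> descriptor =
            new ReducingStateDescriptor<>("totalMoneyState", (a, b) -> a + b, Types.LONG);
        // Expose the state under the name external clients will use in getKvState.
        descriptor.setQueryable("totalMoneyPerAction");
        totalMoney = getRuntimeContext().getReducingState(descriptor);
    }
    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        totalMoney.add(value.f1);
        out.collect(value);
    }
}
On the client side the same getKvState call works: pass a matching ReducingStateDescriptor and read the running sum with get() on the returned state.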
Client side
package com.bigdata.java;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.queryablestate.client.QueryableStateClient;
public class QueryClient2 {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
throw new IllegalArgumentException(
"Missing Flink Job ID ...\n"
+ "Usage: ./QueryClient <jobID> [jobManagerHost] [jobManagerPort]");
}
final JobID jobId = JobID.fromHexString(args[0]);
final String jobManagerHost = args.length > 1 ? args[1] : "localhost";
final int jobManagerPort = args.length > 2 ? Integer.parseInt(args[2]) : 9069; // 9069 is the default client proxy port
QueryableStateClient client = new QueryableStateClient(jobManagerHost, jobManagerPort);
/*
State descriptor: the name and the value type must match the descriptor created on the server side.
*/
ValueStateDescriptor<Tuple5<String, String, String, Long, Long>> valueStateDescriptor =
new ValueStateDescriptor<>(
"lastFiveSecondsCountSumValueState",
Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG));
final String key = "buy";
/*
Poll the query in a loop.
The queryableStateName "lastFiveSecondsCountSum" must match the name passed to setQueryable on the server side.
*/
while (true) {
CompletableFuture<ValueState<Tuple5<String, String, String, Long, Long>>> completableFuture =
client.getKvState(
jobId,
"lastFiveSecondsCountSum",
key,
BasicTypeInfo.STRING_TYPE_INFO,
valueStateDescriptor);
Tuple5<String, String, String, Long, Long> value = completableFuture.get().value();
System.out.println(
"Key: "
+ value.f0
+ " ,WindowStart: "
+ value.f1
+ " ,WindowEnd: "
+ value.f2
+ " ,Count: "
+ value.f3
+ " ,Sum: "
+ value.f4);
Thread.sleep(1000);
}
}
}