"Getting Started, Part 1" used the DataStream API, which is fairly low-level and requires a lot of boilerplate. This time we move up to the higher-level API: the Table API.
The first problem to solve: how do we set up a Table environment? The Table API is already unified for batch and streaming, so only StreamTableEnvironment is used here. It is simple:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Legacy planner in streaming mode (useOldPlanner() only exists in older Flink releases)
final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
This gives us a Table API environment. Note: tables that do not belong to the same StreamTableEnvironment cannot interact with each other.
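To see the environment in action, here is a minimal sketch (the sample elements and the words/wordsTable names are invented for illustration) of turning a DataStream into a Table and querying it, all inside one environment:

// hypothetical sample data, just to exercise the environment
DataStream<String> words = env.fromElements("a", "b", "a");
Table wordsTable = tableEnv.fromDataStream(words, "word");
tableEnv.createTemporaryView("words", wordsTable);
Table counts = tableEnv.sqlQuery("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word");
counts.printSchema();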
There are plenty of simple Table API examples online, but none of them fit my requirement: side-output support. The plan is to first produce a SingleOutputStreamOperator, then let the StreamTableEnvironment process its side output. Theory in hand, on to practice. The tricky part is parsing the raw records with fastjson, which is where a POJO class comes in.
User.java (abridged)
package cn.coremail.flink.monitor;

import java.io.Serializable;

// Flink recognizes this as a POJO: public no-arg constructor plus public fields
public class User implements Serializable {
    public Long timestamp;
    public String tag;
    public String ip;

    public User() { }

    public User(Long timestamp, String tag, String ip) {
        this.timestamp = timestamp;
        this.tag = tag;
        this.ip = ip;
    }

    public Long getTimestamp() { return timestamp; }
    public String getTag() { return tag; }
    public String getIp() { return ip; }
}
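For reference, a minimal sketch of how fastjson turns one raw log line into this POJO; the exact JSON shape with "tag" and "ip" keys is my assumption based on the fields above:

// hypothetical raw log line; in the real job the input arrives as Kafka body bytes
String raw = "{\"tag\":\"login\",\"ip\":\"10.0.0.1\"}";
JSONObject obj = JSON.parseObject(raw);
User user = new User(System.currentTimeMillis(), obj.getString("tag"), obj.getString("ip"));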
Now for the key part.
StreamingJob.java (only the key code is kept; imports omitted)
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        properties.setProperty("group.id", "flink");
        String TOPIC_NAME = "test";

        // Avro schema of the Kafka records: a map of string headers plus the raw log body as bytes
        Schema kafkaSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"body\",\"type\":\"bytes\"}]}");
        DataStream<GenericRecord> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(TOPIC_NAME, AvroDeserializationSchema.forGeneric(kafkaSchema), properties));

        // Side-output tag for the parsed log records
        final OutputTag<JSONObject> tagDistribution = new OutputTag<JSONObject>("tag distribution", TypeInformation.of(JSONObject.class));
        SingleOutputStreamOperator<JSONObject> outputStream = kafkaStream.process(new ProcessFunction<GenericRecord, JSONObject>() {
            @Override
            public void processElement(GenericRecord record, Context ctx, Collector<JSONObject> out) throws Exception {
                // The Avro "body" field carries the raw log as bytes; decode and parse it with fastjson
                JSONObject logInfo = JSON.parseObject(StandardCharsets.UTF_8.decode((ByteBuffer) record.get("body")).toString());
                // Route every parsed record to the side output
                ctx.output(tagDistribution, logInfo);
            }
        });
        // The key conversion: the side-output stream has to be mapped to POJO instances
        // before the table environment can interpret it
        DataStream<User> stream = outputStream.getSideOutput(tagDistribution).map(new JsonToUser());
        // Name-based field mapping with aliases, plus an appended processing-time attribute tt
        Table test = tableEnv.fromDataStream(stream, "timestamp, tag as appId, ip as module, tt.proctime");
        test.printSchema();

        env.execute("Situational awareness");
    }
    public static class JsonToUser implements MapFunction<JSONObject, User> {
        @Override
        public User map(JSONObject jsonObject) throws Exception {
            // Stamp with the current time and extract the fields we need
            return new User(System.currentTimeMillis(), jsonObject.getString("tag"), jsonObject.getString("ip"));
        }
    }
}
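printSchema() only confirms that the schema looks right. As a follow-up sketch, not part of the original job, here is one typical next step: a tumbling-window aggregation over the processing-time attribute, converted back to a DataStream. The logs view name and the one-minute window are arbitrary choices; Row is org.apache.flink.types.Row.

tableEnv.createTemporaryView("logs", test);
// One-minute tumbling window per appId over the proctime attribute tt
Table perTag = tableEnv.sqlQuery(
        "SELECT appId, TUMBLE_END(tt, INTERVAL '1' MINUTE) AS w_end, COUNT(*) AS cnt " +
        "FROM logs GROUP BY appId, TUMBLE(tt, INTERVAL '1' MINUTE)");
// A group-window aggregation emits append-only results, so toAppendStream is safe here
tableEnv.toAppendStream(perTag, Row.class).print();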