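"Getting Started, Part 1" used the DataStream API, which is fairly low-level and needs a lot of hand-written code. This time we look at the higher-level API: the Table API.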
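The first question is how to get a Table environment for streams. The Table API now unifies batch and stream processing, so only StreamTableEnvironment is used here. It takes just a few lines: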
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// useOldPlanner() selects the legacy planner, which was removed in Flink 1.14
final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
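This gives us a Table API environment. Note: tables created in different StreamTableEnvironment instances cannot interact (for example, they cannot be joined or used in the same query).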
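There are plenty of simple Table API examples online, but none of them fit my needs: I need side-output support. The idea is to first build a SingleOutputStreamOperator, take its side output, and then hand that stream to the StreamTableEnvironment. With the theory in place, on to practice. The tricky part is supporting fastjson parsing, which requires a POJO class.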
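To make the side-output idea concrete without a Flink cluster, here is a plain-Java model of what a `ProcessFunction` does when it calls `out.collect(...)` versus `ctx.output(tag, ...)`: each element goes either to the main stream or to a named side stream. This is only a conceptual sketch; `SideOutputModel`, `Outputs` and `process` are hypothetical names and not Flink API. In Flink itself the routing is done with `OutputTag` and `getSideOutput`, as in the job further down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class SideOutputModel {

    /** Hypothetical holder for the main output plus any number of named side outputs. */
    public static final class Outputs<T> {
        public final List<T> main = new ArrayList<>();
        public final Map<String, List<T>> side = new HashMap<>();

        // Plays the role of Collector.collect(value): emit on the main stream
        public void collect(T value) { main.add(value); }

        // Plays the role of ctx.output(tag, value): emit on the side stream named by tag
        public void output(String tag, T value) {
            side.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
        }
    }

    /** Calls fn once per element, the way Flink invokes processElement once per record. */
    public static <T> Outputs<T> process(List<T> input, BiConsumer<T, Outputs<T>> fn) {
        Outputs<T> out = new Outputs<>();
        for (T element : input) {
            fn.accept(element, out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> logs = Arrays.asList("tag=a", "tag=b", "broken");
        Outputs<String> result = process(logs, (line, out) -> {
            if (line.startsWith("tag=")) {
                out.output("tag distribution", line); // routed to the side output
            } else {
                out.collect(line);                    // stays on the main stream
            }
        });
        System.out.println(result.main);                          // [broken]
        System.out.println(result.side.get("tag distribution"));  // [tag=a, tag=b]
    }
}
```

The main stream and each side stream are independent lists here. In Flink they are independent `DataStream`s, so only the side stream needs to be turned into POJOs and handed to the Table API.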
User.java (abridged)
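package cn.coremail.flink.monitor;

import java.io.Serializable;

// A Flink POJO: public class, public no-arg constructor, public fields
public class User implements Serializable {
    public Long timestamp;
    public String tag;
    public String ip;

    // Flink's POJO serializer needs a public no-argument constructor
    public User() { }

    public User(Long timestamp, String tag, String ip) {
        this.timestamp = timestamp;
        this.tag = tag;
        this.ip = ip;
    }

    // Getter names match the fields they return; Long avoids an unboxing NPE when timestamp is null
    public Long getTimestamp() { return timestamp; }
    public String getTag() { return tag; }
    public String getIp() { return ip; }
}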
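Flink only treats a class as a POJO, which lets the Table API address its columns by name, when the class is public and either top-level or static nested, has a public no-argument constructor, and every non-static, non-transient field is either public or has a `getFoo()`/`setFoo()` pair. Column names come from the field names, not from getter names. The sketch below is a simplified checker for those rules, written only for illustration: `PojoRulesCheck` and `violations` are hypothetical names, and Flink's real analysis (in its `TypeExtractor`) also handles cases this ignores, such as inherited fields, `is` getters and setter parameter types.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRulesCheck {

    // Local copy of the article's User fields so this sketch compiles on its own
    public static class User {
        public Long timestamp;
        public String tag;
        public String ip;
        public User() { }
    }

    // Counter-example: private field with no getter/setter, and no no-arg constructor
    public static class BadUser {
        private String tag;
        public BadUser(String tag) { this.tag = tag; }
    }

    /** Returns the rule violations found; an empty list means the class looks POJO-compatible. */
    public static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            problems.add("class is not public");
        }
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            problems.add("nested class is not static");
        }
        try {
            clazz.getConstructor(); // looks up a public no-arg constructor only
        } catch (NoSuchMethodException e) {
            problems.add("no public no-arg constructor");
        }
        for (Field f : clazz.getDeclaredFields()) {
            int fm = f.getModifiers();
            // Static, transient and compiler-generated fields are not part of the POJO
            if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) continue;
            if (Modifier.isPublic(fm)) continue;
            if (!hasMethod(clazz, "get", f) || !hasMethod(clazz, "set", f)) {
                problems.add("field '" + f.getName() + "' is neither public nor has getter/setter");
            }
        }
        return problems;
    }

    // True if a public method named prefix + capitalized field name exists
    private static boolean hasMethod(Class<?> clazz, String prefix, Field f) {
        String n = f.getName();
        String wanted = prefix + Character.toUpperCase(n.charAt(0)) + n.substring(1);
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(wanted)) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(violations(User.class));
        System.out.println(violations(BadUser.class));
    }
}
```

Because names come from fields, a getter such as `getAppId()` on a class without an `appId` field does not create an `appId` column; renaming has to happen in the Table API (for example with `as`).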
Now for the key part.
StreamingJob.java (key code only)
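import cn.coremail.flink.monitor.User;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        properties.setProperty("group.id", "flink");
        String TOPIC_NAME = "test";

        // Avro schema of the Kafka records: a string map of headers plus a raw byte body
        Schema KafKaSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"body\",\"type\":\"bytes\"}]}");
        DataStream<GenericRecord> KafkaStream = env.addSource(
                new FlinkKafkaConsumer<GenericRecord>(TOPIC_NAME, AvroDeserializationSchema.forGeneric(KafKaSchema), properties));

        final OutputTag<JSONObject> tagDistribution = new OutputTag<JSONObject>("tag distribution", TypeInformation.of(JSONObject.class));

        // The input type must match the source stream (GenericRecord), not JSONObject
        SingleOutputStreamOperator<JSONObject> outputStream = KafkaStream.process(new ProcessFunction<GenericRecord, JSONObject>() {
            @Override
            public void processElement(GenericRecord record, Context ctx, Collector<JSONObject> out) throws Exception {
                // Avro "bytes" arrive as a ByteBuffer; assumes the body holds a UTF-8 JSON string
                String json = StandardCharsets.UTF_8.decode((ByteBuffer) record.get("body")).toString();
                JSONObject logInfo = JSON.parseObject(json);
                ctx.output(tagDistribution, logInfo); // emit to the side output
            }
        });

        // Key step: the side-output stream must be mapped to POJO instances before the Table API can read its fields
        DataStream<User> stream = outputStream.getSideOutput(tagDistribution).map(new ToUser());
        // Columns are named after the POJO fields; "as" renames them, and tt.proctime appends a processing-time attribute
        Table test = tableEnv.fromDataStream(stream, "timestamp, tag as appId, ip as module, tt.proctime");
        test.printSchema();

        // Convert back to a DataStream with a print sink so the job produces visible output
        tableEnv.toAppendStream(test, Row.class).print();

        env.execute("Situational awareness");
    }

    // A MapFunction: exactly one User per JSONObject
    public static class ToUser implements MapFunction<JSONObject, User> {
        @Override
        public User map(JSONObject jsonObject) throws Exception {
            return new User(System.currentTimeMillis(), jsonObject.getString("tag"), jsonObject.getString("ip"));
        }
    }
}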
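The Avro schema above declares `body` as `"bytes"`, and Avro's generic API returns such values as a `java.nio.ByteBuffer`. Decoding a buffer with `Charset.decode` reads all remaining bytes and advances the buffer's position, so decoding the same buffer a second time yields an empty string. The following is a minimal, JDK-only sketch of a safer decode step, assuming the body is UTF-8 text. `AvroBodyDecoding` and `utf8` are hypothetical helper names; the job above does not define them.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class AvroBodyDecoding {

    /**
     * Decodes a UTF-8 payload without touching the caller's buffer:
     * duplicate() shares the bytes but has its own position and limit,
     * so only the copy's position advances during decoding.
     */
    public static String utf8(ByteBuffer body) {
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer body = ByteBuffer.wrap("{\"tag\":\"login\",\"ip\":\"10.0.0.1\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(utf8(body));       // the JSON text
        System.out.println(body.remaining()); // unchanged: the original position was not moved
        System.out.println(utf8(body));       // decoding again returns the same text
    }
}
```

Inside `processElement` this would replace the direct `decode` call, for example `JSON.parseObject(AvroBodyDecoding.utf8((ByteBuffer) record.get("body")))`. That matters only if something else reads the same `ByteBuffer` afterwards.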
