"Getting Started, Part 1" used the DataStream API, which is fairly low-level and verbose. This time we move up to the high-level API: the Table API.

    The first problem to solve: how do we turn a DataStream into a Table? The Table API is now unified across batch and streaming, so only StreamTableEnvironment is used here. The setup is simple:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Old planner, streaming mode; the settings decide which planner the table environment runs on.
    final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

    This gives us a Table API environment. Note: tables that do not belong to the same StreamTableEnvironment cannot interact with each other.
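    For example, a query can only reference tables registered on that same tableEnv. A minimal sketch (the view name "users" and the table someTable are made up):

    // Hypothetical: register a view on THIS tableEnv, then query it through the same
    // environment. A second StreamTableEnvironment would not see "users" at all.
    tableEnv.createTemporaryView("users", someTable);
    Table result = tableEnv.sqlQuery("SELECT ip FROM users");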

    There are plenty of simple Table API examples online, but none fit my requirement: side-output support. The plan is to first produce a SingleOutputStreamOperator, then hand its side output to the StreamTableEnvironment. Theory in place, on to practice. The tricky part is parsing with fastjson, and that is where a POJO class comes in.

    User.java (abridged)

    package cn.coremail.flink.monitor;

    import java.io.Serializable;

    // A Flink POJO: public no-argument constructor plus public fields, so the
    // table environment can address timestamp/tag/ip by name.
    public class User implements Serializable {
        public Long timestamp;
        public String tag; // carries the appId
        public String ip;  // carries the module name

        public User() { }

        public User(Long timestamp, String tag, String ip) {
            this.timestamp = timestamp;
            this.tag = tag;
            this.ip = ip;
        }

        public long getTimestamp() { return timestamp; }
        public String getTag() { return tag; }
        public String getIp() { return ip; }
    }
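    A quick aside: fromDataStream can only address User's fields by name if Flink's type extraction classifies User as a POJO (public no-argument constructor plus public fields, as above). A minimal check sketch:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.PojoTypeInfo;

    // Prints "true" when Flink recognizes User as a POJO; if it prints "false",
    // the stream falls back to a generic type and named field access stops working.
    TypeInformation<User> info = TypeInformation.of(User.class);
    System.out.println(info instanceof PojoTypeInfo);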

    Now for the key part.
    StreamingJob.java (only the key code is kept)

    public class StreamingJob {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
            final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
            properties.setProperty("group.id", "flink");
            String TOPIC_NAME = "test";

            // Kafka carries Avro-encoded events: a map of headers plus a bytes body.
            Schema kafkaSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"body\",\"type\":\"bytes\"}]}");
            DataStream<GenericRecord> kafkaStream = env.addSource(new FlinkKafkaConsumer<GenericRecord>(TOPIC_NAME, AvroDeserializationSchema.forGeneric(kafkaSchema), properties));

            // Parse the Avro body (assumed to be UTF-8 JSON) with fastjson, so the
            // ProcessFunction below can consume the stream as JSONObject records.
            DataStream<JSONObject> jsonStream = kafkaStream.map(record -> {
                ByteBuffer body = (ByteBuffer) record.get("body");
                byte[] bytes = new byte[body.remaining()];
                body.get(bytes);
                return JSON.parseObject(new String(bytes, StandardCharsets.UTF_8));
            }).returns(JSONObject.class);

            final OutputTag<JSONObject> tagDistribution = new OutputTag<JSONObject>("tag distribution", TypeInformation.of(JSONObject.class));
            SingleOutputStreamOperator<JSONObject> outputStream = jsonStream.process(new ProcessFunction<JSONObject, JSONObject>() {
                @Override
                public void processElement(JSONObject logInfo, Context ctx, Collector<JSONObject> out) throws Exception {
                    // Route every record to the side output; the main output stays empty.
                    ctx.output(tagDistribution, logInfo);
                }
            });

            // The key conversion: the side-output stream must be mapped (here via FlatMap)
            // to POJO instances before the table environment can interpret its fields.
            DataStream<User> stream = outputStream.getSideOutput(tagDistribution).map(new FlatMap());

            // Rename the POJO fields tag/ip to appId/module and append a processing-time attribute tt.
            Table test = tableEnv.fromDataStream(stream, "timestamp, tag as appId, ip as module, tt.proctime");
            test.printSchema();
            env.execute("Situational awareness");
        }

        public static class FlatMap implements MapFunction<JSONObject, User> {
            @Override
            public User map(JSONObject jsonObject) throws Exception {
                return new User(new Date().getTime(), jsonObject.getString("tag"), jsonObject.getString("ip"));
            }
        }
    }
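
    printSchema() only proves the table exists. To actually use it, a query would go right before env.execute(...). A minimal sketch, assuming the old planner's string-based expression API (Tumble is org.apache.flink.table.api.Tumble, Row is org.apache.flink.types.Row): count records per appId in one-minute tumbling windows over the processing-time attribute tt.

    // One-minute processing-time tumbling windows, keyed by appId.
    Table counts = test
            .window(Tumble.over("1.minutes").on("tt").as("w"))
            .groupBy("w, appId")
            .select("appId, appId.count as cnt");

    // Windowed group-by results are append-only, so an append stream suffices.
    tableEnv.toAppendStream(counts, Row.class).print();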