Concepts

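Regular transformation functions cannot access record timestamps or the current event time, so the DataStream API provides a family of lower-level transformations: process functions.
Beyond the basic functionality, process functions can access a record's timestamp and the current watermark, and they can register timers that fire at a specific time in the future. Their side-output feature can also send data to multiple output streams.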

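  • Broadly speaking, process functions can access the following (the exact set depends on which process function you use, since there are several kinds):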

    Events / watermarks / state / timers / outputs / ...


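"Process function" is the general abstraction; the concrete implementations in Flink include ProcessFunction, KeyedProcessFunction, CoProcessFunction, ProcessJoinFunction, BroadcastProcessFunction, KeyedBroadcastProcessFunction, ProcessWindowFunction and ProcessAllWindowFunction.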


KeyedProcessFunction

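  1. Methods and objects

A KeyedProcessFunction implements processElement(value, ctx, out), which is called for every record, and can override onTimer(timestamp, ctx, out), which is called when a registered timer fires. Through ctx.timerService() both methods can call: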

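  1. ctx.timerService().registerEventTimeTimer(long time): register an event-time timer that fires once the watermark reaches time
  2. ctx.timerService().registerProcessingTimeTimer(long time): register a processing-time timer that fires once the machine clock reaches time
  3. ctx.timerService().currentWatermark(): return the current event-time watermark
  4. ctx.timerService().currentProcessingTime(): return the current processing time
  5. ctx.timerService().deleteEventTimeTimer(long time): delete a previously registered event-time timer
  6. ctx.timerService().deleteProcessingTimeTimer(long time): delete a previously registered processing-time timer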

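Note: each key can have multiple timers, but only one timer per timestamp; registering the same timestamp twice for the same key still calls onTimer() only once. By default, KeyedProcessFunction keeps the timestamps of all timers in a priority queue on the heap.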
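To make the note concrete, here is a toy, single-threaded model in plain Java with no Flink dependency. ToyTimerService and its methods are hypothetical stand-ins, not Flink's internals: unlike the real TimerService, whose methods take only a timestamp and implicitly apply to the current key, this version takes the key explicitly. It keeps timers in a heap-based PriorityQueue ordered by timestamp, ignores a second registration of the same (key, timestamp), and fires every timer whose timestamp is at or below the watermark when the watermark advances.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical, simplified model of a keyed event-time timer service (NOT Flink's implementation). */
class ToyTimerService {

    /** A timer is identified by its key and its timestamp. */
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer other = (Timer) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    // Heap-based priority queue ordered by timestamp: the earliest timer is always at the head.
    private final PriorityQueue<Timer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    // Deduplication: at most one timer per (key, timestamp).
    private final Set<Timer> registered = new HashSet<>();
    private long watermark = Long.MIN_VALUE;

    void registerEventTimeTimer(String key, long timestamp) {
        Timer t = new Timer(key, timestamp);
        if (registered.add(t)) {      // registering the same (key, timestamp) again is a no-op
            queue.add(t);
        }
    }

    void deleteEventTimeTimer(String key, long timestamp) {
        Timer t = new Timer(key, timestamp);
        if (registered.remove(t)) {
            queue.remove(t);
        }
    }

    long currentWatermark() {
        return watermark;
    }

    /** Advances the watermark and returns every timer with timestamp <= watermark, earliest first. */
    List<Timer> advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        List<Timer> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            Timer t = queue.poll();
            registered.remove(t);
            fired.add(t);
        }
        return fired;
    }
}
```

Registering ("a", 10) twice, ("a", 20) and ("b", 10), then deleting ("a", 20), leaves exactly two timers; both fire when the watermark advances to 15.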

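All timers are written to checkpoints together with the other state. When an application recovers from a failure, any processing-time timers that expired while it was restarting fire immediately once it resumes.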
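The restore rule for processing-time timers can be illustrated with a few lines of plain Java. This is a hypothetical sketch, not Flink code: it assumes the checkpointed timer timestamps are available as a sorted set and simply selects those whose time already passed when the job resumes, since those are the ones that fire right away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch: which restored processing-time timers fire right after a restart. */
final class TimerRestoreSketch {

    /**
     * @param checkpointedTimers processing-time timer timestamps (ms) restored from the checkpoint
     * @param nowAfterRestart    processing time (ms) at which the job resumes
     * @return the timers whose time passed during the outage, earliest first; these fire immediately
     */
    static List<Long> firedImmediately(TreeSet<Long> checkpointedTimers, long nowAfterRestart) {
        // headSet(..., true) keeps every timestamp <= nowAfterRestart
        return new ArrayList<>(checkpointedTimers.headSet(nowAfterRestart, true));
    }
}
```

With timers at 1000, 5000 and 9000 ms and a restart completing at 6000 ms, the first two fire immediately and the 9000 ms timer fires normally later.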

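Note: if you use the RocksDB state backend with incremental checkpoints but keep the timers on the heap, timers are written to the checkpoint synchronously. In that setup, avoid creating too many timers, otherwise checkpoints can take a long time to complete.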

Side outputs


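Side outputs let a single function emit multiple data streams, and those streams can have different types.
Each side output is identified by an OutputTag[X] object (OutputTag<X> in Java), where X is the element type of the resulting side-output stream. A process function sends records to one or more side outputs through its Context object, with ctx.output(tag, value).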
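The mechanism can be modelled without Flink to show the idea. In the sketch below, Tag<X> and ToyContext are hypothetical stand-ins for Flink's OutputTag<X> and the process function's Context; in real Flink code you would create the tag with `new OutputTag<String>("too-hot") {}`, emit with `ctx.output(tag, value)` and read the stream back with `getSideOutput(tag)` on the operator's result. What it shows is that one function routes each record to exactly one destination, and that each side output keeps its own element type (String and Integer here).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for Flink's OutputTag<X>: a name plus a type parameter. */
final class Tag<X> {
    final String id;

    Tag(String id) {
        this.id = id;
    }
}

/** Hypothetical stand-in for a process function's Context and Collector. */
final class ToyContext<OUT> {
    final List<OUT> mainOutput = new ArrayList<>();
    private final Map<String, List<Object>> sideOutputs = new HashMap<>();

    /** Like out.collect(value): emit to the main output. */
    void collect(OUT value) {
        mainOutput.add(value);
    }

    /** Like ctx.output(tag, value): emit to the side output identified by tag. */
    <X> void output(Tag<X> tag, X value) {
        sideOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    /** Like getSideOutput(tag): read back a side output with its own element type. */
    @SuppressWarnings("unchecked")
    <X> List<X> getSideOutput(Tag<X> tag) {
        List<Object> values = sideOutputs.getOrDefault(tag.id, new ArrayList<>());
        return (List<X>) (List<?>) values;
    }
}

final class SideOutputDemo {
    static final Tag<String> TOO_HOT = new Tag<>("too-hot");   // side output of type String
    static final Tag<Integer> INVALID = new Tag<>("invalid");  // side output of type Integer

    /** Routes each reading exactly once: to the main output or to one of the side outputs. */
    static void process(int reading, ToyContext<Integer> ctx) {
        if (reading < 0) {
            ctx.output(INVALID, reading);
        } else if (reading > 100) {
            ctx.output(TOO_HOT, "too hot: " + reading);
        } else {
            ctx.collect(reading);
        }
    }
}
```

Feeding 20, -5, 150 and 80 leaves 20 and 80 in the main output, -5 in the Integer side output and "too hot: 150" in the String side output.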

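Note the difference between filter and side outputs: to split a stream with filter you need one filter per output stream, and every filter receives and tests every record of the whole stream, which wastes work. A side output routes each record once inside a single function, and each branch can even have a different type.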
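A plain-Java illustration of the cost difference (not Flink code; SplitComparison, its Result class and the inspection counter are made up for this example): splitting with one filter per output inspects every record once per output, while routing in a single pass, which is what a side output lets one process function do, inspects each record once.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of filter-style splitting vs. single-pass routing. */
final class SplitComparison {

    /** The two output streams plus how many times records were inspected. */
    static final class Result {
        final List<String> shortWords = new ArrayList<>();
        final List<String> longWords = new ArrayList<>();
        int inspections = 0;
    }

    /** Filter style: each output stream is a separate full pass with its own predicate. */
    static Result splitWithFilters(List<String> words) {
        Result r = new Result();
        for (String w : words) {
            r.inspections++;
            if (w.length() <= 5) r.shortWords.add(w);
        }
        for (String w : words) {
            r.inspections++;
            if (w.length() > 5) r.longWords.add(w);
        }
        return r;
    }

    /** Side-output style: one pass, each record is routed exactly once. */
    static Result splitInOnePass(List<String> words) {
        Result r = new Result();
        for (String w : words) {
            r.inspections++;
            if (w.length() > 5) {
                r.longWords.add(w);
            } else {
                r.shortWords.add(w);
            }
        }
        return r;
    }
}
```

Both methods produce the same two lists; for n records and k outputs the filter approach performs n * k inspections versus n for the single pass.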



Code example: KeyedProcessFunction

The example includes test data, so you can try it yourself.

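import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class KeyedFunDemo1 {

    /**
     * Scenario: we receive real-time heartbeat messages from servers.
     * - A status of DEAD means the server may be down:
     *     if the next message from the same server is RUNNING, it was a false alarm;
     *     if it is DEAD again, a warning is emitted 5 seconds later unless a RUNNING
     *     message arrives first.
     * The warning itself is kept minimal so that the timer logic stays visible.
     */

    /**
     * Test data (hostname,time,status)
     * 192.168.1.101,2019-04-07 21:00,RUNNING
     * 192.168.1.101,2019-04-07 21:02,DEAD
     * 192.168.1.101,2019-04-07 21:03,DEAD
     */

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Only processing-time timers are used below, so no event-time characteristic
        // or watermark strategy is needed.

        // KafkaConsumer.getResultMethod is the author's own helper (not shown) that returns
        // the raw lines from Kafka. For a quick local test, senv.socketTextStream("localhost", 9999)
        // also works: start `nc -lk 9999` and paste the test data.
        DataStream<String> strDataStream = KafkaConsumer.getResultMethod(senv, args);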


        strDataStream

                .map(new MapFunction<String, MessageInfo>() {
                    @Override
                    public MessageInfo map(String value) throws Exception {
                        String[] lines = value.split(",");
                        return new MessageInfo(lines[0], lines[1], lines[2]);
                    }
                })

                .keyBy(value -> value.getHostname())
                .process(new MyKeyedProcessFunction())
                .print();

        senv.execute();
    }


    public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, MessageInfo, String> {

        // Per-key (per-host) state: the previous status and the timestamp of the pending warning timer
        private ValueState<String> lastStatus;
        private ValueState<Long> warningTimer;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            this.lastStatus = getRuntimeContext().getState(new ValueStateDescriptor<>("lastStatus", String.class));
            this.warningTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("warning-timer", Long.class));
        }

        @Override
        public void processElement(MessageInfo value, Context ctx, Collector<String> out) throws Exception {

            String currentStatus = value.getStatus();
            Long currentTimer = warningTimer.value();

            /* Two consecutive DEAD statuses: register a timer that fires in 5 seconds.
               Only one pending timer is kept per host; otherwise a third DEAD would overwrite
               the stored timestamp and the earlier timer could no longer be deleted. */
            if ("DEAD".equals(currentStatus) && "DEAD".equals(lastStatus.value())) {
                if (currentTimer == null) {
                    long timeTs = ctx.timerService().currentProcessingTime() + 5000L;
                    ctx.timerService().registerProcessingTimeTimer(timeTs);
                    warningTimer.update(timeTs);
                }
            }

            /* RUNNING right after DEAD: treat it as a false alarm and delete the pending timer */
            else if ("RUNNING".equals(currentStatus) && "DEAD".equals(lastStatus.value())) {
                if (currentTimer != null) {
                    ctx.timerService().deleteProcessingTimeTimer(currentTimer);
                }
                warningTimer.clear();
            }

            /* Remember this status for the next message from the same host */
            lastStatus.update(currentStatus);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            /* Emit the warning: two consecutive DEAD heartbeats and no recovery within 5 seconds */
            out.collect("Host IP: " + ctx.getCurrentKey() + " - RegionServer status check reported DEAD twice, please check!");
            // The timer has fired, so forget it; a later DEAD, DEAD sequence can register a new one
            warningTimer.clear();
        }
    }
}


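// MessageInfo.java (Lombok generates the getters, setters and constructors Flink needs for a POJO)
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class MessageInfo {
    private String hostname;
    private String msgTime;
    private String status;
}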
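Before deploying, the decision logic of MyKeyedProcessFunction can be replayed offline for a single host. The sketch below is plain Java with no Flink dependency; HeartbeatLogicSketch is a hypothetical helper in which two fields stand in for the lastStatus and warning-timer ValueStates, and the caller supplies the processing time. It records the timer actions it would take, keeping at most one pending timer per host.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical offline replay of the heartbeat logic for ONE host (no Flink involved). */
final class HeartbeatLogicSketch {
    private String lastStatus;   // stands in for the lastStatus ValueState
    private Long warningTimer;   // stands in for the warning-timer ValueState
    final List<String> actions = new ArrayList<>();

    /** Mirrors processElement; nowMs plays the role of currentProcessingTime(). */
    void processElement(String status, long nowMs) {
        if ("DEAD".equals(status) && "DEAD".equals(lastStatus)) {
            if (warningTimer == null) {            // keep at most one pending timer
                warningTimer = nowMs + 5000L;
                actions.add("register " + warningTimer);
            }
        } else if ("RUNNING".equals(status) && "DEAD".equals(lastStatus)) {
            if (warningTimer != null) {            // false alarm: cancel the pending warning
                actions.add("delete " + warningTimer);
            }
            warningTimer = null;
        }
        lastStatus = status;
    }
}
```

For the three test-data lines (RUNNING, DEAD, DEAD) the replay records a single "register" action at the second DEAD; a further DEAD adds nothing, and a following RUNNING records the matching "delete".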

Code example: side outputs with OutputTag[X]

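import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {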

    /**
     * We need to create an {@link OutputTag} so that we can reference it when emitting
     * data to a side output and also to retrieve the side output stream from an operation.
     */
    private static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};

    @SuppressWarnings({"rawtypes", "serial"})
    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        final boolean fileOutput = params.has("output");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);


        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

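        // Every line gets the same key (0), so all records go to a single key.
        // Tokenizer splits the lines and sends words longer than 5 characters to the side output.
        SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
                .keyBy(new KeySelector<String, Integer>() {
                    @Override
                    public Integer getKey(String value) throws Exception {
                        return 0;
                    }
                })
                .process(new Tokenizer());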


        // Side output: count how often each rejected (longer than 5 characters) word occurs
        SingleOutputStreamOperator<Tuple2<String, Long>> rejectedWordCounts = tokenized
                .getSideOutput(rejectedWordsTag)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return Tuple2.of(value, 1L);
                    }
                })
                .keyBy(0)
                .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1));


        // Main output: word counts per 5-second tumbling window (based on ingestion time)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCounts = tokenized
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1);


        // emit result
        if (fileOutput) {
            wordCounts.writeAsText(params.get("output"));
            rejectedWordCounts.writeAsText(params.get("rejected-words-output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            wordCounts.print();
            rejectedWordCounts.print();
        }

        env.execute();

    }

    public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(
                String value,
                Context ctx,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 5) {
                    ctx.output(rejectedWordsTag, token);
                } else if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }

        }
    }

    public static class WordCountData {

        public static final String[] WORDS = new String[]{
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,",
                "And by opposing end them?--To die,--to sleep,--",
                "No more; and by a sleep to say we end",
                "The heartache, and the thousand natural shocks",
                "That flesh is heir to,--'tis a consummation",
                "Devoutly to be wish'd. To die,--to sleep;--",
                "To sleep! perchance to dream:--ay, there's the rub;",
                "For in that sleep of death what dreams may come,",
                "When we have shuffled off this mortal coil,",
                "Must give us pause: there's the respect",
                "That makes calamity of so long life;",
                "For who would bear the whips and scorns of time,",
                "The oppressor's wrong, the proud man's contumely,",
                "The pangs of despis'd love, the law's delay,",
                "The insolence of office, and the spurns",
                "That patient merit of the unworthy takes,",
                "When he himself might his quietus make",
                "With a bare bodkin? who would these fardels bear,",
                "To grunt and sweat under a weary life,",
                "But that the dread of something after death,--",
                "The undiscover'd country, from whose bourn",
                "No traveller returns,--puzzles the will,",
                "And makes us rather bear those ills we have",
                "Than fly to others that we know not of?",
                "Thus conscience does make cowards of us all;",
                "And thus the native hue of resolution",
                "Is sicklied o'er with the pale cast of thought;",
                "And enterprises of great pith and moment,",
                "With this regard, their currents turn awry,",
                "And lose the name of action.--Soft you now!",
                "The fair Ophelia!--Nymph, in thy orisons",
                "Be all my sins remember'd."
        };
    }
}
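To see which words the Tokenizer routes where without starting a job, its splitting rule can be replayed in plain Java. TokenizerReplay is a hypothetical helper that reuses the same normalization (toLowerCase().split("\\W+")) and the same length threshold of 5; it collects the main-output words and the side-output words separately.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java replay of Tokenizer's routing rule (no Flink involved). */
final class TokenizerReplay {
    final List<String> mainOutput = new ArrayList<>();   // words Tokenizer emits with out.collect
    final List<String> sideOutput = new ArrayList<>();   // words Tokenizer sends to rejectedWordsTag

    void process(String line) {
        // Same normalization as Tokenizer: lower-case, then split on runs of non-word characters
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 5) {
                sideOutput.add(token);
            } else if (token.length() > 0) {
                mainOutput.add(token);
            }
        }
    }
}
```

For the first line of WordCountData, "To be, or not to be,--that is the question:--", only "question" goes to the side output and the other nine words go to the main output.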