先keyedBy 会每一种key生成一个window,并不是所有key在一个window里
image.png

三种windows assiger:分配器

CountWindows

按照数据条数生成window。 是指相同key的数据条数 而不是所有数据条数

  • 滚动窗口

    1. input.keyBy("id")
    2. .countWindow(15)//窗口长度
  • 滑动窗口

    1. input.keyBy("id")
    2. .countWindow(155)//窗口长度、步长

    TimeWindows

    按照时间生成window

  • 滚动窗口

    1. input.keyBy("id")
    2. .timeWindow(Time.seconds(15))
    3. 或者
    4. input.keyBy("id")
    5. .windows(TumblingEventTimeWindows.of(Time.minutes(1))) //窗口长度
  • 滑动窗口

    1. input.keyBy("id")
    2. .timeWindow(Time.seconds(15), Time.seconds(5))
    3. 或者
    4. input.keyBy("id")
    5. .windows(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))) //窗口长度、步长
  • 会话窗口

非活动间隔 : 两次的活动间隔超过15秒,这个窗口就会关闭

  1. map.keyBy(0)
  2. .window(EventTimeSessionWindows.withGap(Time.seconds(15)))

全局窗口

  1. map.keyBy(0)
  2. .window(GlobalWindows.create())

窗口函数

keyBy-增量聚合函数

  • 窗口内每来一条数据,就进行计算
  • 如下三大类方法:
    • .reduce(new ReduceFunction):输入输出类型一致,有限制
    • .aggregate(new AggregateFunction):输入输出类型不一致,重写四个方法
    • sum、min、max ```java sum\avg\min\max\maxBy等 也可以自己实现(如下)

// 用AggregateFunction 实现pv/uv stream .keyBy(event -> true) // 所有数据放入一个分区 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(new MyAggregateFunction()) .print();

// 实现pv/uv=人均访问次数 public static class MyAggregateFunction implements AggregateFunction,Long>, Double>{ // 创建累加器 每个任务只会调用一次 @Override public Tuple2, Long> createAccumulator() { return Tuple2.of(new HashSet(), 0L);// f0:uv f1:pv } // 每条数据调佣一次 @Override public Tuple2, Long> add(Event event, Tuple2, Long> accumulator) { accumulator.f0.add(event.user); return Tuple2.of(accumulator.f0,accumulator.f1 + 1L); } // 窗口闭合时计算:从累加器 计算出最终结果 @Override public Double getResult(Tuple2, Long> accumulator) { return (double) accumulator.f1 / accumulator.f0.size(); } // 合并累加器:只在需要合并窗口时需要-会话窗口 @Override public Tuple2, Long> merge(Tuple2, Long> hashSetLongTuple2, Tuple2, Long> acc1) { return null; } }

  1. <a name="tviVG"></a>
  2. #### keyBy-全窗口函数
  3. - 1、窗口被触发时 才计算,全窗口数据储存下来 压力大。
  4. - 2、如下两种方法:ProcessWindowFunction、WindowFunction
  5. - .process(new ProcessWindowFunction)<输入、输出、key、本window>
  6. - 重写process方法(key、本context、输入、输出):拿到context的信息
  7. - .apply(new WindowFunction)<输入、输出、key、本window>
  8. - 重写apply方法(key、本window、输入、输出):拿到窗口的信息
  9. - 已逐渐弃用,用ProcessWindowFunction替代
  10. ```java
  11. //用windowFuncation 实现count计数功能
  12. SingleOutputStreamOperator<Tuple3<String, Long, Integer>> apply = map
  13. .keyBy(0)
  14. .timeWindow(Time.seconds(15))
  15. .apply(new WindowFunction<Sensorreading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
  16. @Override
  17. public void apply(Tuple tuple, TimeWindow window, Iterable<Sensorreading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
  18. String field = tuple.getField(0);//获取key。keyBy("")那么key的类型就是tuple
  19. long endTime = window.getEnd();
  20. int size = IteratorUtils.toList(input.iterator()).size();
  21. out.collect(new Tuple3<>(field, endTime, size));
  22. }
  23. });
  24. // ProcessWindowFunction
  25. public static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow>{
  26. @Override
  27. public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
  28. HashSet<String> userSet = new HashSet<>();
  29. // 遍历所有数据,放到Set里去重
  30. for (Event event: elements){
  31. userSet.add(event.user);
  32. }
  33. // 结合窗口信息,包装输出内容
  34. Long start = context.window().getStart();
  35. Long end = context.window().getEnd();
  36. out.collect("窗口: " + new Timestamp(start) + " ~ " + new Timestamp(end)
  37. + " 的独立访客数量是:" + userSet.size());
  38. }
  39. }
  • context、windows讲解

    public abstract class Context implements java.io.Serializable {
      public abstract W window();
    
      public abstract long currentProcessingTime();
      public abstract long currentWatermark();
    
      public abstract KeyedStateStore windowState();
      public abstract KeyedStateStore globalState();
    }
    

    不keyBy-全窗口函数

  • reduce

  • aggregate
  • process
  • apply(new AllWindowFunction) 废弃被process替代 ```java SingleOutputStreamOperator uvStream = watermarkStream
              .filter(data -> "pv".equals(data.getBehavior()))
              .timeWindowAll(Time.hours(1))
              .apply(new UvCountResult());
    

public static class UvCountResult implements AllWindowFunction{ @Override public void apply(TimeWindow window, Iterable values, Collector out) throws Exception { //定义一个set结构去重 HashSet uidSet = new HashSet<>(); for (UserBehavior value : values) { uidSet.add(value.getUserId()); out.collect(new PageViewCount(“uv”, window.getEnd(), (long)uidSet.size())); } } }

<a name="yAq0m"></a>
#### 增量、全窗口两种函数联合使用
增量聚合reduce()和.aggregate()时,可以添加第二个参数ProcessWindowFunction()或WindowFunction()

- 过程:每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。不再缓存所有数据了,而是将增量聚合函数的结果拿来当作Iterable 类型的输入。一般情况下可迭代集合只有一个元素
- 优点:既可以增量聚合,也可以访问Context的元数据信息(如开始结束时间、状态等)
```java
SingleOutputStreamOperator<ItemViewCount> windowStream = watermarkStream.filter(data -> "pv".equals(data.getBehavior()))
                .keyBy("itemId")
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new MyAggregateFunction(), new MyWindowFunction());

public static class MyAggregateFunction implements AggregateFunction<UserBehavior,Long,Long>{
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(UserBehavior value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}

public static class MyWindowFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow>{
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
        Long itemId = tuple.getField(0);
        long windowEnd = window.getEnd();
        Long count = input.iterator().next();//直接获取next即可
        out.collect(new ItemViewCount(itemId,windowEnd,count));
    }
}

其他可选api

  • .trigger() —— 触发器

    • 定义 window 什么时候关闭,触发计算并输出结果
    • CONTINUE(继续):什么都不做
    • FIRE(触发):触发计算,输出结果
    • PURGE(清除):清空窗口中的所有数据,销毁窗口
    • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口
    • 比如:窗口开的太大,计算pv需要10秒,但我们希望每1秒触发一次计算

      public class TriggerTest {
      
      public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         DataStreamSource<Event> dataStreamSource = env.addSource(new ClickSource());
      
         // 创建一个watermark策略
         WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
                 .<Event>forMonotonousTimestamps()
                 .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                     @Override
                     public long extractTimestamp(Event event, long l) {
                         return event.timestamp;
                     }
                 });
      
         dataStreamSource
                 .assignTimestampsAndWatermarks(watermarkStrategy)
                 .keyBy(event -> event.url)
                 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                 .trigger(new MyTrigger())
                 .process(new WindowsResult())
                 .print();
      
         env.execute();
      }
      
      public static class WindowsResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {
         @Override
         public void process(String s, Context context, Iterable<Event> iterable, Collector<UrlViewCount> collector) throws Exception {
             collector.collect(new UrlViewCount(
                     s,
                     iterable.spliterator().getExactSizeIfKnown(),// 获取迭代器中的元素个数
                     context.window().getStart(),
                     context.window().getEnd()
             ));
         }
      }
      
      /**
      * 10秒一个窗口 每一秒执行一次
      *  CONTINUE(继续):什么都不做
      *  FIRE(触发):触发计算,输出结果
      *  PURGE(清除):清空窗口中的所有数据,销毁窗口
      *  FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口
      */
      public static class MyTrigger extends Trigger<Event, TimeWindow>{
         // 每一个元素到来都会调用本方法
         @Override
         public TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
             ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
             if (isFirstEvent.value() == null) {//如果是窗口的第一个元素则是null 因为有clear方法 以及下面的代码:update(true)
                 for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 1000L) {
                     triggerContext.registerEventTimeTimer(i);
                 }
                 isFirstEvent.update(true);
             }
             return TriggerResult.CONTINUE;
         }
      
         //当注册的处理时间定时器触发时,将调用这个方法
         @Override
         public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
             return TriggerResult.FIRE;
         }
      
         //当注册的事件时间定时器触发时,将调用这个方法
         @Override
         public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
             return TriggerResult.FIRE;
         }
      
         // 当窗口关闭时运行,一般用来清理状态
         @Override
         public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
             ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
             isFirstEvent.clear();
         }
      }
      }
      
  • .evitor() —— 移除器

    • 定义移除某些数据的逻辑 ,两个方法:
    • evictBefore():执行窗口函数之前的移除数据操作(默认 大部分)
    • evictAfter() :执行窗口函数之后的移除数据操作
  • .allowedLateness()
    • 允许处理迟到的数据:默认0,这个时间内的数据会被再分配给窗口并调用窗口函数
  • .sideOutputLateData()
    • 将迟到的数据放入侧输出流
    • .getSideOutput() 获取侧输出流 ```java //使用侧输出流案例:允许数据迟到1分钟 晚于一分钟的数据放入侧输出流 OutputTag outputTag = new OutputTag(“late”) {};

SingleOutputStreamOperator sumStream = dataStream.keyBy(“id”) .timeWindow(Time.seconds(15)) //.trigger() //.evictor() // 一分钟内的数据会再次调用窗口的函数,一分钟后的数据会侧输出 .allowedLateness(Time.minutes(1)) .sideOutputLateData(outputTag) .sum(“temperature”);

sumStream.getSideOutput(outputTag).print(“late”);

还需要把侧输出流和主流的sum结果数据 进行相同sum逻辑。


---

<a name="YcieJ"></a>
### windows的一些底层问题
<a name="JDbdn"></a>
#### 滑动窗口是通过复制来实现的
滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。
<a name="S6VvN"></a>
#### 时间窗口会和时间对齐
假设:一个小时的时间窗口、并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。<br />因为:第一个窗口将长 55 分钟,并在 1:00 关闭。<br />详见 下面【窗口起始点、偏移量】
<a name="KIPRQ"></a>
#### 空的时间窗口不会输出结果
事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。
<a name="sfrpK"></a>
### 窗口起始点、偏移量
<a name="WTfDy"></a>
#### window的起始点
滚动窗口的窗口是如何分配的?<br /> --TumblingEventTimeWindows类,里有一个assignWindows方法,用途是分配窗口。方法里又调用了getWindowStartWithOffset方法,获取窗口的开始时间。

- 本质是:
   - 如果offset=0.则窗口开始时间=在timestamp前边,且最近的一个窗口长度的整倍数
   - 如果offset不是0,则窗口开始时间=在timestamp前边,且最近的一个(窗口长度的整倍数+offset)
```java
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

/**
     * Method to get the window start for a timestamp.
     *
     * @param timestamp epoch millisecond to get the window start.
     * @param offset The offset which window start would be shifted by.
     * @param windowSize The size of the generated windows.
     * @return window start
     */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

偏移量offset如何使用

举个栗子: 现在时间10、offset2、windowSize5 :10 -(10-2+5)% 5 = 7

  • 一般用来处理不同时区的时间
    • 因为北京时间比格林尼治时间早8小时,如果窗口长度为一天。
      • 期望窗口为(北京时间0点-0点)
      • 实际窗口时间为(格林尼治0点-0点)(也就是北京时间8点-8点)。这里要用offset。
        //北京时间偏移到格林尼治时间
        //窗口的开始时间从 北京8点调整到北京0点,需要 -8
        .window(TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8)))
        

一些个人问题

  • keyBy(“ID”).timeWindow(Time.Second(5)).min(“temperature”);
  • sensor_1的窗口:195-210

image.png

  • 上述代码+图片中 按照id hash,hash分区后 多个id可能同时存在于一个分区,那么每一个分区一个watermark 还是每一个key一个watermar?
    • 一个分区一个watermark!
  • 上述代码+图片中 为什么一个key的watermark 到期,所有key的数据都会计算输出?
    • 首先:问题表述的不对,不是【一个key的watermark 到期】,而是【一个分区的watermark 到期】。因为这里设置了并行度是1。
    • 其次:图片中输入的第二个字段-时间,只有sensor_1这个key的时间最小,所以会以此作为窗口开始时间的里程碑,去寻找窗口开始时间
  • 如果并行度大于1呢?
    • 根据 滚动窗口,所有分区的窗口结束时间相同的原理:
    • 如果并行度大于1,那么所有分区的watermark都到窗口结束时间才会计算本窗口并输出!