理解窗口的两个关键概念,窗口⻓度(window length)滑动间隔(slide interval)。 窗口函数会把Original DStream 的若干批次的数据合并成为一个新的带窗口的(windowed)DStream。其中窗口⻓度即每次生成Original DStream 需合并的Original DStream 个数。滑动间隔即合并的Original DStream 的时间间隔。如下图,窗口⻓度为3,滑动间隔为2,注意这两个值必须是Original DStream批处理时间间隔的倍数
Spark Streaming 窗口函数 - 图1

window

  1. # 根据窗口⻓度和窗口移动速率合并原始DStream 生成新 DStream。每 2 秒生成一个窗口⻓度为 5 秒的
  2. Dstream
  3. val windowedDstream = dstream.countByWindow(Seconds( 5 ), Seconds( 2))

countByWindow

  1. # 返回指定⻓度窗口中的元素个数
  2. # 每 2 秒统计一次近 5 秒⻓度时间窗口的 DStream 中元素的个数
  3. val windowedDstream = dstream.countByWindow(Seconds( 5 ), Seconds( 2))

reduceByWindow(func, windowLength, slideInterval)

  1. #对设定窗口的 DStream 做 reduce 操作,类似 RDD 的 reduce 操作,只是增加了时间窗口维度。
  2. #每 2 秒合并一次近 5 秒⻓度时间窗口的 DStream 中元素用“-”分隔
  3. val windowedDstream = dstream.reduceByWindow(_ + "-" + _, Seconds( 5 ), Seconds( 2))

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

  1. #根据 Key 和 Window 来做 Reduce 聚合操作,在上述 reduceByWindow 的基础上增加了 Key 维度,
  2. func 是相同 Key value 值的聚合操作函数。数据源的 DStream 中的元素格式必须为 (k, v) 形式,
  3. windowLength slideInterval 同样是用于确定一个窗口 Dstream 作为数据源。numTasks 是一个
  4. 可选的并发数参数。
  5. # 每 2 秒根据 Key 聚合一次窗口⻓度为 5 的 DStream 中元素,下例中聚合的方式为 value 相加。
  6. val windowedDstream = pairsDstream.reduceByKeyAndWindow((a:Int , b:Int) $ (a + b)
  7. , Seconds(5) , Seconds( 2 ))

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

  1. # 这个方法比上一个多传入一个函数 invFunc。func 是 value 值的聚合操作函数,在数据流入的时候执行
  2. 这个操作。invFunc 是在数据流出窗口的范围后执行的操作。
  3. # 每 2 秒根据 Key 聚合一次窗口⻓度为 5 的 DStream 中元素,聚合的方式为 value 相加。
  4. invFunc:假设 invFunc 的参数如下例为 a b,那么 a 是上个 window 经过 func 操作后的结果,
  5. b 为此次 window 与上次 window 在时间上交叉的元素经过 func 操作后结果。
  6. val windowedDstream = pairsDstream.reduceByKeyAndWindow((a: Int, b:Int )
  7. $ (a + b) , (a:Int, b: Int) $ (a - b) , Seconds(5) , Seconds( 2 ))

countByValueAndWindow(windowLength, slideInterval, [numTasks])

  1. #统计时间窗口中元素值相同的元素个数,类似于 RDD 的 countByValue 操作,在这个基础上增加了时间窗口
  2. 维度。同样,数据源的 DStream 中的元素格式必须为 (k, v) 形式,返回的 DStream 格式为 (K,Long)。
  3. # 每 2 秒根据 Key 聚合一次窗口⻓度为 5 的 DStream 中元素,下例中聚合的方式为 value 相加
  4. val windowedDstream = pairsDstream.countByValueAndWindow(Seconds( 5 ), Seconds( 2))