SparkStreaming实时计算框架(二) - 图1

  1. 将当前长度为3的时间窗口中的所有数据元素根据key进行 合并。统计前三秒不同单词出现的次数
  2. irs
  3. val ssc=new StreamingContest(sc.Seconds(1))
  4. val lines=ssc.socketTextStream("localhost",8888)
  5. val words=lines.flatMap(_.split(" "))
  6. val pairs=wprds.map(word=>(word,1))
  7. val windowWords=pairs.reduceByKeyAndWindow((a:int,b:int)=>a+b,Seconds(3),Seconds(1))
  8. windowWords.print()
  9. ssc.start()
  10. 开启监听窗口
  11. nc -l 8888
  12. 设置窗口长度为3s,滑动时间间隔为1s
  13. val windowWords=words.window(Seconds(3),Seconds(1))

每隔10秒钟统计最近60秒的热点热搜词的搜索频率

val lines=ssc.socketTextStream("localhost",8888)
val words=lines.flatMap(_.split(" "))
//窗口长度 滑动时间间隔
val windowWords=words.map(x=>x._1).reduceByKeyAndWindow((a:Int,b:Int)=>a+b),Seconds(60),Seconds(10)
2.排序 DStream operation :transfrom
val hottestWords=windowWords.transform(rdd={rdd.sort(_.2,false)})
//取前十打印出来
hottestWords.foreachRDD(_.take(10).foreach(println))
ssc.start()

2.DStream输出操作

2.1 保存DStream中的内容到文件

1.saveAsTextFiles:以文本形式保存,保存到任何文件系统
2.saveAsObjectFiles:序列化格式
3.saveAsHadoopFiles:以文件形式保存到HDFS上
2.2 文件名前缀 (prefix)&后缀(suffix)
image.png

1.将输出的内容保存到hdfs中
lines.saveSaTextFiles("hdfs://le/saveAsTextFile",'txt');
prefix:前缀
suffix:后缀

网站热词排名
image.png
image.png
image.png
image.png