将当前长度为3的时间窗口中的所有数据元素根据key进行 合并。统计前三秒不同单词出现的次数
irs
val ssc=new StreamingContest(sc.Seconds(1))
val lines=ssc.socketTextStream("localhost",8888)
val words=lines.flatMap(_.split(" "))
val pairs=wprds.map(word=>(word,1))
val windowWords=pairs.reduceByKeyAndWindow((a:int,b:int)=>a+b,Seconds(3),Seconds(1))
windowWords.print()
ssc.start()
开启监听窗口
nc -l 8888
设置窗口长度为3s,滑动时间间隔为1s
val windowWords=words.window(Seconds(3),Seconds(1))
每隔10秒钟统计最近60秒的热点热搜词的搜索频率
val lines=ssc.socketTextStream("localhost",8888)
val words=lines.flatMap(_.split(" "))
//窗口长度 滑动时间间隔
val windowWords=words.map(x=>x._1).reduceByKeyAndWindow((a:Int,b:Int)=>a+b),Seconds(60),Seconds(10)
2.排序 DStream operation :transfrom
val hottestWords=windowWords.transform(rdd={rdd.sort(_.2,false)})
//取前十打印出来
hottestWords.foreachRDD(_.take(10).foreach(println))
ssc.start()
2.DStream输出操作
2.1 保存DStream中的内容到文件
1.saveAsTextFiles:以文本形式保存,保存到任何文件系统
2.saveAsObjectFiles:序列化格式
3.saveAsHadoopFiles:以文件形式保存到HDFS上
2.2 文件名前缀 (prefix)&后缀(suffix)
1.将输出的内容保存到hdfs中
lines.saveSaTextFiles("hdfs://le/saveAsTextFile",'txt');
prefix:前缀
suffix:后缀
网站热词排名