需求:筛选出 省份中 点击率前三的广告
数据源 时间戳 省份 城市 用户 广告
object Spark24_RDD_Req {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("R17")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.textFile("datas/agent.log")
// TODO 筛选出 省份中 点击率前三的广告
//时间戳 省份 城市 用户 广告
// 1. 对原始数据结构进行转换 ==> ((省份,广告),1)
val mapRDD: RDD[((String, String), Int)] = rdd.map(lines => {
val line: Array[String] = lines.split(" ")
((line(1), line(4)), 1)
})
// 2. 对相同key 进行聚合 ==> ((省份,广告), sum )
val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey((a, b) => a + b)
// 3. 在对数据进行转换 ==> (省份,(广告,sum))
val mapTwoRDD: RDD[(String, (String, Int))] = reduceRDD.map(item => (item._1._1, (item._1._2, item._2)))
// 4. 分组 ==> (省份,[(广告,sum) 。。。。])
val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapTwoRDD.groupByKey()
// 5. 分组内排序,选出各自分组的前三
val resRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(
//Iterable 无法排序 必须转成 List/Array 才能排序
items => items.toList.sortBy(item => item._2)(Ordering.Int.reverse).take(3)
)
resRDD.collect().foreach(println)
sc.stop()
}
}