需求:筛选出 省份中 点击率前三的广告

数据源 时间戳 省份 城市 用户 广告

  1. object Spark24_RDD_Req {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("R17")
  4. val sc = new SparkContext(conf)
  5. val rdd: RDD[String] = sc.textFile("datas/agent.log")
  6. // TODO 筛选出 省份中 点击率前三的广告
  7. //时间戳 省份 城市 用户 广告
  8. // 1. 对原始数据结构进行转换 ==> ((省份,广告),1)
  9. val mapRDD: RDD[((String, String), Int)] = rdd.map(lines => {
  10. val line: Array[String] = lines.split(" ")
  11. ((line(1), line(4)), 1)
  12. })
  13. // 2. 对相同key 进行聚合 ==> ((省份,广告), sum )
  14. val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey((a, b) => a + b)
  15. // 3. 在对数据进行转换 ==> (省份,(广告,sum))
  16. val mapTwoRDD: RDD[(String, (String, Int))] = reduceRDD.map(item => (item._1._1, (item._1._2, item._2)))
  17. // 4. 分组 ==> (省份,[(广告,sum) 。。。。])
  18. val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapTwoRDD.groupByKey()
  19. // 5. 分组内排序,选出各自分组的前三
  20. val resRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(
  21. //Iterable 无法排序 必须转成 List/Array 才能排序
  22. items => items.toList.sortBy(item => item._2)(Ordering.Int.reverse).take(3)
  23. )
  24. resRDD.collect().foreach(println)
  25. sc.stop()
  26. }
  27. }

image.png