Requirement: for each of the Top 10 hot categories, find the top 10 sessions (users) by click count.

The job runs in two passes: first compute the Top 10 categories (ranked by click, order, and payment counts), then filter the click log down to those categories and rank each category's sessions by clicks.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object Spark05_Req2_HotCategoryTop10SessionAnalysis {

      def main(args: Array[String]): Unit = {
        // TODO: Top 10 hot categories
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
        val sc = new SparkContext(sparkConf)

        val actionRDD = sc.textFile("datas/user_visit_action.txt")
        // actionRDD is read twice (inside top10Category and in the filter below),
        // so cache it to avoid recomputing the file scan.
        actionRDD.cache()

        val top10Ids: Array[String] = top10Category(actionRDD)
        // 1. Filter the raw data, keeping only click actions on the Top 10 category IDs
        val filterActionRDD = actionRDD.filter(
          action => {
            val datas = action.split("_")
            if (datas(6) != "-1") {
              top10Ids.contains(datas(6))
            } else {
              false
            }
          }
        )
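        // Optional refinement (a sketch, not in the original): top10Ids is a
        // driver-side Array captured by the filter closure. For a larger lookup
        // set, an explicit broadcast variable would ship it to each executor once:
        //   val top10Bc = sc.broadcast(top10Ids)
        //   ...top10Bc.value.contains(datas(6))...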
        // 2. Count clicks per (category ID, session ID) pair
        val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(
          action => {
            val datas = action.split("_")
            ((datas(6), datas(2)), 1)
          }
        ).reduceByKey(_ + _)
        // 3. Reshape the result:
        //    ((categoryId, sessionId), sum) => (categoryId, (sessionId, sum))
        val mapRDD = reduceRDD.map {
          case ((cid, sid), sum) => {
            (cid, (sid, sum))
          }
        }
        // 4. Group records of the same category
        val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()
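        // groupByKey pulls every (sessionId, count) pair of a category onto one
        // executor. That is fine here, since only the 10 hot categories survive
        // the filter; see the aggregateByKey sketch after the listing for a
        // bounded-memory variant.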
        // 5. Within each group, sort by click count and take the top 10 sessions
        val resultRDD = groupRDD.mapValues(
          iter => {
            iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
          }
        )

        resultRDD.collect().foreach(println)

        sc.stop()
      }
      def top10Category(actionRDD: RDD[String]): Array[String] = {
        val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
          action => {
            val datas = action.split("_")
            if (datas(6) != "-1") {
              // click action
              List((datas(6), (1, 0, 0)))
            } else if (datas(8) != "null") {
              // order action: field 8 holds comma-separated category IDs
              val ids = datas(8).split(",")
              ids.map(id => (id, (0, 1, 0)))
            } else if (datas(10) != "null") {
              // payment action: field 10 holds comma-separated category IDs
              val ids = datas(10).split(",")
              ids.map(id => (id, (0, 0, 1)))
            } else {
              Nil
            }
          }
        )
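        // Note: the click branch returns a List while the order/payment branches
        // return Arrays; flatMap accepts both, since either is a TraversableOnce.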
        val analysisRDD = flatRDD.reduceByKey(
          (t1, t2) => {
            (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
          }
        )

        analysisRDD.sortBy(_._2, false).take(10).map(_._1)
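        // sortBy(_._2, false) relies on Scala's lexicographic tuple Ordering:
        // categories rank by click count first, with order and payment counts as
        // tie-breakers, e.g. (10, 0, 0) sorts above (9, 99, 99).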
      }
    }
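
For the per-category ranking in steps 4 and 5, groupByKey materializes each category's full session list before sorting. At this scale that is harmless, but a variant that keeps only a running top 10 per key during aggregation can be sketched with aggregateByKey. This is an optional alternative, not part of the original solution; it reuses the mapRDD from step 3 and replaces steps 4-5:

    // Keep a bounded top-10 list per category while aggregating,
    // instead of materializing whole groups and sorting afterwards.
    val top10PerCategory: RDD[(String, List[(String, Int)])] =
      mapRDD.aggregateByKey(List.empty[(String, Int)])(
        // fold one (sessionId, count) pair into the running top 10
        (acc, v) => (v :: acc).sortBy(_._2)(Ordering.Int.reverse).take(10),
        // merge two partial top-10 lists from different partitions
        (a, b) => (a ::: b).sortBy(_._2)(Ordering.Int.reverse).take(10)
      )

Because each partial list never exceeds ten entries, the shuffle moves at most ten (sessionId, count) pairs per category per partition, which bounds executor memory no matter how many sessions a category has.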