需求

计算每个大区当天金币收入TopN的主播

背景是这样的,我们有一款直播APP,已经在很多国家上线并运营了一段时间,产品经理希望开发一个功能,topN主播排行榜,按天更新排名信息,统计的维度有多种,其中有一个维度是针对主播当天直播的金币收入进行排名。 在我们的直播平台中有大区这个概念,一个大区下面包含多个国家,不同大区的运营策略是不一样的,所以就把不同国家划分到不同大区里面,方便运营。
那这个TopN主播排行榜在统计的时候就需要分大区统计了。
针对主播每天的开播数据我们已经有了,以及直播间内用户的送礼记录也都是有的。那这样其实就可以统计主播当天的金币收入了
主播一天可能会开播多次,所以后期在统计主播当天收入的时候是需要把他当天所有直播中的金币收入都计算在内的。

数据

我们有两份数据,数据都是json格式的
video_info.log
gift_record.log
video_info.log主播的开播记录,其中包含主播的id:uid、直播间id:vid 、大区:area、视频开播时长:length、增加粉丝数量:follow等信息

  1. {
  2. "uid": "8407173251001",
  3. "vid": "14943445328940001",
  4. "area": "US",
  5. "status": "1",
  6. "start_time": "1494344544",
  7. "end_time": "1494344570",
  8. "watch_num": 101,
  9. "share_num": "21",
  10. "type": "video_info"
  11. }

gift_info.log用户送礼记录,其中包含送礼人id:uid,直播间id:vid,礼物id:good_id,金币数量:gold 等信息

  1. {
  2. "uid": "7201232141001",
  3. "vid": "14943445328940001",
  4. "good_id": "223",
  5. "gold": "10",
  6. "timestamp": 1494344574,
  7. "type": "gift_record"
  8. }

vid与uid是一一对应的

分析

基于以上两份数据,计算每个大区当天金币收入TopN的主播
其实就是按照当天主播所有开播的直播间内的收入汇总,按大区分组,统计每个大区内收入TopN的主播

1:首先获取两份数据中的核心字段,使用fastjson包解析数据
主播开播记录:主播ID:uid,直播间ID:vid,大区:area
(vid,(uid,area))
用户送礼记录:直播间ID:vid,金币数量:gold
(vid,gold)
这样的可以把这两份数据关联到一块就能获取到大区、主播、金币这些信息了,使用直播间vid进行关联

2:对用户送礼记录数据进行聚合,对相同vid的数据求和,因为一个直播间可能收到很多礼物
(vid,gold_sum)

3:把这两份数据join到一块,vid作为join的key
(vid,((uid,area),gold_sum))

4:使用map迭代join之后的数据,最后获取到uid,area,gold_sum字段,由于一个主播一天可能会开播多次,后面需要基于uid和area再做一次聚合,所以把数据转换成这种格式
uid和area是一一对应的,一个人只能属于一个大区
((uid,area),gold_sum)

5:使用reduceByKey算子对数据进行聚合
((uid,area),gold_sum_all)

6:接下来对需要使用groupByKey对数据进行分组,所以先使用map进行转换
因为我们要分区统计TopN,所以要根据大区分组
map:(area,(uid,gold_sum_all))
groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>

7:使用map迭代每个分组内的数据,按金币数量倒序排序,取前N个,最终输出area、topN
这个TopN其实就是把前几名主播的id还有金币数量拼接成一个字符串
(area,topN)

8:使用foreach将结果打印到控制台,多个字段使用制表符分割
area topN

代码

  1. object TopNScala {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf()
  4. conf.setAppName("BroadCastOpScala")
  5. conf.setMaster("local")
  6. val sc = new SparkContext(conf)
  7. //1:首先获取两份数据中的核心字段,使用fastjson包解析数据
  8. val videoInfoRDD = sc.textFile("D:\\test\\video_info.log")
  9. val giftRecordRDD = sc.textFile("D:\\test\\gift_record.log")
  10. //开播记录=>(vid,(uid,area))
  11. val videoInfoFieldRDD = videoInfoRDD.map(line => {
  12. val jsonObject = JSON.parseObject(line)
  13. val vid = jsonObject.getString("vid") //直播间ID
  14. val uid = jsonObject.getString("uid") //主播ID
  15. val area = jsonObject.getString("area") //大区
  16. (vid, (uid, area))
  17. })
  18. //送礼记录=>(vid,gold)
  19. val giftRecordFieldRDD = giftRecordRDD.map(line => {
  20. val jsonObject = JSON.parseObject(line)
  21. val vid = jsonObject.getString("vid")
  22. val gold = Integer.parseInt(jsonObject.getString("gold")) //金币
  23. (vid, gold)
  24. })
  25. //2:对用户送礼记录数据进行聚合,对相同vid的数据求和,因为一个直播间可能收到很多礼物,得到 (vid,gold_sum)
  26. val giftRecordFieldAggRDD = giftRecordFieldRDD.reduceByKey(_ + _)
  27. //3:把这两份数据join到一块,vid作为join的key ,得到(vid,((uid,area),gold_sum))
  28. val joinRDD = videoInfoFieldRDD.join(giftRecordFieldAggRDD)
  29. /**
  30. * 4:使用map迭代join之后的数据,最后获取到uid、area、gold_sum字段
  31. * 由于一个主播一天可能会开播多次,后面需要基于uid和area再做一次聚合,所以把数据转换成((uid,area),gold_sum)
  32. * uid和area是一一对应的,一个人只能属于大区
  33. */
  34. val joinMapRDD = joinRDD.map(tup => {
  35. //joinRDD的格式:(vid,((uid,area),gold_sum))
  36. val uid = tup._2._1._1
  37. val area = tup._2._1._2
  38. val gold_sum = tup._2._2
  39. ((uid, area), gold_sum)
  40. })
  41. /**
  42. * 5:由于一个主播一天可能会开播多次,使用reduceByKey算子对数据进行聚合=>((uid,area),gold_sum_all)
  43. */
  44. val reduceRDD = joinMapRDD.reduceByKey(_ + _)
  45. /**
  46. * 6:接下来对需要使用groupByKey对数据进行分组,所以先使用map进行转换(因为我们要分区统计TopN,所以要根据大区分组)
  47. * map后:(area,(uid,gold_sum_all))
  48. * groupByKey后: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
  49. */
  50. val groupRDD = reduceRDD.map(tup => {
  51. //reduceRDD格式((uid,area),gold_sum_all)
  52. val area = tup._1._2
  53. val uid = tup._1._1
  54. val gold_sum_all = tup._2
  55. (area, (uid, gold_sum_all))
  56. }).groupByKey()
  57. /**
  58. * 7:使用map迭代每个分组内的数据,按照金币数量倒序排序,取前N个,最终输出area,topN
  59. * 这个topN其实就是把前几名主播的id还有金币数量拼接成一个字符串
  60. * (area,topN)
  61. */
  62. val top3RDD = groupRDD.map(tup => {
  63. //groupRDD的格式:(area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>)
  64. val area = tup._1
  65. /**
  66. * toList:把iterable转成list
  67. * sortBy:排序,默认是正序
  68. * reverse:反转,实现倒序效果
  69. * take(3):取前3个元素
  70. * mkString:使用指定字符把集合转成字符串
  71. * top3的格式 uid:gold_sum_all,uid:gold_sum_all,uid:gold_sum_all
  72. */
  73. val top3 = tup._2.toList.sortBy(_._2).reverse.take(3).map(tup => tup._1 + ":" + tup._2).mkString(",")
  74. (area, top3)
  75. })
  76. //8:使用foreach将结果打印到控制台,多个字段使用制表符分割 area topN
  77. top3RDD.foreach(tup => println(tup._1 + "\t" + tup._2))
  78. }
  79. }

运行结果
image.png

作业

把这个Spark任务的输入数据上传到hdfs上面,然后把结果保存到hdfs上的/data/topn目录下,针对Spark任务打jar包,使用on yarn模式提交到集群执行。

注意:针对fastjson这个依赖,有两种选择
1:把这个依赖一起打进jar包里面
2:在提交任务的时候动态指定这个依赖jar包
第二种方式后面文章再分析这个功能。