Chapter 1. RDD Programming Exercises

1. Requirement 1: the three most-clicked ads in each province

Data preparation: each line contains timestamp, province, city, user, and ad, with the fields separated by spaces.

agent.log
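
For reference, a line of agent.log might look like the following (made-up values, purely illustrative, in the field order described above):

    1516609143867 6 7 64 16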

    package com.atguigu.spark.demo

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Requirement 1: for each province, find the three most-clicked ads.
     * Data: timestamp, province, city, user, ad; fields separated by spaces.
     */
    object demo1 {
      def main(args: Array[String]): Unit = {
        // Create the SparkContext
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        // Read the data
        val datas = sc.textFile("datas/agent.log")
        // Project the columns we need and map each line to ((province, ad), 1)
        val selectRdd = datas.map(line => {
          val arr = line.split(" ")
          val province = arr(1)
          val adid = arr.last
          ((province, adid), 1)
        })
        // Aggregate by (province, ad): click count of every ad in every province
        val provinceAdRdd = selectRdd.reduceByKey(_ + _)
        // Group by province
        val provinceRdd = provinceAdRdd.groupBy({
          case ((province, adid), num) => province
        })
        // Within each province, sort the ads by click count (descending) and take the top three
        val top3Rdd = provinceRdd.map(x => {
          val top3 = x._2.toList.sortBy({
            case ((province, adid), num) => num
          }).reverse
            .take(3).map({
              case ((province, adid), num) => (adid, num)
            })
          (x._1, top3)
        })
        // Show the result
        top3Rdd.collect().toList.foreach(println(_))
      }
    }

[Figure 1]

Chapter 2. SparkCore in Practice

1. Data Preparation

user_visit_action.txt

[Figure 2]
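
For orientation, the fields that the following exercises actually touch, as inferred from the parsing code below (each line is split on "_", indices 0-based):

    arr(1)  user id
    arr(2)  session id
    arr(3)  page id
    arr(4)  action time
    arr(5)  search keyword ("null" when the action is not a search)
    arr(6)  clicked category id ("-1" when the action is not a click)
    arr(8)  ordered category ids (comma-separated; "null" when the action is not an order)
    arr(10) paid category ids (comma-separated; "null" when the action is not a payment)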

2. Requirement 1: Top 10 Popular Categories (first implementation)

  • SQL analysis (pseudo-SQL sketch)

    select click_category_id, count(1)
    from T where click_category_id != '-1'
    group by click_category_id
    left join
    select order_category_id, count(1)
    from T where order_category_id != 'null'
    group by order_category_id
    left join
    select pay_category_id, count(1)
    from T where pay_category_id != 'null'
    group by pay_category_id
  • Code implementation
    package com.atguigu.spark.demo

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Top 10 popular categories (first implementation).
     * Rank category ids by click count, then order count, then payment count.
     */
    object demo2 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        // 1. Read the data
        val source = sc.textFile("datas/user_visit_action.txt")
        // 2. Filter out search records (non-search rows have "null" in the search-keyword field)
        val filterSource = source.filter(line => line.split("_")(5) == "null")
        // 3. Project the columns [click category id, order category ids, pay category ids]
        val selectSource = filterSource.map(line => {
          val arr = line.split("_")
          // Click category id
          val clickid = arr(6)
          // Order category ids
          val orderids = arr(8)
          // Pay category ids
          val payids = arr(10)
          (clickid, orderids, payids)
        })
        // 4. Count clicks per category
        // 4.1 Keep only click records
        val clickRdd = selectSource.filter({
          case (clickid, orderids, payids) => clickid != "-1"
        })
        // 4.2 Count the clicks of every category
        val clickNumRdd = clickRdd.map {
          case (clickid, orderids, payids) => (clickid, 1)
        }.reduceByKey(_ + _)
        // 5. Count orders per category
        // 5.1 Keep only order records
        val orderRdd = selectSource.filter({
          case (clickid, orderids, payids) => orderids != "null"
        })
        // 5.2 Split the order category ids and flatten
        val orderExpload = orderRdd.flatMap({
          case (clickid, orderids, payids) => {
            val ss = orderids.split(",")
            ss.map(id => (id, 1))
          }
        })
        // 5.3 Count the orders
        val orderNumRdd = orderExpload.reduceByKey(_ + _)
        // 6. Count payments per category
        // 6.1 Keep only payment records
        val payRdd = selectSource.filter({
          case (clickid, orderids, payids) => payids != "null"
        })
        // 6.2 Split the pay category ids and flatten
        val payExploadRdd = payRdd.flatMap({
          case (clickid, orderids, payids) => {
            payids.split(",").map(id => (id, 1))
          }
        })
        // 6.3 Count the payments
        val payNumRdd = payExploadRdd.reduceByKey(_ + _)
        // 7. Join the three to get click, order and payment counts per category
        val totalRdd = clickNumRdd.leftOuterJoin(orderNumRdd).leftOuterJoin(payNumRdd)
        // 8. Sort by click count, then order count, then payment count; take the top 10
        val totalNumRdd = totalRdd.map({
          case (id, ((clickNum, orderNum), paynum)) => (id, clickNum, orderNum.getOrElse(0), paynum.getOrElse(0))
        })
        val top10 = totalNumRdd.sortBy({
          case (id, clickNum, orderNum, payNum) => (clickNum, orderNum, payNum)
        }, false).take(10)
        // 9. Show the result
        top10.foreach(println(_))
      }
    }

[Figure 3]

This job performs four shuffles and runs in five stages, so it needs optimizing.
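
To check the shuffle boundaries yourself, you can print the lineage of the last RDD before the action fires; a minimal sketch, assuming the totalNumRdd from the code above:

    // Each ShuffledRDD in the printed lineage marks a stage boundary.
    // (The sortBy in step 8 contributes one more shuffle when take(10) runs.)
    println(totalNumRdd.toDebugString)

The Spark UI (http://localhost:4040 while the application is alive) shows the same DAG graphically.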

[Figure 4]

3. Requirement 1: Top 10 Popular Categories (second implementation)

  • SQL analysis

[Figure 5]

  • Code implementation
    package com.atguigu.spark.demo

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Top 10 popular categories (second implementation).
     */
    object demo3 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        // 1. Read the data
        val source = sc.textFile("datas/user_visit_action.txt")
        // 2. Filter out search records
        val filterSource = source.filter(line => {
          line.split("_")(5) == "null"
        })
        // 3. Split the ids, identify the action type, and flatten
        val exploadRdd: RDD[(String, (Int, Int, Int))] = filterSource.flatMap(line => {
          val arr = line.split("_")
          val clickid = arr(6)
          val orderids = arr(8)
          val payids = arr(10)
          // Click action
          if (clickid != "-1") {
            (clickid, (1, 0, 0)) :: Nil
          }
          // Order action
          else if (orderids != "null") {
            orderids.split(",").map(id => (id, (0, 1, 0)))
          }
          // Payment action (after the filter, every remaining row is exactly one of click/order/pay)
          else {
            payids.split(",").map(id => (id, (0, 0, 1)))
          }
        })
        // 4. Aggregate the three counters per category in a single shuffle
        val numRdd = exploadRdd.reduceByKey((agg, curr) => {
          (agg._1 + curr._1, agg._2 + curr._2, agg._3 + curr._3)
        })
        // 5. Sort descending and take the top 10
        val top10 = numRdd.sortBy(_._2, false).take(10)
        // 6. Show the result
        top10.foreach(println(_))
        // Keep the application alive so the Spark UI can be inspected
        Thread.sleep(1000000)
      }
    }

[Figure 6]

This version performs two shuffles and runs in three stages.

[Figure 7]

4. Requirement 1: Top 10 Popular Categories (third implementation)

Replace the reduceByKey of the second implementation with an accumulator; the rest of the code stays the same.
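
As a primer for the custom accumulator below, this is the basic define/add/value pattern with Spark's built-in LongAccumulator; a minimal standalone sketch, not part of the exercise code:

    import org.apache.spark.{SparkConf, SparkContext}

    object AccumulatorPrimer {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        // add() runs inside tasks on the executors; value is read on the driver
        // after the action, once Spark has merged the per-task copies.
        val counter = sc.longAccumulator("counter")
        sc.parallelize(1 to 100).foreach(_ => counter.add(1))
        println(counter.value) // 100
        sc.stop()
      }
    }

The custom AccumulatorV2 below generalizes this pattern: instead of a single Long it carries a whole Map of per-category counters, so the shuffle that reduceByKey would need disappears.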

  • Custom accumulator
    package com.atguigu.spark.demo

    import org.apache.spark.util.AccumulatorV2
    import scala.collection.mutable

    class Top10Accumulator extends AccumulatorV2[(String, (Int, Int, Int)), mutable.Map[String, (Int, Int, Int)]] {
      // Intermediate result container: category id -> (clicks, orders, payments)
      val map = mutable.Map[String, (Int, Int, Int)]()
      // Whether the accumulator is empty
      override def isZero: Boolean = map.isEmpty
      // Create a new copy of the accumulator
      override def copy(): AccumulatorV2[(String, (Int, Int, Int)), mutable.Map[String, (Int, Int, Int)]] = new Top10Accumulator
      // Reset the accumulator
      override def reset(): Unit = map.clear()
      // Accumulate one element within a partition
      override def add(v: (String, (Int, Int, Int))): Unit = {
        // Counts accumulated so far for this category
        val beforeCount = map.getOrElse(v._1, (0, 0, 0))
        // Add the new counts
        val totalCount = (beforeCount._1 + v._2._1, beforeCount._2 + v._2._2, beforeCount._3 + v._2._3)
        map.put(v._1, totalCount)
      }
      // Merge task results on the Driver
      override def merge(other: AccumulatorV2[(String, (Int, Int, Int)), mutable.Map[String, (Int, Int, Int)]]): Unit = {
        // The other task's accumulated result
        val taskResult = other.value
        // Combine this accumulator's entries with the task's entries
        val totalList = taskResult.toList ++ map.toList
        // Group by category id
        val groupedMap = totalList.groupBy(_._1)
        // Sum the counters of every category
        val result = groupedMap.map(x => {
          x._2.reduce((agg, curr) => {
            (agg._1, (agg._2._1 + curr._2._1, agg._2._2 + curr._2._2, agg._2._3 + curr._2._3))
          })
        })
        map ++= result
      }
      override def value: mutable.Map[String, (Int, Int, Int)] = map
    }
  • Code implementation
    package com.atguigu.spark.demo

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    /**
     * Top 10 popular categories (third implementation).
     * Uses an accumulator instead of reduceByKey.
     */
    object demo4 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        // Register the accumulator
        val acc = new Top10Accumulator
        sc.register(acc, "top10")
        // 1. Read the data
        val source = sc.textFile("datas/user_visit_action.txt")
        // 2. Filter out search records
        val filterSource = source.filter(line => {
          line.split("_")(5) == "null"
        })
        // 3. Split the ids, identify the action type, and flatten
        val exploadRdd: RDD[(String, (Int, Int, Int))] = filterSource.flatMap(line => {
          val arr = line.split("_")
          val clickid = arr(6)
          val orderids = arr(8)
          val payids = arr(10)
          // Click action
          if (clickid != "-1") {
            (clickid, (1, 0, 0)) :: Nil
          }
          // Order action
          else if (orderids != "null") {
            orderids.split(",").map(id => (id, (0, 1, 0)))
          }
          // Payment action
          else {
            payids.split(",").map(id => (id, (0, 0, 1)))
          }
        })
        // Accumulate on the executors; foreach is the only action
        exploadRdd.foreach(x => acc.add(x))
        // Read the merged result of all categories on the driver
        val map = acc.value
        map.toList.sortBy(_._2).reverse.take(10).foreach(println(_))
        // Keep the application alive so the Spark UI can be inspected
        Thread.sleep(1000000)
      }
    }

[Figure 8]

No shuffle is produced at all: the per-category aggregation happens in the accumulator instead of in a reduceByKey.

[Figure 9]

5. Requirement 2: Top 10 Active Sessions for Each of the Top 10 Popular Categories

[Figure 10]
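
The implementation below first reuses the accumulator from requirement 1 to get the top-10 category ids, then ships those ids to the executors as a broadcast variable, so the filter does not re-serialize them into every task closure. A minimal standalone sketch of the broadcast pattern (hypothetical ids, not part of the exercise code):

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastPrimer {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        // A read-only driver-side collection is shipped to each executor once...
        val ids = List("1", "7", "15")
        val bc = sc.broadcast(ids)
        // ...and every task reads it through bc.value.
        val filtered = sc.parallelize(1 to 20).filter(x => bc.value.contains(x.toString))
        filtered.collect().foreach(println)
        sc.stop()
      }
    }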

    package com.atguigu.spark.demo

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    /**
     * Top 10 active sessions for each of the top 10 popular categories.
     */
    object demo5 {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        // 1. Get the top 10 popular category ids
        // Register the accumulator
        val acc = new Top10Accumulator
        sc.register(acc, "top10")
        // Read the data
        val source = sc.textFile("datas/user_visit_action.txt")
        // Filter out search records
        val filterSource = source.filter(line => {
          line.split("_")(5) == "null"
        })
        // Split the ids, identify the action type, and flatten
        val exploadRdd: RDD[(String, (Int, Int, Int))] = filterSource.flatMap(line => {
          val arr = line.split("_")
          val clickid = arr(6)
          val orderids = arr(8)
          val payids = arr(10)
          // Click action
          if (clickid != "-1") {
            (clickid, (1, 0, 0)) :: Nil
          }
          // Order action
          else if (orderids != "null") {
            orderids.split(",").map(id => (id, (0, 1, 0)))
          }
          // Payment action
          else {
            payids.split(",").map(id => (id, (0, 0, 1)))
          }
        })
        exploadRdd.foreach(x => acc.add(x))
        // Merged result of all categories
        val map = acc.value
        val top10List = map.toList.sortBy(_._2).reverse.take(10)
        val top10ids = top10List.map(_._1)
        // Broadcast the popular category ids
        val bc = sc.broadcast(top10ids)
        // 2. Filter: keep only click records of the popular categories
        val filterRdd = source.filter(line => {
          val arr = line.split("_")
          val clickid = arr(6)
          clickid != "-1" && bc.value.contains(clickid)
        })
        // 3. Project the columns [clicked category id, session id]
        val sessionRdd = filterRdd.map(line => {
          val arr = line.split("_")
          ((arr(6), arr(2)), 1)
        })
        // 4. Count the clicks of every session within every popular category
        val sessionClickRdd = sessionRdd.reduceByKey(_ + _)
        // 5. Group by popular category id
        val classRdd = sessionClickRdd.groupBy({
          case ((id, sessionid), num) => id
        })
        // 6. Within each category, sort the sessions and take the top 10
        val resRdd = classRdd.map(x => {
          // Top 10 sessions
          val top10Session = x._2.toList.sortBy(_._2).reverse.take(10)
          val top10 = top10Session.map({
            case ((id, session), num) => (session, num)
          })
          (x._1, top10)
        })
        // 7. Show the result
        resRdd.collect().foreach(println(_))
      }
    }

[Figure 11]

6. Requirement 3: Page Single-Jump Conversion Rate

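Idea: a single jump A->B happens when a session's click on page A is immediately followed by a click on page B. The jump pairs to measure come from zipping the page list with its own tail, and each session's click sequence is turned into jumps with a size-2 sliding window; a plain-Scala sketch of both collection tricks (no Spark needed):

    val pages = List(1, 2, 3, 4, 5, 6, 7)
    // Adjacent page pairs to measure: (1,2), (2,3), ..., (6,7)
    val jumps = pages.init.zip(pages.tail)

    // A size-2 sliding window turns one visit sequence into its jump sequence.
    val visits = List(1, 2, 3, 5)
    val visitJumps = visits.sliding(2).map(w => (w.head, w.last)).toList
    println(visitJumps) // List((1,2), (2,3), (3,5))

The conversion rate of A->B is then count(A->B jumps) / count(visits of A).
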
    package com.atguigu.spark.demo

    import org.apache.spark.{SparkConf, SparkContext}

    /**
     * Page single-jump conversion rate.
     */
    object demo6 {
      def main(args: Array[String]): Unit = {
        val list = List(1, 2, 3, 4, 5, 6, 7)
        // The page jumps we want to measure: (1,2), (2,3), ..., (6,7)
        val pagelist = list.init.zip(list.tail)
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        // 1. Read the data
        val source = sc.textFile("datas/user_visit_action.txt")
        // 2. Keep only click records
        val clickRdd = source.filter(line => line.split("_")(6) != "-1")
        // 3. Project the columns [user id, session id, page id, time]
        val selectRdd = clickRdd.map(line => {
          val arr = line.split("_")
          val userid = arr(1)
          val pageid = arr(3).toInt
          val sessionid = arr(2)
          val time = arr(4)
          (userid, sessionid, pageid, time)
        })
        // 4. Compute the denominators
        // 4.1 Keep only visits to pages 1-6 (the jump sources)
        val fmPageRdd = selectRdd.filter {
          case (userid, sessionid, pageid, time) => list.init.contains(pageid)
        }
        // 4.2 Map to (page id, 1)
        val fmMapRdd = fmPageRdd.map(x => (x._3, 1))
        // 4.3 Total visit count per page
        val fmMap = fmMapRdd.reduceByKey(_ + _).collect().toMap
        // 5. Compute the numerators (jump counts such as 1->2)
        // 5.1 Group by user and session
        val sessionRdd = selectRdd.groupBy({
          case (userid, sessionid, pageid, time) => (userid, sessionid)
        })
        val fromToPageRdd = sessionRdd.flatMap(x => {
          // 5.2 Sort each user session's records by time (ascending)
          val sortedList = x._2.toList.sortBy(_._4)
          // 5.3 Sliding window of size 2: which page was jumped to from which page
          val slidingList = sortedList.sliding(2)
          val fromToPageList = slidingList.map(x => {
            val fromPage = x.head._3
            val toPage = x.last._3
            ((fromPage, toPage), 1)
          })
          // 5.4 Discard the jumps we are not measuring
          fromToPageList.filter(x => {
            pagelist.contains(x._1)
          })
        })
        // 5.5 Total count of every page jump
        val fzMap = fromToPageRdd.reduceByKey(_ + _).collect().toMap
        // 6. Compute the conversion rates
        pagelist.foreach({
          case (fromPage, toPage) =>
            // Total visit count of fromPage (default 1 avoids division by zero)
            val fm = fmMap.getOrElse(fromPage, 1)
            val fz = fzMap.getOrElse((fromPage, toPage), 0)
            println(s"Conversion rate from page ${fromPage} to page ${toPage} = ${(fz.toDouble * 100) / fm}%")
        })
      }
    }

[Figure 12]

7. Requirement 4: Reconstruct the Action Track of Each User Session
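
Sessionization rule: two consecutive actions of the same user belong to the same session when their timestamps are at most 30 minutes apart; a larger gap starts a new session. A minimal sketch of the gap test, using two timestamps from the sample data below (user 1001's fourth and fifth actions):

    import java.text.SimpleDateFormat

    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val t1 = fmt.parse("2020-09-10 10:42:55").getTime
    val t2 = fmt.parse("2020-09-10 11:35:21").getTime
    // More than 30 minutes apart, so the second action opens a new session.
    println(t2 - t1 <= 30 * 60 * 1000) // false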

  • Data

    "1001","2020-09-10 10:21:21","home.html",
    "1001","2020-09-10 10:28:10","good_list.html",
    "1001","2020-09-10 10:35:05","good_detail.html",
    "1001","2020-09-10 10:42:55","cart.html",
    "1001","2020-09-10 11:35:21","home.html",
    "1001","2020-09-10 11:36:10","cart.html",
    "1001","2020-09-10 11:38:12","trade.html",
    "1001","2020-09-10 11:40:00","payment.html",
    "1002","2020-09-10 09:40:00","home.html",
    "1002","2020-09-10 09:41:00","mine.html",
    "1002","2020-09-10 09:42:00","favor.html",
    "1003","2020-09-10 13:10:00","home.html",
    "1003","2020-09-10 13:15:00","search.html"
    package com.atguigu.spark.demo

    import org.apache.spark.{SparkConf, SparkContext}
    import java.text.SimpleDateFormat
    import java.util.UUID

    /**
     * Requirement: reconstruct the action track of each user session.
     */
    object demo7 {
      def main(args: Array[String]): Unit = {
        val list = List[(String, String, String)](
          ("1001", "2020-09-10 10:21:21", "home.html"),
          ("1001", "2020-09-10 10:28:10", "good_list.html"),
          ("1001", "2020-09-10 10:35:05", "good_detail.html"),
          ("1001", "2020-09-10 10:42:55", "cart.html"),
          ("1001", "2020-09-10 11:35:21", "home.html"),
          ("1001", "2020-09-10 11:36:10", "cart.html"),
          ("1001", "2020-09-10 11:38:12", "trade.html"),
          ("1001", "2020-09-10 11:40:00", "payment.html"),
          ("1002", "2020-09-10 09:40:00", "home.html"),
          ("1002", "2020-09-10 09:41:00", "mine.html"),
          ("1002", "2020-09-10 09:42:00", "favor.html"),
          ("1003", "2020-09-10 13:10:00", "home.html"),
          ("1003", "2020-09-10 13:15:00", "search.html")
        )
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        val rdd = sc.parallelize(list)
        // 1. Convert each record: parse the time string into a millisecond timestamp
        val rdd2 = rdd.map {
          case (userid, timestr, page) =>
            val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            val time = formatter.parse(timestr).getTime
            UserAnalysis(userid, time, page)
        }
        //rdd2.foreach(println(_))
        // 2. Group by user
        val rdd3 = rdd2.groupBy(_.userid)
        // 3. Sort each user's records by time
        val rdd4 = rdd3.flatMap(x => {
          // 4. Sliding window: compare neighbours pairwise; a gap of at most half
          // an hour means the same session, a larger gap starts a new session
          val slidingList = x._2.toList.sortBy(_.time).sliding(2)
          slidingList.foreach(list => {
            val headElement = list.head
            val lastElement = list.last
            if (lastElement.time - headElement.time <= 30 * 60 * 1000) {
              // Link the later action to the earlier action's session
              lastElement.session = headElement.session
              lastElement.step = headElement.step + 1
            }
          })
          x._2
        })
        // 5. Show the result
        rdd4.collect().foreach(println(_))
      }
    }

    // Each instance starts as its own single-step session with a random UUID
    case class UserAnalysis(userid: String, time: Long, page: String, var session: String = UUID.randomUUID().toString, var step: Int = 1)
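
Design note: session and step are declared as var so that the single sliding pass can relink each action to its predecessor's session in place; every instance starts as its own one-step session with a fresh UUID and keeps it only if no earlier action within 30 minutes claims it. Because consecutive sliding windows overlap and the mutations persist, step grows 1, 2, 3, ... along each session.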

[Figure 13]