Chapter 1. RDD Programming Exercises
1. Requirement 1: The Three Most-Clicked Advertisements per Province
Data preparation: timestamp, province, city, user, advertisement; fields are separated by single spaces.
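Under that schema, a line of `datas/agent.log` looks like the following (the concrete values here are invented for illustration; only the field order comes from the description above):

```
1516609143867 9 4 75 18
```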
```scala
package com.atguigu.spark.demo

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Requirement 1: the three most-clicked advertisements per province
 * Data: timestamp, province, city, user, advertisement, space-separated
 */
object demo1 {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    // Read the data
    val datas = sc.textFile("datas/agent.log")
    // Project the columns we need, emitting ((province, ad), 1)
    val selectRdd = datas.map(line => {
      val arr = line.split(" ")
      val province = arr(1)
      val adid = arr.last
      ((province, adid), 1)
    })
    // Count clicks per (province, ad) pair
    val provinceAdRdd = selectRdd.reduceByKey(_ + _)
    // Group by province
    val provinceRdd = provinceAdRdd.groupBy({
      case ((province, adid), num) => province
    })
    // Within each province, sort the ads by click count and take the top 3
    val top3Rdd = provinceRdd.map(x => {
      val top3 = x._2.toList.sortBy({
        case ((province, adid), num) => num
      }).reverse.take(3).map({
        case ((province, adid), num) => (adid, num)
      })
      (x._1, top3)
    })
    // Show the result
    top3Rdd.collect().toList.foreach(println(_))
  }
}
```
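A note on the `groupBy` step: it shuffles every `((province, adid), count)` pair for a given province into a single task, so each province's ad list must fit in one executor's memory. That is safe here because the number of distinct ads per province is small; for a high-cardinality key, a per-partition top-N aggregation would be the safer design.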
![Spark Exercises - Figure 1](/uploads/projects/liuye-6lcqc@gws1uf/0b9bcf023c1f31ed0ad24388cc8ade98.png)
Chapter 2. SparkCore in Practice
1. Data Preparation
![Spark Exercises - Figure 2](/uploads/projects/liuye-6lcqc@gws1uf/0d2909774d719e8f1f634f2f7d459646.png)
2. Requirement 1: Top 10 Popular Categories (First Implementation)
- SQL analysis
```sql
-- Sketch only (not runnable as written): three per-category counts, joined together
SELECT click_category_id, count(1) FROM T WHERE click_category_id != '-1' GROUP BY click_category_id
LEFT JOIN
SELECT order_category_id, count(1) FROM T WHERE order_category_id != 'null' GROUP BY order_category_id
LEFT JOIN
SELECT pay_category_id, count(1) FROM T WHERE pay_category_id != 'null' GROUP BY pay_category_id
```
- Code implementation
```scala
package com.atguigu.spark.demo

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Top 10 popular categories (first implementation)
 * Ranked by each category's click count, order count, and payment count
 */
object demo2 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    // 1. Read the data
    val source = sc.textFile("datas/user_visit_action.txt")
    // 2. Drop search records (keep rows whose search-keyword field is "null")
    val filterSource = source.filter(line => line.split("_")(5) == "null")
    // 3. Project the columns: [click category id, order category ids, pay category ids]
    val selectSource = filterSource.map(line => {
      val arr = line.split("_")
      val clickid = arr(6)   // click category id
      val orderids = arr(8)  // order category ids
      val payids = arr(10)   // pay category ids
      (clickid, orderids, payids)
    })
    // 4. Count clicks per category
    // 4.1 Keep only click records
    val clickRdd = selectSource.filter({
      case (clickid, orderids, payids) => clickid != "-1"
    })
    // 4.2 Count clicks per category id
    val clickNumRdd = clickRdd.map {
      case (clickid, orderids, payids) => (clickid, 1)
    }.reduceByKey(_ + _)
    // 5. Count orders per category
    // 5.1 Keep only order records
    val orderRdd = selectSource.filter({
      case (clickid, orderids, payids) => orderids != "null"
    })
    // 5.2 Split the order category ids and flatten
    val orderExplode = orderRdd.flatMap({
      case (clickid, orderids, payids) =>
        val ss = orderids.split(",")
        ss.map(id => (id, 1))
    })
    // 5.3 Count orders per category id
    val orderNumRdd = orderExplode.reduceByKey(_ + _)
    // 6. Count payments per category
    // 6.1 Keep only payment records
    val payRdd = selectSource.filter({
      case (clickid, orderids, payids) => payids != "null"
    })
    // 6.2 Split the pay category ids and flatten
    val payExplodeRdd = payRdd.flatMap({
      case (clickid, orderids, payids) =>
        payids.split(",").map(id => (id, 1))
    })
    // 6.3 Count payments per category id
    val payNumRdd = payExplodeRdd.reduceByKey(_ + _)
    // 7. Join the three to get the click/order/payment counts of every category
    val totalRdd = clickNumRdd.leftOuterJoin(orderNumRdd).leftOuterJoin(payNumRdd)
    // 8. Sort by (clicks, orders, payments) descending and take the top 10
    val totalNumRdd = totalRdd.map({
      case (id, ((clickNum, orderNum), payNum)) =>
        (id, clickNum, orderNum.getOrElse(0), payNum.getOrElse(0))
    })
    val top10 = totalNumRdd.sortBy({
      case (id, clickNum, orderNum, payNum) => (clickNum, orderNum, payNum)
    }, false).take(10)
    // 9. Show the result
    top10.foreach(println(_))
  }
}
```
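A detail worth calling out: `sortBy` is given a `(clickNum, orderNum, payNum)` tuple as its key, so it uses Scala's implicit lexicographic `Ordering` for `Tuple3`. Ties on click count are broken by order count, and ties on that by payment count, which is exactly the ranking rule stated in the requirement.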
![Spark Exercises - Figure 3](/uploads/projects/liuye-6lcqc@gws1uf/f6fd19d5347ab0b60295ac04a6757f5a.png)
This version produces 4 shuffles and 5 stages, so it needs optimization.
![Spark Exercises - Figure 4](/uploads/projects/liuye-6lcqc@gws1uf/4791de58b5408445d4c9d53400ae00bb.png)
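Before restructuring the job (next section), one low-effort mitigation is worth sketching: `selectSource` feeds three separate lineage branches (clicks, orders, payments), so its partitions are recomputed for each branch. Persisting it lets the branches share that work. This is a sketch under the assumption that the cached partitions fit in memory:

```scala
// Add after step 3 of demo2: persist the projected RDD (MEMORY_ONLY by
// default) so the click/order/payment branches reuse the same partitions
// instead of re-reading and re-splitting the input file.
selectSource.cache()
```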
3. Requirement 1: Top 10 Popular Categories (Second Implementation)
- SQL analysis
![Spark Exercises - Figure 5](/uploads/projects/liuye-6lcqc@gws1uf/ed3e42bbd97a9484225767df700446d1.png)
- Code implementation
```scala
package com.atguigu.spark.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Top 10 popular categories (second implementation)
 */
object demo3 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    // 1. Read the data
    val source = sc.textFile("datas/user_visit_action.txt")
    // 2. Drop search records
    val filterSource = source.filter(line => {
      line.split("_")(5) == "null"
    })
    // 3. Split the ids, tag the action type, and flatten to (id, (click, order, pay))
    val explodeRdd: RDD[(String, (Int, Int, Int))] = filterSource.flatMap(line => {
      val arr = line.split("_")
      val clickid = arr(6)
      val orderids = arr(8)
      val payids = arr(10)
      if (clickid != "-1") {
        // click action
        (clickid, (1, 0, 0)) :: Nil
      } else if (orderids != "null") {
        // order action
        orderids.split(",").map(id => (id, (0, 1, 0)))
      } else {
        // payment action
        payids.split(",").map(id => (id, (0, 0, 1)))
      }
    })
    // 4. Aggregate the three counters per category in a single reduceByKey
    val numRdd = explodeRdd.reduceByKey((agg, curr) => {
      (agg._1 + curr._1, agg._2 + curr._2, agg._3 + curr._3)
    })
    // 5. Sort descending and take the top 10
    val top10 = numRdd.sortBy(_._2, false).take(10)
    // 6. Show the result
    top10.foreach(println(_))
    // Keep the app alive so the Spark web UI stays reachable
    Thread.sleep(1000000)
  }
}
```
![Spark Exercises - Figure 6](/uploads/projects/liuye-6lcqc@gws1uf/6ffbf669ee4c51f02344ea7580b3c4bf.png)
This version produces only 2 shuffles (one from `reduceByKey`, one from `sortBy`) and 3 stages.
![Spark Exercises - Figure 7](/uploads/projects/liuye-6lcqc@gws1uf/947afe9819c2314272e4450fb1c01e64.png)
4. Requirement 1: Top 10 Popular Categories (Third Implementation)
Replace the `reduceByKey` of the second implementation with an accumulator; the rest of the code stays the same.
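The tradeoff of this swap: the per-category counts end up in a single `mutable.Map` on the driver, so the set of distinct category ids must fit in driver memory, and the final top-10 sort also runs on the driver. That is fine here because category cardinality is small; it would not be for a high-cardinality key.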
- Custom accumulator
```scala
package com.atguigu.spark.demo

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

class Top10Accumulator extends AccumulatorV2[(String, (Int, Int, Int)), mutable.Map[String, (Int, Int, Int)]] {
  // Intermediate result container: category id -> (clicks, orders, payments)
  val map = mutable.Map[String, (Int, Int, Int)]()

  // Whether the accumulator is zero-valued
  override def isZero: Boolean = map.isEmpty

  // Create a new copy of this accumulator
  override def copy(): AccumulatorV2[(String, (Int, Int, Int)), mutable.Map[String, (Int, Int, Int)]] =
    new Top10Accumulator

  // Reset the accumulator
  override def reset(): Unit = map.clear()

  // Accumulate one element within a partition
  override def add(v: (String, (Int, Int, Int))): Unit = {
    // Counts accumulated so far for this category
    val beforeCount = map.getOrElse(v._1, (0, 0, 0))
    // Add this element's counters
    val totalCount = (beforeCount._1 + v._2._1, beforeCount._2 + v._2._2, beforeCount._3 + v._2._3)
    map.put(v._1, totalCount)
  }

  // Merge per-task results on the driver
  override def merge(other: AccumulatorV2[(String, (Int, Int, Int)), mutable.Map[String, (Int, Int, Int)]]): Unit = {
    // The task's accumulated result
    val taskResult = other.value
    // Combine the driver's intermediate result with this task's result
    val totalList = taskResult.toList ++ map.toList
    // Group by category id
    val groupedMap = totalList.groupBy(_._1)
    // Sum the counters per category
    val result = groupedMap.map(x => {
      x._2.reduce((agg, curr) => {
        (agg._1, (agg._2._1 + curr._2._1, agg._2._2 + curr._2._2, agg._2._3 + curr._2._3))
      })
    })
    map ++= result
  }

  override def value: mutable.Map[String, (Int, Int, Int)] = map
}
```
- Code implementation
```scala
package com.atguigu.spark.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Top 10 popular categories (third implementation)
 * Uses a custom accumulator instead of reduceByKey
 */
object demo4 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    // Register the accumulator
    val acc = new Top10Accumulator
    sc.register(acc, "top10")
    // 1. Read the data
    val source = sc.textFile("datas/user_visit_action.txt")
    // 2. Drop search records
    val filterSource = source.filter(line => {
      line.split("_")(5) == "null"
    })
    // 3. Split the ids, tag the action type, and flatten
    val explodeRdd: RDD[(String, (Int, Int, Int))] = filterSource.flatMap(line => {
      val arr = line.split("_")
      val clickid = arr(6)
      val orderids = arr(8)
      val payids = arr(10)
      if (clickid != "-1") {
        // click action
        (clickid, (1, 0, 0)) :: Nil
      } else if (orderids != "null") {
        // order action
        orderids.split(",").map(id => (id, (0, 1, 0)))
      } else {
        // payment action
        payids.split(",").map(id => (id, (0, 0, 1)))
      }
    })
    // Accumulate every element; foreach is an action and triggers the job
    explodeRdd.foreach(x => acc.add(x))
    // Fetch the per-category totals from the accumulator
    val map = acc.value
    map.toList.sortBy(_._2).reverse.take(10).foreach(println(_))
    // Keep the app alive so the Spark web UI stays reachable
    Thread.sleep(1000000)
  }
}
```
![Spark Exercises - Figure 8](/uploads/projects/liuye-6lcqc@gws1uf/07349fbea9a99cf454b765aeb9713808.png)
No shuffle is produced at all: each partition folds its elements into the accumulator's local map in `add`, and the per-task maps are merged on the driver in `merge`, so no data moves between executors.
![Spark Exercises - Figure 9](/uploads/projects/liuye-6lcqc@gws1uf/64844ecfa606dca2fb91c834e0c8075b.png)
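One caveat to remember with this pattern: Spark only guarantees that each task's accumulator updates are applied exactly once when the updates happen inside an action, as they do here in `foreach`. If `add` were called inside a transformation such as `map`, re-executed or speculatively retried tasks could apply the same updates more than once.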
5. Requirement 2: Top 10 Active Sessions for Each of the Top 10 Popular Categories
![Spark Exercises - Figure 10](/uploads/projects/liuye-6lcqc@gws1uf/0f071581ff96199ee24f857159ba91d8.png)
```scala
package com.atguigu.spark.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Top 10 active sessions for each of the top 10 popular categories
 */
object demo5 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    // 1. Get the top 10 category ids (same accumulator approach as demo4)
    // Register the accumulator
    val acc = new Top10Accumulator
    sc.register(acc, "top10")
    // Read the data
    val source = sc.textFile("datas/user_visit_action.txt")
    // Drop search records
    val filterSource = source.filter(line => {
      line.split("_")(5) == "null"
    })
    // Split the ids, tag the action type, and flatten
    val explodeRdd: RDD[(String, (Int, Int, Int))] = filterSource.flatMap(line => {
      val arr = line.split("_")
      val clickid = arr(6)
      val orderids = arr(8)
      val payids = arr(10)
      if (clickid != "-1") {
        // click action
        (clickid, (1, 0, 0)) :: Nil
      } else if (orderids != "null") {
        // order action
        orderids.split(",").map(id => (id, (0, 1, 0)))
      } else {
        // payment action
        payids.split(",").map(id => (id, (0, 0, 1)))
      }
    })
    explodeRdd.foreach(x => acc.add(x))
    // Collect the per-category totals and keep the top 10 ids
    val map = acc.value
    val top10List = map.toList.sortBy(_._2).reverse.take(10)
    val top10ids = top10List.map(_._1)
    // Broadcast the popular category ids
    val bc = sc.broadcast(top10ids)
    // 2. Keep only click records that hit a popular category
    val filterRdd = source.filter(line => {
      val arr = line.split("_")
      val clickid = arr(6)
      clickid != "-1" && bc.value.contains(clickid)
    })
    // 3. Project the columns: ((click category id, session id), 1)
    val sessionRdd = filterRdd.map(line => {
      val arr = line.split("_")
      ((arr(6), arr(2)), 1)
    })
    // 4. Count clicks per (popular category, session) pair
    val sessionClickRdd = sessionRdd.reduceByKey(_ + _)
    // 5. Group by popular category id
    val classRdd = sessionClickRdd.groupBy({
      case ((id, sessionid), num) => id
    })
    // 6. Within each category, sort the sessions by click count and take the top 10
    val resRdd = classRdd.map(x => {
      // Top 10 sessions of this category
      val top10Session = x._2.toList.sortBy(_._2).reverse.take(10)
      val top10 = top10Session.map({
        case ((id, session), num) => (session, num)
      })
      (x._1, top10)
    })
    // 7. Show the result
    resRdd.collect().foreach(println(_))
  }
}
```
![Spark Exercises - Figure 11](/uploads/projects/liuye-6lcqc@gws1uf/2e0f730675fd26b967afa1ae15e0251f.png)
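Design note: the top-10 id list is tiny, so it is computed on the driver and shipped to the executors with `sc.broadcast`; the second pass then filters with `bc.value.contains(clickid)` locally on each executor, which avoids joining the full dataset against the hot-category list.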
6. Requirement 3: Page Single-Hop Conversion Rate
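The single-hop conversion rate of a hop A→B is the number of A→B jumps divided by the number of visits to page A. The implementation below leans on two plain-Scala collection tricks, shown here in isolation: `init.zip(tail)` builds the list of hops to measure, and `sliding(2)` turns a time-ordered visit sequence into consecutive (from, to) pairs.

```scala
val pages = List(1, 2, 3, 4, 5, 6, 7)
// Target hops: each page zipped with its successor.
pages.init.zip(pages.tail)         // List((1,2), (2,3), (3,4), (4,5), (5,6), (6,7))
// Consecutive windows over one session's time-ordered visits.
List(1, 2, 5, 2).sliding(2).toList // List(List(1, 2), List(2, 5), List(5, 2))
```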
```scala
package com.atguigu.spark.demo

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Page single-hop conversion rate
 */
object demo6 {
  def main(args: Array[String]): Unit = {
    val list = List(1, 2, 3, 4, 5, 6, 7)
    // Build the page hops we want to measure: (1,2), (2,3), ..., (6,7)
    val pagelist = list.init.zip(list.tail)
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    // 1. Read the data
    val source = sc.textFile("datas/user_visit_action.txt")
    // 2. Keep only click records
    val clickRdd = source.filter(line => line.split("_")(6) != "-1")
    // 3. Project the columns: [user id, session id, page id, time]
    val selectRdd = clickRdd.map(line => {
      val arr = line.split("_")
      val userid = arr(1)
      val pageid = arr(3).toInt
      val sessionid = arr(2)
      val time = arr(4)
      (userid, sessionid, pageid, time)
    })
    // 4. Compute the denominators
    // 4.1 Keep only visits to pages 1 through 6
    val fmPageRdd = selectRdd.filter {
      case (userid, sessionid, pageid, time) => list.init.contains(pageid)
    }
    // 4.2 Map to (page id, 1)
    val fmMapRdd = fmPageRdd.map(x => (x._3, 1))
    // 4.3 Total visit count per page
    val fmMap = fmMapRdd.reduceByKey(_ + _).collect().toMap
    // 5. Compute the numerators, e.g. the hop 1 -> 2
    // 5.1 Group by user and session
    val sessionRdd = selectRdd.groupBy({
      case (userid, sessionid, pageid, time) => (userid, sessionid)
    })
    val fromToPageRdd = sessionRdd.flatMap(x => {
      // 5.2 Sort each session's records by time (ascending)
      val sortedList = x._2.toList.sortBy(_._4)
      // 5.3 Slide a window of 2 to get (from page, to page) hops
      val slidingList = sortedList.sliding(2)
      val fromToPageList = slidingList.map(x => {
        val fromPage = x.head._3
        val toPage = x.last._3
        ((fromPage, toPage), 1)
      })
      // 5.4 Discard hops we are not measuring
      fromToPageList.filter(x => {
        pagelist.contains(x._1)
      })
    })
    // 5.5 Total count per hop
    val fzMap = fromToPageRdd.reduceByKey(_ + _).collect().toMap
    // 6. Compute the conversion rates
    pagelist.foreach({
      case (fromPage, toPage) =>
        // Total visits to fromPage (the default of 1 guards against dividing by zero)
        val fm = fmMap.getOrElse(fromPage, 1)
        val fz = fzMap.getOrElse((fromPage, toPage), 0)
        println(s"Conversion rate from page $fromPage to page $toPage = ${(fz.toDouble * 100) / fm}%")
    })
  }
}
```
![Spark Exercises - Figure 12](/uploads/projects/liuye-6lcqc@gws1uf/9b2b51ef18070c8e173610ee783f1f09.png)
7. Requirement 4: The Action Trail of Each User Session
- 数据"1001","2020-09-10 10:21:21","home.html","1001","2020-09-10 10:28:10","good_list.html","1001","2020-09-10 10:35:05","good_detail.html","1001","2020-09-10 10:42:55","cart.html","1001","2020-09-10 11:35:21","home.html","1001","2020-09-10 11:36:10","cart.html","1001","2020-09-10 11:38:12","trade.html","1001","2020-09-10 11:40:00","payment.html","1002","2020-09-10 09:40:00","home.html","1002","2020-09-10 09:41:00","mine.html","1002","2020-09-10 09:42:00","favor.html","1003","2020-09-10 13:10:00","home.html","1003","2020-09-10 13:15:00","search.html"
```scala
package com.atguigu.spark.demo

import org.apache.spark.{SparkConf, SparkContext}

import java.text.SimpleDateFormat
import java.util.UUID

/**
 * Requirement: reconstruct the action trail of each user session
 */
object demo7 {
  def main(args: Array[String]): Unit = {
    val list = List[(String, String, String)](
      ("1001", "2020-09-10 10:21:21", "home.html"),
      ("1001", "2020-09-10 10:28:10", "good_list.html"),
      ("1001", "2020-09-10 10:35:05", "good_detail.html"),
      ("1001", "2020-09-10 10:42:55", "cart.html"),
      ("1001", "2020-09-10 11:35:21", "home.html"),
      ("1001", "2020-09-10 11:36:10", "cart.html"),
      ("1001", "2020-09-10 11:38:12", "trade.html"),
      ("1001", "2020-09-10 11:40:00", "payment.html"),
      ("1002", "2020-09-10 09:40:00", "home.html"),
      ("1002", "2020-09-10 09:41:00", "mine.html"),
      ("1002", "2020-09-10 09:42:00", "favor.html"),
      ("1003", "2020-09-10 13:10:00", "home.html"),
      ("1003", "2020-09-10 13:15:00", "search.html")
    )
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    val rdd = sc.parallelize(list)
    // 1. Convert each record to a UserAnalysis with an epoch-millis timestamp
    val rdd2 = rdd.map {
      case (userid, timestr, page) =>
        val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val time = formatter.parse(timestr).getTime
        UserAnalysis(userid, time, page)
    }
    // rdd2.foreach(println(_))
    // 2. Group by user
    val rdd3 = rdd2.groupBy(_.userid)
    // 3. Sort each user's records by time
    val rdd4 = rdd3.flatMap(x => {
      // 4. Slide a window of 2: records at most half an hour apart belong to
      //    the same session; a gap of more than half an hour starts a new session
      val slidingList = x._2.toList.sortBy(_.time).sliding(2)
      slidingList.foreach(list => {
        val headElement = list.head
        val lastElement = list.last
        if (lastElement.time - headElement.time <= 30 * 60 * 1000) {
          lastElement.session = headElement.session
          lastElement.step = headElement.step + 1
        }
      })
      x._2
    })
    // 5. Show the result
    rdd4.collect().foreach(println(_))
  }
}

case class UserAnalysis(userid: String, time: Long, page: String,
                        var session: String = UUID.randomUUID().toString,
                        var step: Int = 1)
```
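This works because `sortBy` and `sliding` return new collections that still reference the same `UserAnalysis` objects, so setting `session` and `step` through the sorted windows also mutates the objects that `x._2` returns. A minimal self-contained illustration of that sharing:

```scala
object SortSharingDemo extends App {
  case class Box(var v: Int)

  val xs = List(Box(2), Box(1))
  val sorted = xs.sortBy(_.v) // a new list, but it holds the SAME Box objects
  sorted.head.v = 99          // mutate through the sorted view
  println(xs)                 // List(Box(2), Box(99)) -- the original list sees the change
}
```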
![Spark Exercises - Figure 13](/uploads/projects/liuye-6lcqc@gws1uf/ef6489424970e279a7580182e0ce6dde.png)
