All of the data for this Spark SQL exercise comes from Hive, so we first create the tables in Hive and load the data.
There are three tables in total: a user-behavior table, a city table, and a product table.

Requirement: Top 3 hot products per region

Here "hot" is measured by click count: compute the top three products in each region, and annotate each product with its click distribution across the region's main cities; cities beyond the top two are grouped under 其他 (Other).
For example (the sample output image from the original post is not reproduced here; an illustrative layout follows):
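Each result row pairs an area and a product with its click count and a city-distribution remark; the figures below are made up purely to show the layout:

  area    product_name    click_count    city_remark
  华北    商品A            10000          北京 60%,天津 30%,其他 10%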

Requirement analysis

➢ Query all of the click records, join them with the city_info table to get each city's region (area), and join with the product_info table to get the product name
➢ Group by area and product, and count each product's total clicks in each area
➢ Within each area, sort by click count in descending order
➢ Keep only the top three
➢ The city remark requires a custom UDAF (a short worked example follows this list)
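For instance, with made-up numbers: if a product in 华北 was clicked 6 times in 北京, 3 times in 天津 and once in 上海 (10 clicks in total), the remark would read 北京 60%,天津 30%,其他 10%; the two largest cities are listed explicitly and the remaining cities are folded into 其他.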

Implementation

➢ Join the three tables to get the complete records (click actions only)
➢ Group the data by area and product name
➢ Sum each product's clicks and take the Top 3
➢ Implement a custom aggregate function to produce the city remark

Code

Create the tables

package com.yang.bigdata.spark.sql

import org.apache.spark.sql._

/**
 * @author :ywb
 * @date :Created in 2022/1/23 6:10 PM
 * @description: create the Hive tables and load the data
 */
object Spark06_SparkSQK_createTable {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sqlTest")
      .enableHiveSupport()
      .getOrCreate()

    //spark.sql("create database yang")
    spark.sql("use yang")

    // prepare the data: user behavior table
    spark.sql(
      """
        |CREATE TABLE `user_visit_action`(
        |  `date` string,
        |  `user_id` bigint,
        |  `session_id` string,
        |  `page_id` bigint,
        |  `action_time` string,
        |  `search_keyword` string,
        |  `click_category_id` bigint,
        |  `click_product_id` bigint,
        |  `order_category_ids` string,
        |  `order_product_ids` string,
        |  `pay_category_ids` string,
        |  `pay_product_ids` string,
        |  `city_id` bigint)
        |row format delimited fields terminated by '\t'
        |""".stripMargin)
    spark.sql(
      """
        |load data local inpath 'datas/user_visit_action.txt' into table yang.user_visit_action
        |""".stripMargin)

    // product table
    spark.sql(
      """
        |CREATE TABLE `product_info`(
        |  `product_id` bigint,
        |  `product_name` string,
        |  `extend_info` string)
        |row format delimited fields terminated by '\t'
        |""".stripMargin)
    spark.sql(
      """
        |load data local inpath 'datas/product_info.txt' into table yang.product_info
        |""".stripMargin)

    // city table
    spark.sql(
      """
        |CREATE TABLE `city_info`(
        |  `city_id` bigint,
        |  `city_name` string,
        |  `area` string)
        |row format delimited fields terminated by '\t'
        |""".stripMargin)
    spark.sql(
      """
        |load data local inpath 'datas/city_info.txt' into table yang.city_info
        |""".stripMargin)

    // quick look at one of the tables
    spark.sql("""select * from city_info""").show()

    spark.close()
  }
}
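A quick way to confirm that the loads succeeded is a row count on each table. A minimal sketch with the same session settings as above (the object name Spark06_SparkSQK_checkLoad is made up for this check):

package com.yang.bigdata.spark.sql

import org.apache.spark.sql._

object Spark06_SparkSQK_checkLoad {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("loadCheck")
      .enableHiveSupport()
      .getOrCreate()

    // row counts of the three tables created above
    Seq("user_visit_action", "product_info", "city_info").foreach { t =>
      spark.sql(s"select count(*) as cnt from yang.$t").show()
    }

    spark.close()
  }
}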
Top 3 per region (SQL only)

package com.yang.bigdata.spark.sql

import org.apache.spark.sql._

object Spark06_SparkSQK_TODO1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sqlTest")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("use yang")

    // t1: click records joined with the product and city tables
    // t2: click count per (area, product_name)
    // t3: rank within each area, keep the top 3
    spark.sql(
      """
        |select
        |  *
        |from (
        |  select
        |    *,
        |    rank() over(partition by area order by clickCount desc) as rank
        |  from (
        |    select
        |      area,
        |      product_name,
        |      count(*) as clickCount
        |    from (
        |      select
        |        u.*,
        |        p.product_name,
        |        c.area,
        |        c.city_name
        |      from user_visit_action u
        |      join product_info p on u.click_product_id = p.product_id
        |      join city_info c on u.city_id = c.city_id
        |      where u.click_product_id > -1
        |    ) t1 group by area, product_name
        |  ) t2
        |) t3 where rank <= 3
        |""".stripMargin).show()

    spark.close()
  }
}
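For comparison, the same join / group / rank pipeline can also be written with the DataFrame API and a window specification instead of nested SQL. This is only an illustrative sketch against the same Hive tables; the object name Spark06_SparkSQK_TODO1_DF is made up:

package com.yang.bigdata.spark.sql

import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object Spark06_SparkSQK_TODO1_DF {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sqlTestDF")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("use yang")

    // click records joined with product and city info (clicks only)
    val detail = spark.table("user_visit_action")
      .where(col("click_product_id") > -1)
      .join(spark.table("product_info"), col("click_product_id") === col("product_id"))
      .join(spark.table("city_info"), "city_id")

    // total clicks per (area, product_name)
    val counted = detail
      .groupBy("area", "product_name")
      .agg(count(lit(1)).as("clickCount"))

    // rank products within each area and keep the top 3
    val byArea = Window.partitionBy("area").orderBy(col("clickCount").desc)
    counted.withColumn("rank", rank().over(byArea)).where(col("rank") <= 3).show()

    spark.close()
  }
}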
City remark with a custom UDAF

package com.yang.bigdata.spark.sql

import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
 * @author :ywb
 * @date :Created in 2022/1/23 6:10 PM
 * @description: Top 3 per region, with the city distribution remark
 */
object Spark06_SparkSQK_TODO2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sqlTest2")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("use yang")

    // t1: click records joined with the product and city tables
    spark.sql(
      """
        |select
        |  a.*,
        |  p.product_name,
        |  c.area,
        |  c.city_name
        |from user_visit_action a
        |join product_info p on a.click_product_id = p.product_id
        |join city_info c on a.city_id = c.city_id
        |where a.click_product_id > -1
        |""".stripMargin).createOrReplaceTempView("t1")

    // register the custom UDAF that builds the city remark
    spark.udf.register("dsc", functions.udaf(new DescAgg))

    // t2: click count per (area, product_name), plus the city remark
    spark.sql(
      """
        |select
        |  area,
        |  product_name,
        |  count(*) as click_count,
        |  dsc(city_name)
        |from t1
        |group by area, product_name
        |""".stripMargin).createOrReplaceTempView("t2")

    // t3: rank products within each area by click count
    spark.sql(
      """
        |select
        |  *,
        |  rank() over(partition by area order by click_count desc) as rank
        |from t2
        |""".stripMargin).createOrReplaceTempView("t3")

    // keep the top 3 per area
    spark.sql(
      """
        |select
        |  *
        |from t3
        |where rank <= 3
        |""".stripMargin).show()

    spark.close()
  }

  // buffer: total click count plus [(cityName, count), (cityName, count), ...]
  case class Buff(var sum: Long, var map: mutable.Map[String, Long])

  // input: city name, buffer: Buff, output: the remark string
  class DescAgg extends Aggregator[String, Buff, String] {

    // initialize the buffer
    override def zero: Buff = {
      Buff(0L, mutable.Map[String, Long]())
    }

    // fold one input row into the buffer
    override def reduce(buff: Buff, cityName: String): Buff = {
      buff.sum += 1L
      val count: Long = buff.map.getOrElse(cityName, 0L) + 1
      buff.map.update(cityName, count)
      buff
    }

    // merge two buffers
    override def merge(b1: Buff, b2: Buff): Buff = {
      val buffMap1: mutable.Map[String, Long] = b1.map
      val buffMap2: mutable.Map[String, Long] = b2.map
      buffMap2.foreach {
        case (k, v) =>
          val count: Long = buffMap1.getOrElse(k, 0L) + v
          buffMap1.update(k, count)
      }
      b1.sum += b2.sum
      b1.map = buffMap1
      b1
    }

    // produce the final remark string
    override def finish(temp: Buff): String = {
      val resList: ListBuffer[String] = ListBuffer[String]()
      val sum: Long = temp.sum
      val map: mutable.Map[String, Long] = temp.map
      val list: List[(String, Long)] = map.toList
      val hasMore: Boolean = list.size > 2
      // keep the two cities with the most clicks (descending order)
      val res: List[(String, Long)] = list.sortBy(_._2)(Ordering.Long.reverse).take(2)
      var percentage: Long = 0L
      res.foreach {
        case (k, v) =>
          val p: Long = v * 100 / sum
          percentage += p
          resList.append(s"$k $p%")
      }
      if (hasMore) {
        resList.append(s"其他 ${100 - percentage}%")
      }
      resList.mkString(",")
    }

    override def bufferEncoder: Encoder[Buff] = Encoders.product
    override def outputEncoder: Encoder[String] = Encoders.STRING
  }
}
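To sanity-check DescAgg's buffer logic without running Spark, its zero/reduce/finish methods can be driven by hand. A minimal, hypothetical harness (the object name DescAggCheck and the city counts are made up):

package com.yang.bigdata.spark.sql

object DescAggCheck {
  def main(args: Array[String]): Unit = {
    val agg = new Spark06_SparkSQK_TODO2.DescAgg
    // 3 clicks in 北京, 2 in 天津, 1 in 上海
    val cities = Seq("北京", "北京", "北京", "天津", "天津", "上海")
    val buff = cities.foldLeft(agg.zero)((b, city) => agg.reduce(b, city))
    // with the descending sort in finish(), this prints: 北京 50%,天津 33%,其他 17%
    println(agg.finish(buff))
  }
}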