数据

data.txt

步骤

第一步读取数据

代码

  1. val spark = SparkSession.builder()
  2. .config(
  3. new SparkConf()
  4. .setMaster("local[*]")
  5. .setAppName("text")
  6. )
  7. // 添加满足 inner join 的配置
  8. .config("spark.sql.crossJoin.enabled","true")
  9. .getOrCreate()
  10. import spark.implicits._
  11. def parseRating(str: String): Rating = {
  12. val fields = str.split("::")
  13. Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
  14. }
  15. // textFile 传入路径
  16. val ratings = spark.sparkContext.textFile("data/scala/sample_movielens_ratings.txt")
  17. .map(parseRating)
  18. .toDF()
  19. ratings.show(false)

一部分结果

image.png

第二步 处理分母需要的数据

代码

分母是 向量模开根号 乘积 这里先计算每一个向量的模开根号

  1. val df_rating = ratings.rdd.map(x => {
  2. (x(0).toString, x(2).toString)
  3. })
  4. .groupByKey()
  5. .mapValues(
  6. /*sqrt 开根号*/
  7. score => math.sqrt(
  8. score.toArray
  9. /*pow 代表一第一个参数为底 第二个参数为幂*/
  10. .map(x => {
  11. math.pow(x.toDouble, 2)
  12. })
  13. .reduce(_ + _)
  14. )
  15. ).toDF("userId", "rating")

一部分结果

image.png

第三步求分子

第一步

  1. 分子是两个向量点的乘积和
  2. 先连接两个用户 还有两个人对电影的评分

    代码

    1. val _ratingScore = ratings.select(
    2. $"userId".alias("_userId")
    3. , $"movieId"
    4. , $"rating".alias("_rating")
    5. )
    6. .join(ratings, "movieId")
    7. .where($"userId" =!= $"_userId")
    8. .select(
    9. $"movieId"
    10. , $"userId"
    11. , $"_userId"
    12. , $"rating"
    13. , $"_rating"
    14. )
    15. _ratingScore.show(false)
    16. _ratingScore.printSchema()

    一部分结果

    image.png
    image.png

    第二步

    思路

  3. 先求 rating和_rating乘积

  4. 按照userId和_userId 分组 对乘积求和得出分子

    代码

    ```java // 求出分子 rating和_rating乘积和 val sumFenMu = _ratingScore.select(
    1. $"userId"
    , $”_userId”
    1. , ($"rating" * $"_rating").alias("score_mo1")
    ) .groupBy( $”userId” , $”_userId”) .agg(sum($”score_mo1”).alias(“sumScore”))
  1. <a name="ADLXk"></a>
  2. #### 一部分结果
  3. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/12593807/1652926812326-241ab370-8eb6-4eb7-9453-ae7fd766c7da.png#clientId=uc9fc30a1-ab20-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=163&id=u1a151c9a&margin=%5Bobject%20Object%5D&name=image.png&originHeight=163&originWidth=198&originalType=binary&ratio=1&rotation=0&showTitle=false&size=6929&status=done&style=none&taskId=u990151de-c973-4913-a84a-01458efa757&title=&width=198)
  4. <a name="hBfBu"></a>
  5. ## 整合求余弦相似度并且按照top5输出
  6. <a name="nB3O8"></a>
  7. ### 第一步
  8. <a name="hHjBL"></a>
  9. #### 思路
  10. 1. 把分子的表 与 完成一部分分母的表进行联表
  11. 1. 通过结果表 对 两个分母的一部分进行乘积
  12. 1. 分子除以分母得出余弦相似度
  13. 1. 开窗函数保留前五
  14. <a name="ZiT40"></a>
  15. #### 代码
  16. ```java
  17. // 先复制一份分母表
  18. val _df_rating = df_rating.select($"userId".alias("_userId"), $"rating".as("_rating"))
  19. // 得到 _userId|userId|sumScore|rating|_rating
  20. val frame: DataFrame = sumFenMu.join(
  21. df_rating,
  22. "userId"
  23. ).join(
  24. _df_rating,
  25. "_userId"
  26. )
  27. frame.show(false)
  28. frame.createTempView("frame")
  29. // 按照公式 sumScore/ (rating * _rating)
  30. spark.sql("select * from frame")
  31. .select(
  32. col("userId")
  33. , col("_userId")
  34. , (col("sumScore")/( col("rating") * col("_rating"))).as("cos")
  35. )
  36. // 开窗函数求出 余弦相似度最接近的top5
  37. .withColumn("top", row_number() over(Window.partitionBy("userId")
  38. .orderBy($"cos".desc)))
  39. .where($"top"<=5)
  40. .drop($"top")
  41. .show(false)

一部分结果

image.png

整体代码

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.ml.recommendation.ALS
  3. import org.apache.spark.sql.expressions.Window
  4. import org.apache.spark.sql.functions._
  5. import org.apache.spark.sql.{DataFrame, SparkSession}
  6. object helloBy {
  7. case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
  8. def main(args: Array[String]): Unit = {
  9. // 机器学习 基于协同过滤推荐系统
  10. System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.6.0")
  11. val spark = SparkSession.builder()
  12. .config(
  13. new SparkConf()
  14. .setMaster("local[*]")
  15. .setAppName("text")
  16. )
  17. // 添加满足 inner join 的配置
  18. .config("spark.sql.crossJoin.enabled","true")
  19. .getOrCreate()
  20. import spark.implicits._
  21. def parseRating(str: String): Rating = {
  22. val fields = str.split("::")
  23. Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
  24. }
  25. val ratings = spark.sparkContext.textFile("data/scala/sample_movielens_ratings.txt")
  26. .map(parseRating)
  27. .toDF()
  28. ratings.show(false)
  29. // 通过余弦相似度计算用户相似度
  30. // 分母每个向量模的乘积
  31. // 获取独自的每一个用户模开根号
  32. //每一个点的乘积和就是模
  33. val df_rating = ratings.rdd.map(x => {
  34. (x(0).toString, x(2).toString)
  35. })
  36. .groupByKey()
  37. .mapValues(
  38. /*sqrt 开根号*/
  39. score => math.sqrt(
  40. score.toArray
  41. /*pow 代表一第一个参数为底 第二个参数为幂*/
  42. .map(x => {
  43. math.pow(x.toDouble, 2)
  44. })
  45. .reduce(_ + _)
  46. )
  47. ).toDF("userId", "rating")
  48. df_rating.show(false)
  49. // 用户两两匹配
  50. val _ratingScore = ratings.select(
  51. $"userId".alias("_userId")
  52. , $"movieId"
  53. , $"rating".alias("_rating")
  54. )
  55. .join(ratings, "movieId")
  56. .where($"userId" =!= $"_userId")
  57. .select(
  58. $"movieId"
  59. , $"userId"
  60. , $"_userId"
  61. , $"rating"
  62. , $"_rating"
  63. )
  64. _ratingScore.show(false)
  65. _ratingScore.printSchema()
  66. _ratingScore.cache()
  67. // 求出分子 rating和_rating乘积和
  68. val sumFenMu = _ratingScore.select(
  69. $"userId"
  70. , $"_userId"
  71. , ($"rating" * $"_rating").alias("score_mo1")
  72. )
  73. .groupBy(
  74. $"userId"
  75. , $"_userId")
  76. .agg(sum($"score_mo1").alias("sumScore"))
  77. sumFenMu.show(false)
  78. val _df_rating = df_rating.select($"userId".alias("_userId"), $"rating".as("_rating"))
  79. // 得到 _userId|userId|sumScore|rating|_rating
  80. val frame: DataFrame = sumFenMu.join(
  81. df_rating,
  82. "userId"
  83. ).join(
  84. _df_rating,
  85. "_userId"
  86. )
  87. frame.show(false)
  88. frame.createTempView("frame")
  89. // 按照公式 sumScore/ (rating * _rating)
  90. spark.sql("select * from frame")
  91. .select(
  92. col("userId")
  93. , col("_userId")
  94. , (col("sumScore")/( col("rating") * col("_rating"))).as("cos")
  95. )
  96. // 开窗函数求出 余弦相似度最接近的top5
  97. .withColumn("top", row_number() over(Window.partitionBy("userId").orderBy($"cos".desc)))
  98. .where($"top"<=5)
  99. .drop($"top")
  100. .show(false)
  101. }
  102. }

注意

  1. 分母是分两步求得 一步到位不好处理(但是可以不用分母 分子 分母) 可以用(分母 分母 分子)这种顺序也是可以得
  2. 一定要对公式特别熟悉