数据

data.txt

思路

  1. 加载数据
  2. 获取用户-电影-评分
  3. 获取电影-电影相似度

    加载数据

    ```java val spark = SparkSession.builder() .config( new SparkConf()
    1. .setMaster("local[*]")
    2. .setAppName("text")
    ) // 添加满足 inner join 的配置 .config(“spark.sql.crossJoin.enabled”,”true”) .getOrCreate() import spark.implicits._
  1. def parseRating(str: String): Rating = {
  2. val fields = str.split("::")
  3. Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
  4. }
  5. val ratings = spark.sparkContext.textFile("data/scala/sample_movielens_ratings.txt")
  6. .map(parseRating)
  7. .toDF()
  8. ratings.show(false)
  1. <a name="QNXXp"></a>
  2. # 第二步求得 movieId_1 和 movieId_2 的共同个数
  3. ```java
  4. // 物品相似度表(矩阵)
  5. val dataFrame: DataFrame = ratings.select("userId", "movieId")
  6. val frame: DataFrame = dataFrame
  7. .join(dataFrame, "userId")
  8. .map(x => {
  9. (x(0).toString, (x(1).toString, x(2).toString))
  10. })
  11. .map(x => {
  12. (x._2._1, x._2._2, 1)
  13. })
  14. .groupBy("_1", "_2").agg(sum("_3").as("sim"))
  15. .withColumnRenamed("_1", "movieId_1")
  16. .withColumnRenamed("_2", "movieId_2")

部分结果

image.png

第三步 得出结果

  1. 形成 userId movieId rating movieId_1 movieId_2 sim
  2. userId | movieId_2 | interse interse = rating * sim
  3. 利用开窗函数获得前四
    1. ratings.join(frame, ratings("movieId") === frame("movieId_1"), "inner" )
    2. // 获取sim与rating乘积
    3. .withColumn("interse", frame("sim")*ratings("rating"))
    4. .select("userId", "movieId_2", "interse")
    5. // 按照用户与movieId_2 分组 求 interse得总和 并且获取top4
    6. .groupBy("userId", "movieId_2")
    7. .agg(sum("interse").as("interse"))
    8. // 开窗函数
    9. .withColumn("top", row_number().over(Window.partitionBy("userId").orderBy($"interse".desc)))
    10. .where($"top" < 5)
    11. .show(false)
    12. /*如果推荐的前四电影中 用户没有看过则 这个电影就是推荐给用户得其中之一*/

    一部分结果

    image.png

得出结果中间一部分代码得图片

  1. ratings.join(frame, ratings("movieId") === frame("movieId_1"), "inner" )
  2. // 获取sim与rating乘积
  3. .withColumn("interse", frame("sim")*ratings("rating")).show(false)

image.png
之后对这个表进行操作得到推荐表 我写在了一起

整体代码

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.ml.recommendation.ALS
  3. import org.apache.spark.sql.expressions.Window
  4. import org.apache.spark.sql.functions._
  5. import org.apache.spark.sql.{DataFrame, SparkSession}
  6. object hello {
  7. case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
  8. def main(args: Array[String]): Unit = {
  9. // 机器学习 基于协同过滤推荐系统
  10. System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.6.0")
  11. val spark = SparkSession.builder()
  12. .config(
  13. new SparkConf()
  14. .setMaster("local[*]")
  15. .setAppName("text")
  16. )
  17. // 添加满足 inner join 的配置
  18. .config("spark.sql.crossJoin.enabled", "true")
  19. .getOrCreate()
  20. import spark.implicits._
  21. def parseRating(str: String): Rating = {
  22. val fields = str.split("::")
  23. Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
  24. }
  25. var ratings = spark.sparkContext.textFile("data/scala/sample_movielens_ratings.txt")
  26. .map(parseRating)
  27. .toDF()
  28. ratings = ratings.select("userId","movieId","rating").where($"rating" > 0)
  29. // val useItemDate = ratings.groupBy("userId")
  30. // .pivot("movieId")
  31. // .sum("rating")
  32. // .na
  33. // .fill(0)
  34. // useItemDate.show(false)
  35. // 物品相似度表(矩阵)
  36. val dataFrame: DataFrame = ratings.select("userId", "movieId")
  37. val frame: DataFrame = dataFrame
  38. .join(dataFrame, "userId")
  39. .map(x => {
  40. (x(0).toString, (x(1).toString, x(2).toString))
  41. })
  42. .map(x => {
  43. (x._2._1, x._2._2, 1)
  44. })
  45. .groupBy("_1", "_2").agg(sum("_3").as("sim"))
  46. .withColumnRenamed("_1", "movieId_1")
  47. .withColumnRenamed("_2", "movieId_2")
  48. frame.show(false)
  49. ratings.join(frame, ratings("movieId") === frame("movieId_1"), "inner" )
  50. // 获取sim与rating乘积
  51. .withColumn("interse", frame("sim")*ratings("rating"))
  52. .select("userId", "movieId_2", "interse")
  53. // 按照用户与movieId_2 分组 求 interse得总和 并且获取top4
  54. .groupBy("userId", "movieId_2")
  55. .agg(sum("interse").as("interse"))
  56. // 开窗函数
  57. .withColumn("top", row_number().over(Window.partitionBy("userId").orderBy($"interse".desc)))
  58. .where($"top" < 5)
  59. .show(false)
  60. /*如果推荐的前四电影中 用户没有看过则 这个电影就是推荐给用户得其中之一*/
  61. ratings.join(frame, ratings("movieId") === frame("movieId_1"), "inner" )
  62. // 获取sim与rating乘积
  63. .withColumn("interse", frame("sim")*ratings("rating")).show(false)
  64. // val frame1: DataFrame = frame.groupBy("movieId_1")
  65. // .pivot("movieId_2")
  66. // .sum("sim")
  67. // .withColumnRenamed("movieId_1", "movieId")
  68. // .na
  69. // .fill(0)
  70. // frame1.show(false)
  71. }
  72. }

注意

  1. 一定要注意表的对应列和对应关系
  2. 联表注意联表的两列不是我们需要的 而是附带的另一列 比如联表是(movieId, movieId_1) 我们需要的是 (movieId_2)