import org.apache.spark.SparkConf
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Collaborative-filtering movie recommendation demo built on Spark ML's ALS
 * (alternating least squares) matrix factorization.
 *
 * Trains on half of a MovieLens-style ratings file, then:
 *   1. recommends the top-5 movies for user 5, and
 *   2. recommends the top-5 users for movie 20.
 */
object helloBy {

  /** One MovieLens rating record, parsed from a "userId::movieId::rating::timestamp" line. */
  case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

  def main(args: Array[String]): Unit = {
    // Machine learning: collaborative-filtering recommendation system.
    // Required on Windows so Hadoop client code can locate winutils.exe.
    System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.6.0")

    val spark = SparkSession.builder().config(
      new SparkConf()
        .setMaster("local[*]")
        .setAppName("text")
    ).getOrCreate()

    import spark.implicits._

    // Parse a single "userId::movieId::rating::timestamp" line.
    // NOTE(review): assumes every line is well-formed; a malformed line throws. OK for a demo.
    def parseRating(str: String): Rating = {
      val fields = str.split("::")
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
    }

    val ratings = spark.sparkContext.textFile("data/scala/sample_movielens_ratings.txt")
      .map(parseRating)
      .toDF()

    // 50/50 random train/test split.
    val Array(training, test) = ratings.randomSplit(Array(0.5, 0.5))

    // Build the recommendation model using ALS on the training data.
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")
      // FIX: users/movies present only in the test split get NaN predictions by
      // default, and NaN sorts ABOVE every real value in a descending orderBy,
      // which would corrupt both top-5 lists below. "drop" removes those rows.
      .setColdStartStrategy("drop")
    val model = als.fit(training)

    // Recommend movies to a user: the idea is simply to surface the movies
    // with the highest predicted rating for that user.
    val frame: DataFrame = model.transform(test)
    frame
      .drop("rating", "timestamp")
      .where($"userId" === 5)
      .orderBy($"prediction".desc)
      .limit(5)
      .show(false)

    // Recommend users for a movie: keep each user's single best predicted row
    // for the movie, then take the 5 highest-scoring users.
    // FIX: the original partitioned by (movieId, userId) — the unique key of the
    // filtered frame — so every partition held one row, row_number() was always 1,
    // and the window was a no-op. Partitioning by userId alone performs the
    // intended per-user dedup (movieId is already pinned by the filter).
    val w = Window.partitionBy($"userId").orderBy($"prediction".desc)
    frame
      .drop("rating", "timestamp")
      .where($"movieId" === 20)
      .withColumn("rn", row_number().over(w)).where($"rn" === 1).drop($"rn")
      .orderBy($"prediction".desc)
      .limit(5)
      .select("movieId", "userId", "prediction")
      .show(false)

    // FIX: shut the session down instead of leaking the local Spark context.
    spark.stop()
  }
}