5.1 交替最小二乘法(ALS)


在 Spark 中有一个经常被使用的推荐算法,
交替最小二乘法(ALS)**。该算法利用一种叫做协同过滤(collaborative filtering)的方法,这种方法仅使用用户过去的采购数据做推荐,它并不需要或使用用户或项目的其他任何特征,它还支持多种 ALS 变体(例如,显式或隐式反馈)。除了 ALS,Spark 还提供了在购物篮分析中用于发现关联规则的频繁模式挖掘(Frequent Pattern Mining)技术。最后,Spark 的 RDD API 提供了低级的矩阵分解运算。

ALS 为每个用户和物品建立 k 维的特征向量,从而可以通过用户和物品向量的点积来估算该用户对该物品的评分值,所以只需要用户 - 物品键值对的评分数据作为输入数据集,其中有三列:用户 id 列、物品(如电影)id 列和评分列。评分可以是显式的,即想要直接预测的数值等级;或隐式的,在这种情况下每个分数表示用户和物品之间的交互强度(例如,访问特定页的次数),它衡量用户对该物品的偏好程度。根据输入的 DataFrame,模型将产生特征向量,可以使用它来预测用户对尚未评分的物品的评级。

在实践中需要注意的一个问题是,该算法偏好推荐那些非常常见的或者有大量信息的物品,如果正在推出一种还没有用户熟悉的新产品,该算法就不会推荐它。此外如果新用户加入平台,他们可能在训练集中没有任何打分记录,算法也将不知道推荐他们些什么,这些称为冷启动(Cold Start)问题。

在 ALS 中,这些配置用于确定模型的结构以及具体的协同过滤问题:

  • rank:rank(秩)确定了用户和物品特征向量的维度,这通常是通过实验来调整,一个重要权衡是过高的秩导致过拟合,而过低的秩导致不能做出最好的预测,默认值为 10。
  • alpha:在基于隐式反馈(用户行为)的数据上进行训练时,alpha 设置偏好的基线置信度(BaselineConfidence),这个值越大则越认为用户和他没有评分的物品之间没有关联。默认值 1.0,需要通过实验调参。
  • regParam:控制正则化参数来防止过拟合,需要测试不同的值来找到针对当前问题的最优值。默认值为 0.1。
  • implicitPrefs:此布尔值指定是在隐式数据(true)还是显式数据(false)上进行训练,应根据模型输入的数据来设置此值。如果数据是基于对产品的被动交互(如通过单击或网页访问),则应设置为隐式****与此相反,如果数据是显式分级的(例如,用户给这家餐厅 4/5 的星级),则应设置为显式。默认值是显式。
  • nonnegative:如果设置为 true,则将非负约束置于最小二乘问题上,并且只返回非负特征向量,这可以提高某些应用程序的性能。默认值为 false。

交替最小二乘法的训练参数与其他模型中的训练参数不同,这是因为将会对数据在整个集群中的分布方式进行更低级的控制,分布在集群中的数据组称为数据块(block),**确定在每个块中放置的数据对训练时间有很大影响**(不是最终结果)通常的经验法则是每个 block 大概分配一百万到五百万个评分值,如果每个 block 的数据量少于这个数字,则太多的 block 可能会影响性能

说明:

  • numUserBlocks:用于确定将用户数据拆分成多少个数据块,默认值为 10。
  • numItemBlocks:用于确定将物品数据拆分为多少个数据块,默认值为 10。
  • maxIter:训练的迭代次数,改变它可能不会太影响结果,所以这不应该是调参的首要参数,默认值为 10。在检查训练历史记录之后,如果发现迭代多少次之后误差曲线还没有平缓,可以考虑增大该值
  • checkpointInterval:设置 checkpoint 可以在训练过程中保存模型状态,以便更快地从节点故障中恢复。可以使用 SparkContext。setCheckpointDir 设置检查点目录。
  • seed:指定随机种子帮助复现实验结果。

预测参数用来指定如何用训练好的模型进行预测。在例子中有一个参数,就是冷启动策略(通过 coldStartStrategy 设置),该参数用来设置模型应给未出现过在训练集中的用户或物品推荐什么。当在实际生产中使用模型时,冷启动的挑战就会凸显出来,模型对新用户和物品没有评分的历史记录无法做出推荐。当在 Spark 的 CrossValidator 或 TrainValidationSplit 中使用简单随机拆分时,也可能发生这种情况,遇到不在训练集中的用户和物品非常常见。

默认情况下,Spark 在遇到实际模型中不存在的用户和物品时,预测值将使用默认值 NaN,但是这在训练过程中是不可取的,因为它会破坏 Evaluator 正确评估模型的能力,这使得无法合理地选择模型。用户可以将 coldStartStrategy 参数设置为 drop**,以便把在 DataFrame 中预测为 NaN 值的行删除,排除 NaN 数据后将通过非 NaN 数据计算评估指标**。下面示例将使用 MovieLens 电影评分数据集,这个数据集具有与电影推荐相关的信息,首先使用此数据集来训练模型:

  1. from pyspark.ml.recommendation import ALS
  2. from pyspark.sql import Row
  3. ratings = spark.read.text("/data/sample_movielens_ratings.txt")\
  4. .rdd.toDF()\
  5. .selectExpr("split(value , '::') as col")\
  6. .selectExpr(
  7. "cast(col[0] as int) as userId",
  8. "cast(col[1] as int) as movieId",
  9. "cast(col[2] as float) as rating",
  10. "cast(col[3] as long) as timestamp")
  11. training, test = ratings.randomSplit([0.8, 0.2])
  12. als = ALS()\
  13. .setMaxIter(5)\
  14. .setRegParam(0.01)\
  15. .setUserCol("userId")\
  16. .setItemCol("movieId")\
  17. .setRatingCol("rating")
  18. print als.explainParams()
  19. alsModel = als.fit(training)
  20. predictions = alsModel.transform(test)

现在可以针对每个用户或影片输出评分最高的 k 个推荐。模型的 recommendForAllUsers 方法返回对应某个 userId 的 DataFrame,包含推荐电影的数组,以及每个影片的评分。recommendForAllItems 返回对应某个 movieId 的 DataFrame 以及最有可能给该影片打高分的前几个用户:

  1. #in Python
  2. alsModel.recommendForAllUsers(10)\
  3. .selectExpr("userId", "explode(recommendations)").show()
  4. alsModel.recommendForAllItems(10)\
  5. .selectExpr("movieId", "explode(recommendations)").show()

在涉及到冷启动策略时,可以在使用 ALS 时设置自动化的模型评估器。有一件事可能不太明显,就是这个推荐问题其实只是一种回归问题,由于正在预测给定用户的评分值,希望通过优化措施以减少预测的用户评分和真实值之间的差别,可以使用 RegressionEvaluator 来执行,将其放在 pipeline 中以自动化训练过程。执行 Evaluator 操作时,还应将冷启动策略设置为 drop 而不是 NaN然后在生产系统中实际进行预测时将其切换回 NaN

  1. from pyspark.ml.evaluation import RegressionEvaluator
  2. evaluator = RegressionEvaluator()\
  3. .setMetricName("rmse")\
  4. .setLabelCol("rating")\
  5. .setPredictionCol("prediction")
  6. rmse = evaluator.evaluate(predictions)
  7. print("Root-mean-square error = %f" % rmse)

推荐结果可以使用标准的回归 metric 指标和一些特定于推荐的 metric 指标来衡量。毫无疑问,在评估推荐结果的问题上,比单纯的基于回归的评估更为复杂,这些指标对于评估最终模型特别有用。可以继续使用回归的 metric 指标来评估推荐结果,简单地查看每个用户和项目的实际评级与预测值的接近程度

  1. from pyspark.mllib.evaluation import RegressionMetrics
  2. regComparison = predictions.select("rating", "prediction")\
  3. .rdd.map(lambda x: (x(0), x(1)))
  4. metrics = RegressionMetrics(regComparison)

还有另一个工具就是排名指标RankingMetric 将推荐结果与给定用户的实际评分值(或实际偏好)进行比较,它不关注具体的评分值,而是关注算法是否能够把已经打分的物品再次推荐给用户。这需要做一些数据准备工作,首先需要为给定的用户收集一组高评分的电影,在该例子中将使用一个相当低的阈值,即电影评分在 2.5 以上的电影,选择一个合适的阈值属于商业决策,如下所示:

  1. from pyspark.mllib.evaluation import RankingMetrics, RegressionMetrics
  2. from pyspark.sql.functions import col, expr
  3. perUserActual = predictions\
  4. .where("rating > 2.5")\
  5. .groupBy("userId")\
  6. .agg(expr("collect_set(movieId) as movies"))

现在有一个用户的集合,连同他们对电影的真实打分,下面将为每个用户依据算法取得前 10 个推荐电影,再观察前 10 个推荐是否出现在排名前 10 的电影集合中,如果训练了一个很好的模型,它将正确地推荐用户真正喜欢的电影,否则它还没有能学习到用户的喜好:

  1. perUserPredictions = predictions\
  2. .orderBy(col("userId"), expr("prediction DESC"))\
  3. .groupBy("userId")\
  4. .agg(expr("collect_list(movieId) as movies"))

现在有两个 DataFrame、一个预测值,另一个是真实的打分最高的电影集合,可以将它们传递到 RankingMetrics 对象中,此对象接受这些组合的 RDD 作为参数,如以下 join 操作和 RDD transformation 的例子所示:

  1. # in Python
  2. perUserActualvPred = perUserActual.join(perUserPredictions, ["userId"]).rdd\
  3. .map(lambda row: (row[1], row[2][:15]))
  4. ranks = RankingMetrics(perUserActualvPred)

现在,可以根据这个排名来度量推荐算法。例如,可以根据平均精度值指标来查看算法的准确度如何,还可以查看某排名位置之前的准确度,如查看在哪个排名位置开始正向预测开始大量失败:

  1. # in Python
  2. ranks.meanAveragePrecision
  3. ranks.precisionAt(5)

5.2 频繁模式挖掘

除了 ALS 之外,MLlib 提供的另一个推荐系统工具是频繁模式挖掘(Frequent Pattern Mining),有时称为购物篮分析(market basket analysis),根据原始数据发现关联规则。例如鉴于大量的交易,可能会发现购买热狗的用户几乎总是购买热狗面包。这种技术可以应用于推荐中,尤其是当人们在购物时。Spark 实现了频繁模式挖掘的 FP-growth 算法。