SparkML

    I haven't used Spark ML much: I picked it up a year or two ago when a business need came up, then stopped using it. Recently a new requirement arrived and I used Spark ML for time-series forecasting, so in this note I'll first record the simple Spark ML application from back then.

    This example uses a random forest model.

    1. First prepare the training and test data; the two datasets have the same schema

    val data: DataFrame = ......
    val trainDF: DataFrame = data.where("date_time < '2024-02-22'")
    val testDF: DataFrame = data.where("date_time = '2024-02-22'")

    2. Convert the categorical variables to numeric indices; the column list is hard-coded

    val indexCols = Array(
      "peaks_pm_out", "lot_nature_code_new", "seg", "peaks_inner_pm_value", "peaks_am_out", "peaks_inner_am_value",
      "peaks_am_inner", "weekday", "is_holiday", "peaks_pf", "peaks_af", "peaks_out_pm_value", "is_positive",
      "peaks_pm_inner", "peaks_out_am_value")
    // convert categorical variables to numeric indices; hard-coded column list
    val labelIndexers: Array[StringIndexer] = indexCols.map(col => {
      new StringIndexer()
        .setInputCol(col)
        .setOutputCol(s"le_$col") // output column is the original name with a "le_" prefix
        .setStringOrderType("alphabetAsc")
        .setHandleInvalid("keep")
    })

    3. Set up the feature vector; these columns will be used as features for training the model and are assembled into a single column "features"

    val assembler = new VectorAssembler().setInputCols(Array(
        "out_num", "avg_time_seg_median", "le_peaks_pm_out", "le_lot_nature_code_new",
        "le_seg", "le_peaks_inner_pm_value", "come_num", "le_peaks_am_out", "flow_diff",
        "le_peaks_inner_am_value", "le_peaks_am_inner", "le_weekday", "avg_time_day7_mean",
        "le_is_holiday", "le_peaks_pf", "le_peaks_af", "le_peaks_out_pm_value",
        "le_is_positive", "le_peaks_pm_inner", "le_peaks_out_am_value", "total_space"))
      .setOutputCol("features")
      .setHandleInvalid("skip")

    4. Create the model object; for what each parameter does, see the random forest documentation (note that setMaxBins must be at least the number of distinct values of any indexed categorical feature)

    // random forest model object
    val rf: RandomForestRegressor = new RandomForestRegressor()
      .setNumTrees(100)
      .setMaxDepth(15)
      .setMinInstancesPerNode(1)
      .setMinInfoGain(0.0)
      .setImpurity("variance")
      .setFeaturesCol("features")
      .setLabelCol("label")
      .setFeatureSubsetStrategy("auto")
      .setMaxBins(97)
      .setCacheNodeIds(true)
      .setSubsamplingRate(1.0)
      .setSeed(42)

    5. Build the machine learning pipeline and fit it on the training set to get a model

    // build the machine learning pipeline
    val stages = labelIndexers :+ assembler :+ rf
    val pipeline = new Pipeline().setStages(stages)
    // train the model on the training set
    val model: PipelineModel = pipeline.fit(trainDF)
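
    As a side note (not in the original), if you want to see which features the forest actually relies on, the fitted forest, which is the last pipeline stage, exposes feature importances; a minimal sketch, assuming `model` is the `PipelineModel` fitted above:

```scala
import org.apache.spark.ml.regression.RandomForestRegressionModel

// The fitted forest is the last stage of the PipelineModel trained above.
val rfModel = model.stages.last.asInstanceOf[RandomForestRegressionModel]
// Vector of per-feature importances, ordered the same way as the
// VectorAssembler input columns from step 3.
println(rfModel.featureImportances)
```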

    6. Predict on data with the trained model

    // validate the model's predictions on the test set; the result DataFrame contains both the
    // label and prediction columns, so deviation metrics such as MAE or MSE can be computed from them
    val result = model.transform(testDF)
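
    The MAE/MSE check mentioned in the comment can be done with Spark's built-in `RegressionEvaluator`; a minimal sketch, assuming the label column is named `label`:

```scala
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Compare the true labels against the predictions produced above.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")        // adjust if your label column is named differently
  .setPredictionCol("prediction")

val mae = evaluator.setMetricName("mae").evaluate(result)
val mse = evaluator.setMetricName("mse").evaluate(result)
println(s"MAE = $mae, MSE = $mse")
```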

    7. If the results are good, the model can be saved for later use; I save it to HDFS

    model.write.overwrite().save(s"hdfs://xxxxx/spark/model/xxxxx/xxxx_model")

    8. Load the model in another Spark program and use it for prediction

    val result: DataFrame = .....
    val model = PipelineModel.load("hdfs://xxxxx/spark/model/xxxxx/xxxx_model")
    // predict
    val predictionDF = model
      .transform(result)
      .where("prediction <= 1.0")
      .drop("features")