I haven't used SparkML much: it came up for a business need a year or two ago and then sat idle until a recent requirement had me doing time-series forecasting with it again. This note records that earlier, simple SparkML application.
The example uses a random forest model.
1. Prepare the training and test data; the two datasets share the same schema
val data: DataFrame = ......
val trainDF: DataFrame = data.where("date_time < '2024-02-22'")
val testDF: DataFrame = data.where("date_time = '2024-02-22'")
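The split above is a simple time-based holdout: everything before the cutoff date trains the model, the cutoff day itself becomes the test set. A minimal plain-Scala sketch of that logic (no Spark required; `SplitSketch` and `Row` are made-up names for illustration):

```scala
// Time-based train/test split: lexicographic comparison of ISO dates
// ("yyyy-MM-dd") matches chronological order, so plain string comparison works.
object SplitSketch {
  case class Row(dateTime: String, value: Double)

  def split(data: Seq[Row], cutoff: String): (Seq[Row], Seq[Row]) =
    (data.filter(_.dateTime < cutoff),   // training rows: strictly before cutoff
     data.filter(_.dateTime == cutoff))  // test rows: the cutoff day itself

  def main(args: Array[String]): Unit = {
    val data = Seq(Row("2024-02-20", 1.0), Row("2024-02-21", 2.0), Row("2024-02-22", 3.0))
    val (train, test) = split(data, "2024-02-22")
    println(s"train=${train.size} test=${test.size}") // train=2 test=1
  }
}
```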
2. Encode the categorical (discrete) columns as numeric indices
val indexCols = Array(
"peaks_pm_out", "lot_nature_code_new", "seg", "peaks_inner_pm_value", "peaks_am_out", "peaks_inner_am_value",
"peaks_am_inner", "weekday", "is_holiday", "peaks_pf", "peaks_af", "peaks_out_pm_value", "is_positive",
"peaks_pm_inner", "peaks_out_am_value")
// encode each categorical column as a numeric index
val labelIndexers: Array[StringIndexer] = indexCols.map(col => {
  new StringIndexer()
    .setInputCol(col)
    .setOutputCol(s"le_$col") // output column is the original name with a le_ prefix
    .setStringOrderType("alphabetAsc")
    .setHandleInvalid("keep")
})
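What these two settings do: `alphabetAsc` numbers the distinct values in alphabetical order starting from 0, and `handleInvalid("keep")` routes values unseen during fitting into one extra bucket at index `numLabels` instead of failing. A plain-Scala sketch of that mapping (no Spark needed; `IndexerSketch` is a made-up name for illustration):

```scala
// Mimics StringIndexer with setStringOrderType("alphabetAsc") and
// setHandleInvalid("keep") on a single column of strings.
object IndexerSketch {
  // "fit": distinct values, sorted alphabetically, numbered from 0
  def fit(values: Seq[String]): Map[String, Double] =
    values.distinct.sorted.zipWithIndex.map { case (v, i) => v -> i.toDouble }.toMap

  // "transform": unseen values go to the extra "keep" bucket at index numLabels
  def transform(model: Map[String, Double], value: String): Double =
    model.getOrElse(value, model.size.toDouble)

  def main(args: Array[String]): Unit = {
    val model = IndexerSketch.fit(Seq("tue", "mon", "wed", "mon"))
    println(model)                   // mon -> 0.0, tue -> 1.0, wed -> 2.0
    println(transform(model, "fri")) // 3.0, the "keep" bucket
  }
}
```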
3. Define the feature vector: these columns serve as the model's features and are assembled into a single "features" column
val assembler = new VectorAssembler().setInputCols(
Array("out_num"
, "avg_time_seg_median"
, "le_peaks_pm_out"
, "le_lot_nature_code_new"
, "le_seg"
, "le_peaks_inner_pm_value"
, "come_num"
, "le_peaks_am_out"
, "flow_diff"
, "le_peaks_inner_am_value"
, "le_peaks_am_inner"
, "le_weekday"
, "avg_time_day7_mean"
, "le_is_holiday"
, "le_peaks_pf"
, "le_peaks_af"
, "le_peaks_out_pm_value"
, "le_is_positive"
, "le_peaks_pm_inner"
, "le_peaks_out_am_value"
, "total_space")
).setOutputCol("features").setHandleInvalid("skip")
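Per row, the assembler concatenates the listed numeric columns into one vector, and `setHandleInvalid("skip")` drops any row that contains a null or NaN. A plain-Scala sketch of that row-level behavior (no Spark required; `AssemblerSketch` is a made-up name for illustration):

```scala
// Mimics VectorAssembler with setHandleInvalid("skip"): rows are modeled as
// sequences of optional doubles (None = null); any row with a missing or NaN
// value is dropped, the rest become flat feature arrays.
object AssemblerSketch {
  def assemble(rows: Seq[Seq[Option[Double]]]): Seq[Array[Double]] =
    rows.collect {
      // keep only rows where every column is present and not NaN
      case row if row.forall(v => v.exists(d => !d.isNaN)) => row.flatten.toArray
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Seq(Some(1.0), Some(2.0)),         // kept
      Seq(Some(3.0), None),              // dropped ("skip")
      Seq(Some(4.0), Some(Double.NaN)))  // dropped ("skip")
    println(assemble(rows).map(_.mkString("[", ",", "]")).mkString(" "))
  }
}
```

Note that "skip" silently shrinks the training set, so if many rows have gaps, imputing beforehand may be preferable.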
4. Create the model object; see the random forest documentation for what each parameter does
// random forest regressor
val rf: RandomForestRegressor = new RandomForestRegressor()
.setNumTrees(100)
.setMaxDepth(15)
.setMinInstancesPerNode(1)
.setMinInfoGain(0.0)
.setImpurity("variance")
.setFeaturesCol("features")
.setLabelCol("label")
.setFeatureSubsetStrategy("auto")
.setMaxBins(97)
.setCacheNodeIds(true)
.setSubsamplingRate(1.0)
.setSeed(42)
5. Build the machine learning pipeline and fit it to get the model
// build the ML pipeline: indexers first, then the assembler, then the regressor
val stages = labelIndexers :+ assembler :+ rf
val pipeline = new Pipeline().setStages(stages)
// fit on the training set
val model: PipelineModel = pipeline.fit(trainDF)
6. Use the trained model to predict on the test data
// validate the model on the test set: the result DataFrame contains both the
// label and prediction columns, so deviation metrics such as MAE or MSE can
// be computed from them
val result = model.transform(testDF)
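In Spark, those metrics can be computed with `org.apache.spark.ml.evaluation.RegressionEvaluator` (metric names "mae" and "mse"). The arithmetic itself, on (label, prediction) pairs as they would come out of `result`, is just this (plain-Scala sketch; `Metrics` is a made-up name for illustration):

```scala
// MAE and MSE over (label, prediction) pairs.
object Metrics {
  // mean absolute error: average of |label - prediction|
  def mae(pairs: Seq[(Double, Double)]): Double =
    pairs.map { case (y, p) => math.abs(y - p) }.sum / pairs.size

  // mean squared error: average of (label - prediction)^2
  def mse(pairs: Seq[(Double, Double)]): Double =
    pairs.map { case (y, p) => math.pow(y - p, 2) }.sum / pairs.size

  def main(args: Array[String]): Unit = {
    val pairs = Seq((10.0, 12.0), (20.0, 19.0), (30.0, 33.0))
    println(f"MAE=${mae(pairs)}%.2f MSE=${mse(pairs)}%.2f") // MAE=2.00 MSE=4.67
  }
}
```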
7. If the results look good, the model can be saved for later use; I save it to HDFS
model.write.overwrite().save(s"hdfs://xxxxx/spark/model/xxxxx/xxxx_model")
8. Load the model in another Spark program and use it for prediction
val inputDF: DataFrame = .....
val model = PipelineModel.load("hdfs://xxxxx/spark/model/xxxxx/xxxx_model")
// predict
val predictionDF = model
  .transform(inputDF)
  .where("prediction <= 1.0")
  .drop("features")