第7节我们详细介绍了PySpark中机器学习ML模块的各种函数的用法,本节将进行实际的案例分析。
# # 1 Load the data
# In[18]:
from pyspark.sql import types

# (column_name, spark_type) pairs describing every column of the CSV.
# All columns are numeric except BIRTH_PLACE, which arrives as a string
# and is encoded later.
labels = [
    ('INFANT_ALIVE_AT_REPORT', types.DoubleType()),
    ('BIRTH_PLACE', types.StringType()),
    ('MOTHER_AGE_YEARS', types.DoubleType()),
    ('FATHER_COMBINED_AGE', types.DoubleType()),
    ('CIG_BEFORE', types.DoubleType()),
    ('CIG_1_TRI', types.DoubleType()),
    ('CIG_2_TRI', types.DoubleType()),
    ('CIG_3_TRI', types.DoubleType()),
    ('MOTHER_HEIGHT_IN', types.DoubleType()),
    ('MOTHER_PRE_WEIGHT', types.DoubleType()),
    ('MOTHER_DELIVERY_WEIGHT', types.DoubleType()),
    ('MOTHER_WEIGHT_GAIN', types.DoubleType()),
    ('DIABETES_PRE', types.DoubleType()),
    ('DIABETES_GEST', types.DoubleType()),
    ('HYP_TENS_PRE', types.DoubleType()),
    ('HYP_TENS_GEST', types.DoubleType()),
    ('PREV_BIRTH_PRETERM', types.DoubleType()),
]

# Build an explicit schema; every field is declared non-nullable (False).
schema = types.StructType(
    [types.StructField(e[0], e[1], False) for e in labels]
)

# NOTE(review): relies on a `spark` SparkSession already in scope
# (pyspark shell / notebook); the file path is environment-specific.
births = spark.read.csv(
    'file:///opt/spark-2.2.1-bin-hadoop2.7/test/births_transformed.csv',
    header=True,
    schema=schema,
)
# In[19]:
# Inspect the column names (REPL output reproduced below as a comment).
births.columns
# ['INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE', 'MOTHER_AGE_YEARS',
#  'FATHER_COMBINED_AGE', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI',
#  'CIG_3_TRI', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
#  'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN', 'DIABETES_PRE',
#  'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM']

# In[20]:
# Print the inferred/declared schema. Note: although the schema declared
# nullable=False, Spark's CSV reader reports every field as nullable=true.
births.printSchema()
# root
#  |-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
#  |-- BIRTH_PLACE: string (nullable = true)
#  |-- MOTHER_AGE_YEARS: double (nullable = true)
#  |-- FATHER_COMBINED_AGE: double (nullable = true)
#  |-- CIG_BEFORE: double (nullable = true)
#  |-- CIG_1_TRI: double (nullable = true)
#  |-- CIG_2_TRI: double (nullable = true)
#  |-- CIG_3_TRI: double (nullable = true)
#  |-- MOTHER_HEIGHT_IN: double (nullable = true)
#  |-- MOTHER_PRE_WEIGHT: double (nullable = true)
#  |-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
#  |-- MOTHER_WEIGHT_GAIN: double (nullable = true)
#  |-- DIABETES_PRE: double (nullable = true)
#  |-- DIABETES_GEST: double (nullable = true)
#  |-- HYP_TENS_PRE: double (nullable = true)
#  |-- HYP_TENS_GEST: double (nullable = true)
#  |-- PREV_BIRTH_PRETERM: double (nullable = true)

# In[21]:
# Peek at the categorical column we will need to encode.
births.select(births['BIRTH_PLACE']).show(3)
# +-----------+
# |BIRTH_PLACE|
# +-----------+
# |          1|
# |          1|
# |          1|
# +-----------+
# only showing top 3 rows
# # 2 Create Transformers
# Statistical models only operate on numeric data, so the BIRTH_PLACE
# variable must be encoded before modelling.
#
# OneHotEncoder does not accept StringType() — it only handles numeric
# input — so first cast the column to IntegerType().
# NOTE(review): 'BIRTH_PALCE_INT' is a typo for 'BIRTH_PLACE_INT', but the
# misspelled name is kept because every later pipeline stage refers to it.
# In[22]:
births = births.withColumn(
    'BIRTH_PALCE_INT',
    births['BIRTH_PLACE'].cast(types.IntegerType()),
)
# Presumably this was meant to show the new BIRTH_PALCE_INT column;
# it displays the original string column instead — kept as originally run.
births.select(births['BIRTH_PLACE']).show(3)

# In[23]:
births.printSchema()
# root
#  |-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
#  |-- BIRTH_PLACE: string (nullable = true)
#  |-- MOTHER_AGE_YEARS: double (nullable = true)
#  |-- FATHER_COMBINED_AGE: double (nullable = true)
#  |-- CIG_BEFORE: double (nullable = true)
#  |-- CIG_1_TRI: double (nullable = true)
#  |-- CIG_2_TRI: double (nullable = true)
#  |-- CIG_3_TRI: double (nullable = true)
#  |-- MOTHER_HEIGHT_IN: double (nullable = true)
#  |-- MOTHER_PRE_WEIGHT: double (nullable = true)
#  |-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
#  |-- MOTHER_WEIGHT_GAIN: double (nullable = true)
#  |-- DIABETES_PRE: double (nullable = true)
#  |-- DIABETES_GEST: double (nullable = true)
#  |-- HYP_TENS_PRE: double (nullable = true)
#  |-- HYP_TENS_GEST: double (nullable = true)
#  |-- PREV_BIRTH_PRETERM: double (nullable = true)
#  |-- BIRTH_PALCE_INT: integer (nullable = true)
# ## 2.1 Create the first Transformer
# In[25]:
from pyspark.ml import feature

# One-hot encode the (integer-cast) birth-place category into a sparse
# vector column. Column names keep the original 'PALCE' typo on purpose —
# downstream stages reference them by this exact spelling.
encoder = feature.OneHotEncoder(
    inputCol='BIRTH_PALCE_INT',
    outputCol='BIRTH_PALCE_VEC',
)
# ## 2.2 Create the second Transformer
# Build a single column that gathers all features together, using the
# VectorAssembler() transformer.
#
# The inputCols parameter passed to VectorAssembler is a list of all the
# columns to be merged to form the outputCol — 'features'.
# In[27]:
# labels[2:] skips the target (INFANT_ALIVE_AT_REPORT) and the raw string
# BIRTH_PLACE column; the encoded vector is appended instead.
featureCreator = feature.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features',
)

# By using encoder.getOutputCol() above, renaming the encoder's output
# column never requires changing this parameter.
# In[29]:
encoder.getOutputCol()
# 'BIRTH_PALCE_VEC'

# In[30]:
featureCreator.getOutputCol()
# 'features'
# # 3 Create an Estimator
# Start the analysis with a logistic-regression model.
# In[32]:
from pyspark.ml import classification

logistic = classification.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='INFANT_ALIVE_AT_REPORT',
)
# # 4 Create a Pipeline
# In[34]:
from pyspark.ml import Pipeline

# Stage order matters: encode the category, assemble the feature vector,
# then fit the classifier.
pipeline = Pipeline(stages=[encoder, featureCreator, logistic])
# # 5 Fit the model (time-consuming)
# In[36]:
# Fixed seed makes the 70/30 split reproducible across runs.
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)
model = pipeline.fit(births_train)
# # 6 Predict
# In[37]:
test_model = model.transform(births_test)
type(test_model)
# pyspark.sql.dataframe.DataFrame

# In[39]:
test_model.take(1)
# [Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0,
#      FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0,
#      CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0,
#      MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0,
#      MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0,
#      HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0,
#      BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}),
#      features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0,
#                                 8: 135.0, 9: 2.0, 16: 1.0}),
#      rawPrediction=DenseVector([1.0758, -1.0758]),
#      probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]

# rawPrediction is the linear combination of features and coefficients;
# probability holds the probability computed for each class;
# prediction is the final class assignment.
# # 7 Evaluate model performance
# In[40]:
from pyspark.ml import evaluation

# Use BinaryClassificationEvaluator to check the model's performance.
# rawPredictionCol can be either the rawPrediction column produced by the
# estimator, or the probability column — the ranking metrics are the same.
# In[44]:
evaluator1 = evaluation.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT',
)
evaluator2 = evaluation.BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='INFANT_ALIVE_AT_REPORT',
)

# Report the final performance. See standard references for the ROC and
# PR concepts.
# In[48]:
print(evaluator1.evaluate(test_model,
                          {evaluator1.metricName: 'areaUnderROC'}))
print(evaluator1.evaluate(test_model,
                          {evaluator1.metricName: 'areaUnderPR'}))
print("====================================================================")
print(evaluator2.evaluate(test_model,
                          {evaluator2.metricName: 'areaUnderROC'}))
print(evaluator2.evaluate(test_model,
                          {evaluator2.metricName: 'areaUnderPR'}))
# 0.7364549402822995
# 0.7082584952026181
# ====================================================================
# 0.7364549402822995
# 0.7082584952026181
# # 8 Save the model
# Pipelines, transformers, and estimators can all be saved for later reuse.

# ## 8.1 Save the Pipeline
# In[49]:
pipelinePath = './encoder_featureCreator_logistic_Pipeline'
# overwrite() allows re-running this cell without a path-exists error.
pipeline.write().overwrite().save(pipelinePath)

# ## 8.2 Save the fitted model
# In[50]:
from pyspark.ml import PipelineModel

modelPath = './logistic_PipelineModel'
model.write().overwrite().save(modelPath)
# ## 8.3 Later, the pipeline and model can be loaded directly; the pipeline
# is re-fit with .fit() before predicting, the fitted model predicts as-is.
# In[51]:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(births_train).transform(births_test).take(1)
# [Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0,
#      FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0,
#      CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0,
#      MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0,
#      MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0,
#      HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0,
#      BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}),
#      features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0,
#                                 8: 135.0, 9: 2.0, 16: 1.0}),
#      rawPrediction=DenseVector([1.0758, -1.0758]),
#      probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]

# In[52]:
loadedPipelineModel = PipelineModel.load(modelPath)
loadedPipelineModel.transform(births_test).take(1)
# Same Row as above — the reloaded fitted model reproduces the
# original model's predictions exactly.
# # 9 Hyper-parameter tuning (time-consuming)
# The first model is almost never the best one; hyper-parameter tuning is
# the systematic way to search for a better model.
#
# Hyper-parameter tuning: find the best parameters for a model, e.g. the
# maximum number of iterations for logistic regression, or the maximum
# depth of a decision tree.
#
# Grid search and train-validation splitting are the common approaches.

# ## 9.1 Grid search
# For a given evaluation metric, loop over a defined list of parameter
# values, fit a separate model for each combination, and pick the best.
#
# With many parameters or many values this becomes expensive: two
# parameters with two values each already require fitting 4 models
# (growth is combinatorial).
# In[53]:
from pyspark.ml import tuning

# Specify the model and the parameter grid to sweep.
logistic = classification.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
)
grid = (
    tuning.ParamGridBuilder()
    .addGrid(logistic.maxIter, [2, 10, 50])
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3])
    .build()
)

# In[54]:
# CrossValidator needs an estimator, estimatorParamMaps, and an evaluator.
# It loops over the grid, fits each model, and compares them with the
# evaluator.
evaluator = evaluation.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT',
)
cv = tuning.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

# In[55]:
# A transform-only pipeline (no classifier stage): the classifier is
# supplied to CrossValidator instead.
pipeline = Pipeline(stages=[encoder, featureCreator])
data_transformer = pipeline.fit(births_train)

# In[56]:
# Search for the best parameter combination; cvModel holds the best model.
cvModel = cv.fit(data_transformer.transform(births_train))

# In[58]:
# After tuning, performance improves slightly.
data_train = data_transformer.transform(births_test)
results = cvModel.transform(data_train)
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderPR'}))
# 0.7375994284895242
# 0.7102487377506715

# In[60]:
# Extract the best model's parameters.
results = [
    (
        [
            {key.name: paramValue}
            for key, paramValue in zip(params.keys(), params.values())
        ],
        metric,
    )
    for params, metric in zip(
        cvModel.getEstimatorParamMaps(), cvModel.avgMetrics
    )
]
sorted(results, key=lambda el: el[1], reverse=True)[0]
# ([{'regParam': 0.01}, {'maxIter': 50}], 0.7393782132646929)
# ## 9.2 Train-validation split
# To select the best model, TrainValidationSplit randomly splits the input
# (training) dataset into two subsets — a smaller training set and a
# validation set. The split is performed only once.
#
# **Use `ChiSqSelector` to keep only the top 5 features, limiting model
# complexity.**
# In[62]:
selector = feature.ChiSqSelector(
    numTopFeatures=5,
    featuresCol=featureCreator.getOutputCol(),
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT',
)
# The classifier now reads the reduced 'selectedFeatures' column.
logistic = classification.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol='selectedFeatures',
)
pipeline = Pipeline(stages=[encoder, featureCreator, selector])
data_transformer = pipeline.fit(births_train)

# In[63]:
tvs = tuning.TrainValidationSplit(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

# In[64]:
# The reduced-feature model performs slightly worse than the full model,
# but the difference is not significant.
tvsModel = tvs.fit(data_transformer.transform(births_train))
data_train = data_transformer.transform(births_test)
results = tvsModel.transform(data_train)
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderPR'}))
# 0.7271377736609268
# 0.6992424489743029
# # 10 Modelling with a random forest
# **The label column must be DoubleType.**
# In[66]:
from pyspark.sql import functions

# NOTE(review): the schema already declares this column as double, so this
# cast is a no-op kept from the original run.
births = births.withColumn(
    'INFANT_ALIVE_AT_REPORT',
    functions.col('INFANT_ALIVE_AT_REPORT').cast(types.DoubleType()),
)
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)

# In[67]:
classifier = classification.RandomForestClassifier(
    numTrees=5,
    maxDepth=5,
    labelCol='INFANT_ALIVE_AT_REPORT',
)
pipeline = Pipeline(stages=[encoder, featureCreator, classifier])
model = pipeline.fit(births_train)
test = model.transform(births_test)

# In[68]:
evaluator = evaluation.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT',
)
print(evaluator.evaluate(test, {evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test, {evaluator.metricName: "areaUnderPR"}))
# 0.7598074990441518
# 0.7440993755919022

# In[69]:
# Performance using just a single tree.
classifier = classification.DecisionTreeClassifier(
    maxDepth=5,
    labelCol='INFANT_ALIVE_AT_REPORT',
)
pipeline = Pipeline(stages=[encoder, featureCreator, classifier])
model = pipeline.fit(births_train)
test = model.transform(births_test)
evaluator = evaluation.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT',
)
print(evaluator.evaluate(test, {evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test, {evaluator.metricName: "areaUnderPR"}))
# 0.7521769220006015
# 0.7595866540712273
