第7节我们详细介绍了PySpark中机器学习ML模块的各种函数的用法,本节将进行实际的案例分析。

    1. # # 1 加载数据
    2. # In[18]:
    3. from pyspark.sql import types
    4. labels = [
    5. ('INFANT_ALIVE_AT_REPORT', types.DoubleType()),
    6. ('BIRTH_PLACE', types.StringType()),
    7. ('MOTHER_AGE_YEARS', types.DoubleType()),
    8. ('FATHER_COMBINED_AGE', types.DoubleType()),
    9. ('CIG_BEFORE', types.DoubleType()),
    10. ('CIG_1_TRI', types.DoubleType()),
    11. ('CIG_2_TRI', types.DoubleType()),
    12. ('CIG_3_TRI', types.DoubleType()),
    13. ('MOTHER_HEIGHT_IN', types.DoubleType()),
    14. ('MOTHER_PRE_WEIGHT', types.DoubleType()),
    15. ('MOTHER_DELIVERY_WEIGHT', types.DoubleType()),
    16. ('MOTHER_WEIGHT_GAIN', types.DoubleType()),
    17. ('DIABETES_PRE', types.DoubleType()),
    18. ('DIABETES_GEST', types.DoubleType()),
    19. ('HYP_TENS_PRE', types.DoubleType()),
    20. ('HYP_TENS_GEST', types.DoubleType()),
    21. ('PREV_BIRTH_PRETERM', types.DoubleType())
    22. ]
    23. schema = types.StructType([
    24. types.StructField(e[0], e[1], False) for e in labels
    25. ])
    26. births = spark.read.csv('file:///opt/spark-2.2.1-bin-hadoop2.7/test/births_transformed.csv',header = True,schema = schema)
    1. # In[19]:
    2. births.columns
    3. ['INFANT_ALIVE_AT_REPORT',
    4. 'BIRTH_PLACE',
    5. 'MOTHER_AGE_YEARS',
    6. 'FATHER_COMBINED_AGE',
    7. 'CIG_BEFORE',
    8. 'CIG_1_TRI',
    9. 'CIG_2_TRI',
    10. 'CIG_3_TRI',
    11. 'MOTHER_HEIGHT_IN',
    12. 'MOTHER_PRE_WEIGHT',
    13. 'MOTHER_DELIVERY_WEIGHT',
    14. 'MOTHER_WEIGHT_GAIN',
    15. 'DIABETES_PRE',
    16. 'DIABETES_GEST',
    17. 'HYP_TENS_PRE',
    18. 'HYP_TENS_GEST',
    19. 'PREV_BIRTH_PRETERM']
    1. # In[20]:
    2. births.printSchema()
    3. root
    4. |-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
    5. |-- BIRTH_PLACE: string (nullable = true)
    6. |-- MOTHER_AGE_YEARS: double (nullable = true)
    7. |-- FATHER_COMBINED_AGE: double (nullable = true)
    8. |-- CIG_BEFORE: double (nullable = true)
    9. |-- CIG_1_TRI: double (nullable = true)
    10. |-- CIG_2_TRI: double (nullable = true)
    11. |-- CIG_3_TRI: double (nullable = true)
    12. |-- MOTHER_HEIGHT_IN: double (nullable = true)
    13. |-- MOTHER_PRE_WEIGHT: double (nullable = true)
    14. |-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
    15. |-- MOTHER_WEIGHT_GAIN: double (nullable = true)
    16. |-- DIABETES_PRE: double (nullable = true)
    17. |-- DIABETES_GEST: double (nullable = true)
    18. |-- HYP_TENS_PRE: double (nullable = true)
    19. |-- HYP_TENS_GEST: double (nullable = true)
    20. |-- PREV_BIRTH_PRETERM: double (nullable = true)
    1. # In[21]:
    2. births.select(births['BIRTH_PLACE']).show(3)
    3. +-----------+
    4. |BIRTH_PLACE|
    5. +-----------+
    6. | 1|
    7. | 1|
    8. | 1|
    9. +-----------+
    10. only showing top 3 rows
    1. # # 2 创建转换器
    2. # 在建模前,由于统计模型只能对数值数据做操作,必须对BIRTH_PLACE变量进行编码
    3. #
    4. # 使用OneHotEncoder方法来对BIRTH_PLACE列进行编码,该方法不接受StringType(),只能处理数字类型,首先将该列转换为IntegerType()
    5. # In[22]:
    6. births = births.withColumn('BIRTH_PALCE_INT',births['BIRTH_PLACE'].cast(types.IntegerType()))
    7. births.select(births['BIRTH_PLACE']).show(3)
    8. # In[23]:
    9. births.printSchema()
    10. root
    11. |-- INFANT_ALIVE_AT_REPORT: double (nullable = true)
    12. |-- BIRTH_PLACE: string (nullable = true)
    13. |-- MOTHER_AGE_YEARS: double (nullable = true)
    14. |-- FATHER_COMBINED_AGE: double (nullable = true)
    15. |-- CIG_BEFORE: double (nullable = true)
    16. |-- CIG_1_TRI: double (nullable = true)
    17. |-- CIG_2_TRI: double (nullable = true)
    18. |-- CIG_3_TRI: double (nullable = true)
    19. |-- MOTHER_HEIGHT_IN: double (nullable = true)
    20. |-- MOTHER_PRE_WEIGHT: double (nullable = true)
    21. |-- MOTHER_DELIVERY_WEIGHT: double (nullable = true)
    22. |-- MOTHER_WEIGHT_GAIN: double (nullable = true)
    23. |-- DIABETES_PRE: double (nullable = true)
    24. |-- DIABETES_GEST: double (nullable = true)
    25. |-- HYP_TENS_PRE: double (nullable = true)
    26. |-- HYP_TENS_GEST: double (nullable = true)
    27. |-- PREV_BIRTH_PRETERM: double (nullable = true)
    28. |-- BIRTH_PALCE_INT: integer (nullable = true)
    1. # ## 2.1 创建第一个转换器
    2. # In[25]:
    3. from pyspark.ml import feature
    4. encoder = feature.OneHotEncoder(inputCol='BIRTH_PALCE_INT',outputCol='BIRTH_PALCE_VEC')
    1. # ## 2.2 创建第二个转换器
    2. # 创建一个单一的列,它将所有特征整合在一起,使用VectorAssembler()转换器
    3. #
    4. # 传递给VectorAssembler对象的inputCols参数是一个列表,该列表包含所有要合并在一起以组成outputCol——features的列
    5. # In[27]:
    6. featureCreator = feature.VectorAssembler(inputCols = [col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    7. outputCol = 'features')
    8. # 使用编码器对象的输出(调用.getOutputCol()方法),在任何时候更改了编码器对象中输出列的名称,不必更改此参数的值
    9. # In[29]:
    10. encoder.getOutputCol()
    11. 'BIRTH_PALCE_VEC'
    12. # In[30]:
    13. featureCreator.getOutputCol()
    14. 'features'
    1. # # 3 创建评估器
    2. # 先以逻辑回归模型作为分析
    3. # In[32]:
    4. from pyspark.ml import classification
    5. logistic = classification.LogisticRegression(maxIter=10,regParam=0.01,labelCol='INFANT_ALIVE_AT_REPORT')
    1. # # 4 创建一个管道
    2. # In[34]:
    3. from pyspark.ml import Pipeline
    4. pipeline = Pipeline(stages=[
    5. encoder,
    6. featureCreator,
    7. logistic
    8. ])
    1. # # 5 拟合模型(耗时)
    2. # In[36]:
    3. births_train,births_test = births.randomSplit([0.7,0.3],seed = 666)
    4. model = pipeline.fit(births_train)
    1. # # 6 预测
    2. # In[37]:
    3. test_model = model.transform(births_test)
    4. type(test_model)
    5. pyspark.sql.dataframe.DataFrame
    6. # In[39]:
    7. test_model.take(1)
    8. [Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0, FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0, CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0, MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0, MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0, HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0, BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0758, -1.0758]), probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]
    9. # rawPrediction是特征和系数的线性组合值;
    10. #
    11. # probability是为每个类别计算出的概率;
    12. #
    13. # prediction是最终的类分配
    1. # # 7 评估模型的性能
    2. # In[40]:
    3. from pyspark.ml import evaluation
    4. # 使用BinaryClassificationEvaluator来检验模型的表现,rawPredictionCol可以是由评估器产生的rawPrediction列,也可以是probability列
    5. # In[44]:
    6. evaluator1 = evaluation.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='INFANT_ALIVE_AT_REPORT')
    7. evaluator2 = evaluation.BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='INFANT_ALIVE_AT_REPORT')
    8. # 查看最后表现性能,关于ROC和PR相关概念,需要读者自行查阅,后续作者会推出一些理论知识作为补充
    9. # In[48]:
    10. print(evaluator1.evaluate(test_model,{evaluator1.metricName:'areaUnderROC'}))
    11. print(evaluator1.evaluate(test_model,{evaluator1.metricName:'areaUnderPR'}))
    12. print("====================================================================")
    13. print(evaluator2.evaluate(test_model,{evaluator2.metricName:'areaUnderROC'}))
    14. print(evaluator2.evaluate(test_model,{evaluator2.metricName:'areaUnderPR'}))
    15. 0.7364549402822995
    16. 0.7082584952026181
    17. ====================================================================
    18. 0.7364549402822995
    19. 0.7082584952026181
    1. # # 8 保存模型
    2. # 可以保存管道、转换器和评估器以备再用
    1. # ## 8.1 保存管道
    2. # In[49]:
    3. pipelinePath = './encoder_featureCreator_logistic_Pipeline'
    4. pipeline.write().overwrite().save(pipelinePath)
    1. # ## 8.2 保存模型
    2. # In[50]:
    3. from pyspark.ml import PipelineModel
    4. modelPath = './logistic_PipelineModel'
    5. model.write().overwrite().save(modelPath)
    1. # ## 8.3 以后如果想要复用管道和模型,可以直接加载:管道加载后需先.fit()再预测,模型加载后可直接.transform()预测
    2. # In[51]:
    3. loadedPipeline = Pipeline.load(pipelinePath)
    4. loadedPipeline.fit(births_train).transform(births_test).take(1)
    5. [Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0, FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0, CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0, MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0, MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0, HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0, BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0758, -1.0758]), probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]
    6. # In[52]:
    7. loadedPipelineModel = PipelineModel.load(modelPath)
    8. loadedPipelineModel.transform(births_test).take(1)
    9. [Row(INFANT_ALIVE_AT_REPORT=0.0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13.0, FATHER_COMBINED_AGE=99.0, CIG_BEFORE=0.0, CIG_1_TRI=0.0, CIG_2_TRI=0.0, CIG_3_TRI=0.0, MOTHER_HEIGHT_IN=66.0, MOTHER_PRE_WEIGHT=133.0, MOTHER_DELIVERY_WEIGHT=135.0, MOTHER_WEIGHT_GAIN=2.0, DIABETES_PRE=0.0, DIABETES_GEST=0.0, HYP_TENS_PRE=0.0, HYP_TENS_GEST=0.0, PREV_BIRTH_PRETERM=0.0, BIRTH_PALCE_INT=1, BIRTH_PALCE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0758, -1.0758]), probability=DenseVector([0.7457, 0.2543]), prediction=0.0)]
    1. # # 9 超参调优(耗时)
    2. # 第一个模型几乎不可能是我们想要的最佳模型,超参调优是寻找最佳模型的科学方法
    3. #
    4. # 超参调优:找到模型的最佳参数,例如找到逻辑回归模型所需要的最大迭代次数、决策树的最大深度等等。
    5. #
    6. # grid search和train-validation splitting是超参调优的常用方法
    1. # ## 9.1 网格搜索法
    2. # 根据给定评估指标,循环遍历定义的参数值列表,估计单独的模型,从而选择一个最佳的模型;
    3. #
    4. # 当然如果定义优化的参数较多或者参数的值太多,需要耗费大量的时间才能选择出最佳模型。
    5. #
    6. # 例如:两个参数,每个参数有两个值,则需要拟合4个模型(指数级增长)
    7. # In[53]:
    8. from pyspark.ml import tuning
    9. # 指定模型和要循环遍历的参数列表
    10. logistic = classification.LogisticRegression(labelCol = 'INFANT_ALIVE_AT_REPORT')
    11. grid = tuning.ParamGridBuilder().addGrid(logistic.maxIter,[2,10,50]).addGrid(logistic.regParam,[0.01,0.05,0.3]).build()
    12. # In[54]:
    13. # 某种比较模型的方法,使用CrossValidator需要评估器、estimatorParamMaps和evaluator。
    14. # 该模型循环遍历值的网格,评估各个模型,并使用evaluator比较其性能。
    15. evaluator = evaluation.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='INFANT_ALIVE_AT_REPORT')
    16. cv = tuning.CrossValidator(estimator=logistic,estimatorParamMaps=grid,evaluator=evaluator)
    17. # In[55]:
    18. # 创建一个只用于转换的管道
    19. pipeline = Pipeline(stages=[encoder,featureCreator])
    20. data_transformer = pipeline.fit(births_train)
    21. # In[56]:
    22. # 寻找模型的最佳组合参数,cvModel将返回估计的最佳模型
    23. cvModel = cv.fit(data_transformer.transform(births_train))
    24. # In[58]:
    25. # 超参调优,性能稍微有所改善
    26. data_train = data_transformer.transform(births_test)
    27. results = cvModel.transform(data_train)
    28. print(evaluator.evaluate(results,{evaluator.metricName:'areaUnderROC'}))
    29. print(evaluator.evaluate(results,{evaluator.metricName:'areaUnderPR'}))
    30. 0.7375994284895242
    31. 0.7102487377506715
    32. # In[60]:
    33. # 最佳模型的参数提取
    34. results = [
    35. (
    36. [
    37. {key.name: paramValue}
    38. for key, paramValue
    39. in zip(
    40. params.keys(),
    41. params.values())
    42. ], metric
    43. )
    44. for params, metric
    45. in zip(
    46. cvModel.getEstimatorParamMaps(),
    47. cvModel.avgMetrics
    48. )
    49. ]
    50. sorted(results,
    51. key=lambda el: el[1],
    52. reverse=True)[0]
    53. ([{'regParam': 0.01}, {'maxIter': 50}], 0.7393782132646929)
    1. # ## 9.2 Train-validation划分
    2. # 为了选择最佳模型,TrainValidationSplit模型对输入的数据集(训练集)执行随机划分,划分成两个子集:较大的训练子集和较小的验证子集。划分仅执行一次。
    3. # **使用`ChiSqSelector`只选出前5个特征,以此来限制模型的复杂度**
    4. # In[62]:
    5. selector = feature.ChiSqSelector(
    6. numTopFeatures=5,
    7. featuresCol=featureCreator.getOutputCol(),
    8. outputCol='selectedFeatures',
    9. labelCol='INFANT_ALIVE_AT_REPORT'
    10. )
    11. logistic = classification.LogisticRegression(
    12. labelCol='INFANT_ALIVE_AT_REPORT',
    13. featuresCol='selectedFeatures'
    14. )
    15. pipeline = Pipeline(stages=[encoder,featureCreator,selector])
    16. data_transformer = pipeline.fit(births_train)
    17. # In[63]:
    18. tvs = tuning.TrainValidationSplit(
    19. estimator=logistic,
    20. estimatorParamMaps=grid,
    21. evaluator=evaluator
    22. )
    23. # In[64]:
    24. # 特征少的模型比完整的模型表现稍差,但是差别不明显。
    25. tvsModel = tvs.fit(data_transformer.transform(births_train))
    26. data_train = data_transformer.transform(births_test)
    27. results = tvsModel.transform(data_train)
    28. print(evaluator.evaluate(results,{evaluator.metricName: 'areaUnderROC'}))
    29. print(evaluator.evaluate(results,{evaluator.metricName: 'areaUnderPR'}))
    30. 0.7271377736609268
    31. 0.6992424489743029
    1. # # 10 使用随机森林算法建模
    2. # **需要将label特征转化为DoubleType**
    3. # In[66]:
    4. from pyspark.sql import functions
    5. births = births.withColumn('INFANT_ALIVE_AT_REPORT', functions.col('INFANT_ALIVE_AT_REPORT').cast(types.DoubleType()))
    6. births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)
    7. # In[67]:
    8. classifier = classification.RandomForestClassifier(
    9. numTrees=5,
    10. maxDepth=5,
    11. labelCol='INFANT_ALIVE_AT_REPORT')
    12. pipeline = Pipeline(
    13. stages=[
    14. encoder,
    15. featureCreator,
    16. classifier])
    17. model = pipeline.fit(births_train)
    18. test = model.transform(births_test)
    19. # In[68]:
    20. evaluator = evaluation.BinaryClassificationEvaluator(labelCol='INFANT_ALIVE_AT_REPORT')
    21. print(evaluator.evaluate(test,{evaluator.metricName: "areaUnderROC"}))
    22. print(evaluator.evaluate(test,{evaluator.metricName: "areaUnderPR"}))
    23. 0.7598074990441518
    24. 0.7440993755919022
    25. # In[69]:
    1. # 仅仅用一棵树的表现
    2. classifier = classification.DecisionTreeClassifier(
    3. maxDepth=5,
    4. labelCol='INFANT_ALIVE_AT_REPORT')
    5. pipeline = Pipeline(stages=[
    6. encoder,
    7. featureCreator,
    8. classifier]
    9. )
    10. model = pipeline.fit(births_train)
    11. test = model.transform(births_test)
    12. evaluator = evaluation.BinaryClassificationEvaluator(labelCol='INFANT_ALIVE_AT_REPORT')
    13. print(evaluator.evaluate(test,{evaluator.metricName: "areaUnderROC"}))
    14. print(evaluator.evaluate(test,{evaluator.metricName: "areaUnderPR"}))
    15. 0.7521769220006015
    16. 0.7595866540712273