在本教程中,我们将讨论使用标准机器学习管道集成PySpark和XGBoost。我们将使用来自泰坦尼克号的数据:机器学习灾难中的众多Kaggle比赛之一。在开始之前,请知道您应该熟悉Apache SparkXgboost以及Python。本教程中使用的代码可以在github上的Jupyther笔记本中找到。

第1步:下载或构建XGBoost jar

python代码需要两个scala jar依赖项才能工作。您可以直接从maven下载它们:

如果您希望自己构建它们,可以从我之前的教程中找到如何进行构建。

第2步:下载XGBoost python包装器

您可以从此处下载PySpark XGBoost代码。这是我们要编写的部分和XGBoost scala实现之间的接口。我们将在本教程后面的代码中看到它如何集成。

第3步:启动一个新的Jupyter笔记本

我们将开始一个新的笔记本,以便能够编写我们的代码:

  1. jupyter notebook

第4步:将自定义XGBoost jar添加到Spark应用程序

在开始Spark之前,我们需要添加我们之前下载的jar。我们可以使用--jars标志来做到这一点:

  1. import os
  2. os.environ ['PYSPARK_SUBMIT_ARGS'] =' - jars xgboost4j-spark-0.72.jar,xgboost4j-0.72.jar pyspark-shell'

第5步:将PySpark集成到Jupyther笔记本中

使PySpark可用的最简单方法是使用该[findspark](https://github.com/minrk/findspark)软件包:

  1. import findspark
  2. findspark.init()

第6步:启动spark会话

我们现在准备开始火花会议。我们正在创建一个将在本地运行的spark应用程序,并将使用与使用的核心一样多的线程local[*]

  1. spark = SparkSession\
  2. .builder\
  3. .appName("PySpark XGBOOST Titanic")\
  4. .master("local[*]")\
  5. .getOrCreate()

第7步:添加PySpark XGBoost包装器代码

正如我们现在有了spark会话,我们可以添加先前下载的包装器代码:

  1. spark.sparkContext.addPyFile("YOUR_PATH/sparkxgb.zip")

第8步:定义架构

接下来,我们定义从csv读取的数据的模式。这通常比让火花推断模式更好,因为它消耗的资源更少,我们可以完全控制字段。

  1. schema = StructType(
  2. [StructField("PassengerId", DoubleType()),
  3. StructField("Survival", DoubleType()),
  4. StructField("Pclass", DoubleType()),
  5. StructField("Name", StringType()),
  6. StructField("Sex", StringType()),
  7. StructField("Age", DoubleType()),
  8. StructField("SibSp", DoubleType()),
  9. StructField("Parch", DoubleType()),
  10. StructField("Ticket", StringType()),
  11. StructField("Fare", DoubleType()),
  12. StructField("Cabin", StringType()),
  13. StructField("Embarked", StringType())
  14. ])

步骤9:将csv数据读入数据帧

我们将csv读入a DataFrame,确保我们提到我们有一个标题,我们也null用0 替换值:

  1. df_raw = spark\
  2. .read\
  3. .option("header", "true")\
  4. .schema(schema)\
  5. .csv("YOUR_PATH/train.csv")
  6. df = df_raw.na.fill(0)

步骤10:C 将标称值转换为数字

在浏览此步骤的代码之前,让我们简要介绍一些Spark ML概念。他们介绍了ML管道的概念,它是一组构建在其上的高级API DataFrames,可以更轻松地将多个算法组合到一个流程中。管道的主要元素是TransformerEstimator。第一个可以表示可以将a DataFrame转换为另一个DataFrame的算法,而后者是可以适合a DataFrame来生成a 的算法Transformer
为了将名义值转换为数字值,我们需要Transformer为每列定义一个:

  1. sexIndexer = StringIndexer()\
  2. .setInputCol("Sex")\
  3. .setOutputCol("SexIndex")\
  4. .setHandleInvalid("keep")
  5. cabinIndexer = StringIndexer()\
  6. .setInputCol("Cabin")\
  7. .setOutputCol("CabinIndex")\
  8. .setHandleInvalid("keep")
  9. embarkedIndexer = StringIndexer()\
  10. .setInputCol("Embarked")\
  11. .setOutputCol("EmbarkedIndex")\
  12. .setHandleInvalid("keep")

我们正在使用它StringIndexer来转换价值观。对于每个Transformer我们定义的输入列和输出列将包含修改后的值。

步骤11:将列组合成特征向量

我们将使用另一个Transformer将XGBoost分类中使用的列组合Estimator成一个向量:

  1. vectorAssembler = VectorAssembler()\
  2. .setInputCols(["Pclass", "SexIndex", "Age", "SibSp", "Parch", "Fare", "CabinIndex", "EmbarkedIndex"])\
  3. .setOutputCol("features")

第12步:定义XGBoostEstimator

在这一步中,我们定义了Estimator将产生模型的东西。这里使用的大多数参数都是默认的:

  1. xgboost = XGBoostEstimator(
  2. featuresCol="features",
  3. labelCol="Survival",
  4. predictionCol="prediction"
  5. )

我们只定义feature, label(必须匹配来自的列DataFrame)和prediction包含分类器输出的新列。

步骤13:建立管道和分类器

在我们创建了所有单独的步骤之后,我们可以定义实际的管道和操作的顺序:

  1. pipeline = Pipeline().setStages([sexIndexer, cabinIndexer, embarkedIndexer, vectorAssembler, xgboost])

输入DataFrame将被多次转换,最终将生成使用我们的数据训练的模型。

步骤14:训练模型并预测新的测试数据

我们首先将数据分成火车和测试,然后我们将模型与火车数据拟合,最后我们看到我们为每位乘客获得的预测:

  1. trainDF, testDF = df.randomSplit([0.8, 0.2], seed=24)
  2. model = pipeline.fit(trainDF)
  3. model.transform(testDF).select(col("PassengerId"), col("prediction")).show()