在本教程中,我们将讨论使用标准机器学习管道集成PySpark和XGBoost。我们将使用来自泰坦尼克号的数据:机器学习灾难中的众多Kaggle比赛之一。在开始之前,请知道您应该熟悉Apache Spark和Xgboost以及Python。本教程中使用的代码可以在github上的Jupyther笔记本中找到。
第1步:下载或构建XGBoost jar
python代码需要两个scala jar依赖项才能工作。您可以直接从maven下载它们:
如果您希望自己构建它们,可以从我之前的教程中找到如何进行构建。
第2步:下载XGBoost python包装器
您可以从此处下载PySpark XGBoost代码。这是我们要编写的部分和XGBoost scala实现之间的接口。我们将在本教程后面的代码中看到它如何集成。
第3步:启动一个新的Jupyter笔记本
我们将开始一个新的笔记本,以便能够编写我们的代码:
jupyter notebook
第4步:将自定义XGBoost jar添加到Spark应用程序
在开始Spark之前,我们需要添加我们之前下载的jar。我们可以使用--jars
标志来做到这一点:
import os
os.environ ['PYSPARK_SUBMIT_ARGS'] =' - jars xgboost4j-spark-0.72.jar,xgboost4j-0.72.jar pyspark-shell'
第5步:将PySpark集成到Jupyther笔记本中
使PySpark可用的最简单方法是使用该[findspark](https://github.com/minrk/findspark)
软件包:
import findspark
findspark.init()
第6步:启动spark会话
我们现在准备开始火花会议。我们正在创建一个将在本地运行的spark应用程序,并将使用与使用的核心一样多的线程local[*]
:
spark = SparkSession\
.builder\
.appName("PySpark XGBOOST Titanic")\
.master("local[*]")\
.getOrCreate()
第7步:添加PySpark XGBoost包装器代码
正如我们现在有了spark会话,我们可以添加先前下载的包装器代码:
spark.sparkContext.addPyFile("YOUR_PATH/sparkxgb.zip")
第8步:定义架构
接下来,我们定义从csv读取的数据的模式。这通常比让火花推断模式更好,因为它消耗的资源更少,我们可以完全控制字段。
schema = StructType(
[StructField("PassengerId", DoubleType()),
StructField("Survival", DoubleType()),
StructField("Pclass", DoubleType()),
StructField("Name", StringType()),
StructField("Sex", StringType()),
StructField("Age", DoubleType()),
StructField("SibSp", DoubleType()),
StructField("Parch", DoubleType()),
StructField("Ticket", StringType()),
StructField("Fare", DoubleType()),
StructField("Cabin", StringType()),
StructField("Embarked", StringType())
])
步骤9:将csv数据读入数据帧
我们将csv读入a DataFrame
,确保我们提到我们有一个标题,我们也null
用0 替换值:
df_raw = spark\
.read\
.option("header", "true")\
.schema(schema)\
.csv("YOUR_PATH/train.csv")
df = df_raw.na.fill(0)
步骤10:C 将标称值转换为数字
在浏览此步骤的代码之前,让我们简要介绍一些Spark ML概念。他们介绍了ML管道的概念,它是一组构建在其上的高级API DataFrames
,可以更轻松地将多个算法组合到一个流程中。管道的主要元素是Transformer
和Estimator
。第一个可以表示可以将a DataFrame
转换为另一个DataFrame
的算法,而后者是可以适合a DataFrame
来生成a 的算法Transformer
。
为了将名义值转换为数字值,我们需要Transformer
为每列定义一个:
sexIndexer = StringIndexer()\
.setInputCol("Sex")\
.setOutputCol("SexIndex")\
.setHandleInvalid("keep")
cabinIndexer = StringIndexer()\
.setInputCol("Cabin")\
.setOutputCol("CabinIndex")\
.setHandleInvalid("keep")
embarkedIndexer = StringIndexer()\
.setInputCol("Embarked")\
.setOutputCol("EmbarkedIndex")\
.setHandleInvalid("keep")
我们正在使用它StringIndexer
来转换价值观。对于每个Transformer
我们定义的输入列和输出列将包含修改后的值。
步骤11:将列组合成特征向量
我们将使用另一个Transformer
将XGBoost分类中使用的列组合Estimator
成一个向量:
vectorAssembler = VectorAssembler()\
.setInputCols(["Pclass", "SexIndex", "Age", "SibSp", "Parch", "Fare", "CabinIndex", "EmbarkedIndex"])\
.setOutputCol("features")
第12步:定义XGBoostEstimator
在这一步中,我们定义了Estimator
将产生模型的东西。这里使用的大多数参数都是默认的:
xgboost = XGBoostEstimator(
featuresCol="features",
labelCol="Survival",
predictionCol="prediction"
)
我们只定义feature, label
(必须匹配来自的列DataFrame
)和prediction
包含分类器输出的新列。
步骤13:建立管道和分类器
在我们创建了所有单独的步骤之后,我们可以定义实际的管道和操作的顺序:
pipeline = Pipeline().setStages([sexIndexer, cabinIndexer, embarkedIndexer, vectorAssembler, xgboost])
输入DataFrame
将被多次转换,最终将生成使用我们的数据训练的模型。
步骤14:训练模型并预测新的测试数据
我们首先将数据分成火车和测试,然后我们将模型与火车数据拟合,最后我们看到我们为每位乘客获得的预测:
trainDF, testDF = df.randomSplit([0.8, 0.2], seed=24)
model = pipeline.fit(trainDF)
model.transform(testDF).select(col("PassengerId"), col("prediction")).show()