在本文中,我们将讨论构建完整的机器学习管道。第一部分将侧重于在标准批处理模式下训练二元分类器,在第二部分中我们将进行一些实时预测。我们将使用来自泰坦尼克号的数据:机器学习灾难中的众多Kaggle比赛之一。在开始之前,请知道您应该熟悉Scala,Apache Spark和Xgboost。所有源代码也将在Github上提供。很酷,现在让我们开始吧!
训练
我们将使用Spark中的ML管道训练XGBoost分类器。分类器将保存为输出,并将在Spark Structured Streaming实时应用程序中用于预测新的测试数据。
第1步:启动spark会话
我们正在创建一个将在本地运行的spark应用程序,并将使用与使用的核心一样多的线程local[*]
:
val spark = SparkSession.builder()
.appName("Spark XGBOOST Titanic Training")
.master("local[*]")
.getOrCreate()
第2步:定义架构
接下来,我们定义从csv读取的数据的模式。这通常比让火花推断模式更好,因为它消耗的资源更少,我们可以完全控制字段。
val schema = StructType(
Array(StructField("PassengerId", DoubleType),
StructField("Survival", DoubleType),
StructField("Pclass", DoubleType),
StructField("Name", StringType),
StructField("Sex", StringType),
StructField("Age", DoubleType),
StructField("SibSp", DoubleType),
StructField("Parch", DoubleType),
StructField("Ticket", StringType),
StructField("Fare", DoubleType),
StructField("Cabin", StringType),
StructField("Embarked", StringType)
))
第3步:读取数据
我们把csv读成a DataFrame
,确保我们提到我们有一个标题。
val df_raw = spark
.read
.option("header", "true")
.schema(schema)
.csv(filePath)
第4步:删除空值
所有空值都替换为0.这不是理想的,但是对于本教程的目的,它是可以的。
val df = df_raw.na.fill(0)
第5步:将名义值转换为数字
在浏览此步骤的代码之前,让我们简要介绍一些Spark ML概念。他们介绍了ML管道的概念,它是一组构建在其上的高级API DataFrames
,可以更轻松地将多个算法组合到一个流程中。管道的主要元素是Transformer
和Estimator
。第一个可以表示可以将a DataFrame
转换为另一个DataFrame
的算法,而后者是可以适合a DataFrame
来生成a 的算法Transformer
。
为了将名义值转换为数字值,我们需要Transformer
为每列定义一个:
val sexIndexer = new StringIndexer()
.setInputCol("Sex")
.setOutputCol("SexIndex")
.setHandleInvalid("keep")
val cabinIndexer = new StringIndexer()
.setInputCol("Cabin")
.setOutputCol("CabinIndex")
.setHandleInvalid("keep")
val embarkedIndexer = new StringIndexer()
.setInputCol("Embarked")
.setOutputCol("EmbarkedIndex")
.setHandleInvalid("keep")
我们正在使用它StringIndexer
来转换价值观。对于每个Transformer
我们定义的输入列和输出列将包含修改后的值。
步骤6:将列组合成特征向量
我们将使用另一个Transformer
将XGBoost分类中使用的列组合Estimator
成一个向量:
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("Pclass", "SexIndex", "Age", "SibSp", "Parch", "Fare", "CabinIndex", "EmbarkedIndex"))
.setOutputCol("features")
第7步:添加XGBoost估算器
定义Estimator
它将产生模型。可以在地图中定义估计器的设置。我们还可以设置功能和标签列:
val xgbEstimator = new XGBoostEstimator(Map[String, Any]("num_rounds" -> 100))
.setFeaturesCol("features")
.setLabelCol("Survival")
第8步:构建管道和分类器
在我们创建了所有单独的步骤之后,我们可以定义实际的管道和操作的顺序:
val pipeline = new Pipeline().setStages(Array(sexIndexer, cabinIndexer, embarkedIndexer, vectorAssembler, xgbEstimator))
输入DataFrame
将被多次转换,最终将生成使用我们的数据训练的模型。我们将保存输出,以便在第二个实时应用程序中使用它。
val cvModel = pipeline.fit(df)
cvModel.write.overwrite.save(modelPath)
预测
我们将使用Spark Structured Streaming来基本传输文件中的数据。在现实世界的应用程序中,我们从专用的分布式队列(例如Apache Kafka或AWS Kinesis)读取数据,但是对于此演示,我们将只使用一个简单的文件。
简要描述Spark Structured Streaming是一个构建在Spark SQL之上的流处理引擎。它使用相同的概念,DataFrames
数据存储在一个无限制的表中,当数据流入时,该表随着新行的增长而增长。
第1步:创建输入读取流
我们再次创建一个spark会话并定义数据的架构。请注意,测试csv不包含标签Survival
。最后我们可以创建输入流DataFrame,
df
。输入路径必须是我们存储csv文件的目录。它可以包含一个或多个具有相同模式的文件。
val spark: SparkSession = SparkSession.builder()
.appName("Spark Structured Streaming XGBOOST")
.master("local[*]")
.getOrCreate()
val schema = StructType(
Array(StructField("PassengerId", DoubleType),
StructField("Pclass", DoubleType),
StructField("Name", StringType),
StructField("Sex", StringType),
StructField("Age", DoubleType),
StructField("SibSp", DoubleType),
StructField("Parch", DoubleType),
StructField("Ticket", StringType),
StructField("Fare", DoubleType),
StructField("Cabin", StringType),
StructField("Embarked", StringType)
))
val df = spark
.readStream
.option("header", "true")
.schema(schema)
.csv(fileDir)
第2步:加载XGBoost模型
在对象中,XGBoostModel
我们加载预先训练的模型,该模型将应用于我们在流中读取的每一批新行。
object XGBoostModel {
private val modelPath = "your_path"
private val model = PipelineModel.read.load(modelPath)
def transform(df: DataFrame) = {
// replace nan values with 0
val df_clean = df.na.fill(0)
// run the model on new data
val result = model.transform(df_clean)
// display the results
result.show()
}
}
第3步:定义自定义ML接收器
为了能够将我们的分类器应用于新数据,我们需要创建一个新的接收器(流和输出之间的接口,在我们的例子中是XGBoost模型)。为此,我们需要一个自定义接收器(MLSink
),一个抽象接收器提供器(MLSinkProvider
)和provider()的实现XGBoostMLSinkProvider
。
abstract class MLSinkProvider extends StreamSinkProvider {
def process(df: DataFrame): Unit
def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): MLSink = {
new MLSink(process)
}
}
case class MLSink(process: DataFrame => Unit) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
process(data)
}
}
class XGBoostMLSinkProvider extends MLSinkProvider {
override def process(df: DataFrame) {
XGBoostModel.transform(df)
}
}
第4步:在我们的自定义接收器中写入数据
最后一步是定义一个将数据写入自定义接收器的查询。还必须定义检查点位置,以便应用程序“记住”在发生故障时在流中读取的最新行。如果我们运行程序,每个新批次的数据将显示在控制台上,同时包含预测的标签。
df.writeStream
.format("titanic.XGBoostMLSinkProvider")
.queryName("XGBoostQuery")
.option("checkpointLocation", checkpoint_location)
.start()