在本文中,我们将讨论构建完整的机器学习管道。第一部分将侧重于在标准批处理模式下训练二元分类器,在第二部分中我们将进行一些实时预测。我们将使用来自泰坦尼克号的数据:机器学习灾难中的众多Kaggle比赛之一。在开始之前,请知道您应该熟悉Scala,Apache Spark和Xgboost。所有源代码也将在Github上提供。很酷,现在让我们开始吧!
训练
我们将使用Spark中的ML管道训练XGBoost分类器。分类器将保存为输出,并将在Spark Structured Streaming实时应用程序中用于预测新的测试数据。
第1步:启动spark会话
我们正在创建一个将在本地运行的spark应用程序,并将使用与使用的核心一样多的线程local[*] :
val spark = SparkSession.builder().appName("Spark XGBOOST Titanic Training").master("local[*]").getOrCreate()
第2步:定义架构
接下来,我们定义从csv读取的数据的模式。这通常比让火花推断模式更好,因为它消耗的资源更少,我们可以完全控制字段。
val schema = StructType(Array(StructField("PassengerId", DoubleType),StructField("Survival", DoubleType),StructField("Pclass", DoubleType),StructField("Name", StringType),StructField("Sex", StringType),StructField("Age", DoubleType),StructField("SibSp", DoubleType),StructField("Parch", DoubleType),StructField("Ticket", StringType),StructField("Fare", DoubleType),StructField("Cabin", StringType),StructField("Embarked", StringType)))
第3步:读取数据
我们把csv读成a DataFrame,确保我们提到我们有一个标题。
val df_raw = spark.read.option("header", "true").schema(schema).csv(filePath)
第4步:删除空值
所有空值都替换为0.这不是理想的,但是对于本教程的目的,它是可以的。
val df = df_raw.na.fill(0)
第5步:将名义值转换为数字
在浏览此步骤的代码之前,让我们简要介绍一些Spark ML概念。他们介绍了ML管道的概念,它是一组构建在其上的高级API DataFrames,可以更轻松地将多个算法组合到一个流程中。管道的主要元素是Transformer和Estimator。第一个可以表示可以将a DataFrame转换为另一个DataFrame的算法,而后者是可以适合a DataFrame来生成a 的算法Transformer 。
为了将名义值转换为数字值,我们需要Transformer为每列定义一个:
val sexIndexer = new StringIndexer().setInputCol("Sex").setOutputCol("SexIndex").setHandleInvalid("keep")val cabinIndexer = new StringIndexer().setInputCol("Cabin").setOutputCol("CabinIndex").setHandleInvalid("keep")val embarkedIndexer = new StringIndexer().setInputCol("Embarked").setOutputCol("EmbarkedIndex").setHandleInvalid("keep")
我们正在使用它StringIndexer来转换价值观。对于每个Transformer我们定义的输入列和输出列将包含修改后的值。
步骤6:将列组合成特征向量
我们将使用另一个Transformer将XGBoost分类中使用的列组合Estimator成一个向量:
val vectorAssembler = new VectorAssembler().setInputCols(Array("Pclass", "SexIndex", "Age", "SibSp", "Parch", "Fare", "CabinIndex", "EmbarkedIndex")).setOutputCol("features")
第7步:添加XGBoost估算器
定义Estimator它将产生模型。可以在地图中定义估计器的设置。我们还可以设置功能和标签列:
val xgbEstimator = new XGBoostEstimator(Map[String, Any]("num_rounds" -> 100)).setFeaturesCol("features").setLabelCol("Survival")
第8步:构建管道和分类器
在我们创建了所有单独的步骤之后,我们可以定义实际的管道和操作的顺序:
val pipeline = new Pipeline().setStages(Array(sexIndexer, cabinIndexer, embarkedIndexer, vectorAssembler, xgbEstimator))

输入DataFrame将被多次转换,最终将生成使用我们的数据训练的模型。我们将保存输出,以便在第二个实时应用程序中使用它。
val cvModel = pipeline.fit(df)cvModel.write.overwrite.save(modelPath)
预测
我们将使用Spark Structured Streaming来基本传输文件中的数据。在现实世界的应用程序中,我们从专用的分布式队列(例如Apache Kafka或AWS Kinesis)读取数据,但是对于此演示,我们将只使用一个简单的文件。
简要描述Spark Structured Streaming是一个构建在Spark SQL之上的流处理引擎。它使用相同的概念,DataFrames数据存储在一个无限制的表中,当数据流入时,该表随着新行的增长而增长。
第1步:创建输入读取流
我们再次创建一个spark会话并定义数据的架构。请注意,测试csv不包含标签Survival 。最后我们可以创建输入流DataFrame, df。输入路径必须是我们存储csv文件的目录。它可以包含一个或多个具有相同模式的文件。
val spark: SparkSession = SparkSession.builder().appName("Spark Structured Streaming XGBOOST").master("local[*]").getOrCreate()val schema = StructType(Array(StructField("PassengerId", DoubleType),StructField("Pclass", DoubleType),StructField("Name", StringType),StructField("Sex", StringType),StructField("Age", DoubleType),StructField("SibSp", DoubleType),StructField("Parch", DoubleType),StructField("Ticket", StringType),StructField("Fare", DoubleType),StructField("Cabin", StringType),StructField("Embarked", StringType)))val df = spark.readStream.option("header", "true").schema(schema).csv(fileDir)
第2步:加载XGBoost模型
在对象中,XGBoostModel我们加载预先训练的模型,该模型将应用于我们在流中读取的每一批新行。
object XGBoostModel {private val modelPath = "your_path"private val model = PipelineModel.read.load(modelPath)def transform(df: DataFrame) = {// replace nan values with 0val df_clean = df.na.fill(0)// run the model on new dataval result = model.transform(df_clean)// display the resultsresult.show()}}
第3步:定义自定义ML接收器
为了能够将我们的分类器应用于新数据,我们需要创建一个新的接收器(流和输出之间的接口,在我们的例子中是XGBoost模型)。为此,我们需要一个自定义接收器(MLSink),一个抽象接收器提供器(MLSinkProvider)和provider()的实现XGBoostMLSinkProvider。
abstract class MLSinkProvider extends StreamSinkProvider {def process(df: DataFrame): Unitdef createSink(sqlContext: SQLContext,parameters: Map[String, String],partitionColumns: Seq[String],outputMode: OutputMode): MLSink = {new MLSink(process)}}case class MLSink(process: DataFrame => Unit) extends Sink {override def addBatch(batchId: Long, data: DataFrame): Unit = {process(data)}}class XGBoostMLSinkProvider extends MLSinkProvider {override def process(df: DataFrame) {XGBoostModel.transform(df)}}
第4步:在我们的自定义接收器中写入数据
最后一步是定义一个将数据写入自定义接收器的查询。还必须定义检查点位置,以便应用程序“记住”在发生故障时在流中读取的最新行。如果我们运行程序,每个新批次的数据将显示在控制台上,同时包含预测的标签。
df.writeStream.format("titanic.XGBoostMLSinkProvider").queryName("XGBoostQuery").option("checkpointLocation", checkpoint_location).start()
