在本文中,我们将讨论构建完整的机器学习管道。第一部分将侧重于在标准批处理模式下训练二元分类器,在第二部分中我们将进行一些实时预测。我们将使用来自泰坦尼克号的数据:机器学习灾难中的众多Kaggle比赛之一。在开始之前,请知道您应该熟悉ScalaApache SparkXgboost。所有源代码也将在Github上提供。很酷,现在让我们开始吧!

训练

我们将使用Spark中的ML管道训练XGBoost分类器。分类器将保存为输出,并将在Spark Structured Streaming实时应用程序中用于预测新的测试数据。
第1步:启动spark会话
我们正在创建一个将在本地运行的spark应用程序,并将使用与使用的核心一样多的线程local[*]

  1. val spark = SparkSession.builder()
  2. .appName("Spark XGBOOST Titanic Training")
  3. .master("local[*]")
  4. .getOrCreate()

第2步:定义架构
接下来,我们定义从csv读取的数据的模式。这通常比让火花推断模式更好,因为它消耗的资源更少,我们可以完全控制字段。

  1. val schema = StructType(
  2. Array(StructField("PassengerId", DoubleType),
  3. StructField("Survival", DoubleType),
  4. StructField("Pclass", DoubleType),
  5. StructField("Name", StringType),
  6. StructField("Sex", StringType),
  7. StructField("Age", DoubleType),
  8. StructField("SibSp", DoubleType),
  9. StructField("Parch", DoubleType),
  10. StructField("Ticket", StringType),
  11. StructField("Fare", DoubleType),
  12. StructField("Cabin", StringType),
  13. StructField("Embarked", StringType)
  14. ))

第3步:读取数据
我们把csv读成a DataFrame,确保我们提到我们有一个标题。

  1. val df_raw = spark
  2. .read
  3. .option("header", "true")
  4. .schema(schema)
  5. .csv(filePath)

第4步:删除空值
所有空值都替换为0.这不是理想的,但是对于本教程的目的,它是可以的。

  1. val df = df_raw.na.fill(0)

第5步:将名义值转换为数字
在浏览此步骤的代码之前,让我们简要介绍一些Spark ML概念。他们介绍了ML管道的概念,它是一组构建在其上的高级API DataFrames,可以更轻松地将多个算法组合到一个流程中。管道的主要元素是TransformerEstimator。第一个可以表示可以将a DataFrame转换为另一个DataFrame的算法,而后者是可以适合a DataFrame来生成a 的算法Transformer
为了将名义值转换为数字值,我们需要Transformer为每列定义一个:

  1. val sexIndexer = new StringIndexer()
  2. .setInputCol("Sex")
  3. .setOutputCol("SexIndex")
  4. .setHandleInvalid("keep")
  5. val cabinIndexer = new StringIndexer()
  6. .setInputCol("Cabin")
  7. .setOutputCol("CabinIndex")
  8. .setHandleInvalid("keep")
  9. val embarkedIndexer = new StringIndexer()
  10. .setInputCol("Embarked")
  11. .setOutputCol("EmbarkedIndex")
  12. .setHandleInvalid("keep")

我们正在使用它StringIndexer来转换价值观。对于每个Transformer我们定义的输入列和输出列将包含修改后的值。
步骤6:将列组合成特征向量
我们将使用另一个Transformer将XGBoost分类中使用的列组合Estimator成一个向量:

  1. val vectorAssembler = new VectorAssembler()
  2. .setInputCols(Array("Pclass", "SexIndex", "Age", "SibSp", "Parch", "Fare", "CabinIndex", "EmbarkedIndex"))
  3. .setOutputCol("features")

第7步:添加XGBoost估算器
定义Estimator它将产生模型。可以在地图中定义估计器的设置。我们还可以设置功能和标签列:

  1. val xgbEstimator = new XGBoostEstimator(Map[String, Any]("num_rounds" -> 100))
  2. .setFeaturesCol("features")
  3. .setLabelCol("Survival")

第8步:构建管道和分类器
在我们创建了所有单独的步骤之后,我们可以定义实际的管道和操作的顺序:

  1. val pipeline = new Pipeline().setStages(Array(sexIndexer, cabinIndexer, embarkedIndexer, vectorAssembler, xgbEstimator))

1_5GXouKuoLYmdTCgu1onrzw.png
输入DataFrame将被多次转换,最终将生成使用我们的数据训练的模型。我们将保存输出,以便在第二个实时应用程序中使用它。

  1. val cvModel = pipeline.fit(df)
  2. cvModel.write.overwrite.save(modelPath)

预测

我们将使用Spark Structured Streaming来基本传输文件中的数据。在现实世界的应用程序中,我们从专用的分布式队列(例如Apache Kafka或AWS Kinesis)读取数据,但是对于此演示,我们将只使用一个简单的文件。
简要描述Spark Structured Streaming是一个构建在Spark SQL之上的流处理引擎。它使用相同的概念,DataFrames数据存储在一个无限制的表中,当数据流入时,该表随着新行的增长而增长。
第1步:创建输入读取流
我们再次创建一个spark会话并定义数据的架构。请注意,测试csv不包含标签Survival 。最后我们可以创建输入流DataFrame, df。输入路径必须是我们存储csv文件的目录。它可以包含一个或多个具有相同模式的文件。

  1. val spark: SparkSession = SparkSession.builder()
  2. .appName("Spark Structured Streaming XGBOOST")
  3. .master("local[*]")
  4. .getOrCreate()
  5. val schema = StructType(
  6. Array(StructField("PassengerId", DoubleType),
  7. StructField("Pclass", DoubleType),
  8. StructField("Name", StringType),
  9. StructField("Sex", StringType),
  10. StructField("Age", DoubleType),
  11. StructField("SibSp", DoubleType),
  12. StructField("Parch", DoubleType),
  13. StructField("Ticket", StringType),
  14. StructField("Fare", DoubleType),
  15. StructField("Cabin", StringType),
  16. StructField("Embarked", StringType)
  17. ))
  18. val df = spark
  19. .readStream
  20. .option("header", "true")
  21. .schema(schema)
  22. .csv(fileDir)

第2步:加载XGBoost模型
在对象中,XGBoostModel我们加载预先训练的模型,该模型将应用于我们在流中读取的每一批新行。

  1. object XGBoostModel {
  2. private val modelPath = "your_path"
  3. private val model = PipelineModel.read.load(modelPath)
  4. def transform(df: DataFrame) = {
  5. // replace nan values with 0
  6. val df_clean = df.na.fill(0)
  7. // run the model on new data
  8. val result = model.transform(df_clean)
  9. // display the results
  10. result.show()
  11. }
  12. }

第3步:定义自定义ML接收器
为了能够将我们的分类器应用于新数据,我们需要创建一个新的接收器(流和输出之间的接口,在我们的例子中是XGBoost模型)。为此,我们需要一个自定义接收器(MLSink),一个抽象接收器提供器(MLSinkProvider)和provider()的实现XGBoostMLSinkProvider

  1. abstract class MLSinkProvider extends StreamSinkProvider {
  2. def process(df: DataFrame): Unit
  3. def createSink(
  4. sqlContext: SQLContext,
  5. parameters: Map[String, String],
  6. partitionColumns: Seq[String],
  7. outputMode: OutputMode): MLSink = {
  8. new MLSink(process)
  9. }
  10. }
  11. case class MLSink(process: DataFrame => Unit) extends Sink {
  12. override def addBatch(batchId: Long, data: DataFrame): Unit = {
  13. process(data)
  14. }
  15. }
  16. class XGBoostMLSinkProvider extends MLSinkProvider {
  17. override def process(df: DataFrame) {
  18. XGBoostModel.transform(df)
  19. }
  20. }

第4步:在我们的自定义接收器中写入数据
最后一步是定义一个将数据写入自定义接收器的查询。还必须定义检查点位置,以便应用程序“记住”在发生故障时在流中读取的最新行。如果我们运行程序,每个新批次的数据将显示在控制台上,同时包含预测的标签。

  1. df.writeStream
  2. .format("titanic.XGBoostMLSinkProvider")
  3. .queryName("XGBoostQuery")
  4. .option("checkpointLocation", checkpoint_location)
  5. .start()