此示例显示如何训练MLlib管道以生成可应用于转换流式DataFrame的PipelineModel。在创建旨在用于转换流数据的管道时,我们还将记住几点。
    首先,让我们考虑信用卡公司的情况,该公司希望尽快识别潜在的欺诈性交易,以便他们跟进客户以确认交易是否实际上是欺诈性的。这个假设的信用卡公司有关于交易的历史数据,以及哪些交易被客户报告为欺诈。我们想要根据这些历史数据训练分类器。一旦我们训练了分类器,我们就会将它应用于一系列交易。
    此示例需要Databricks Runtime 4.0或更高版本。

    对于此示例,我们将使用在Worldline和ULB(UniversitéLibrede Bruxelles)的机器学习小组(http://mlg.ulb.ac.be)进行大数据挖掘和研究合作期间收集和分析的数据集。欺诈检测[1]。有关相关主题的当前和过去项目的更多详细信息,请访问http://mlg.ulb.ac.be/BruFencehttp://mlg.ulb.ac.be/ARTML
    首先阅读数据并检查模式。

    1. val data = spark.read.parquet("/databricks-datasets/credit-card-fraud/data")
    2. data.printSchema()
    3. root
    4. |-- time: integer (nullable = true)
    5. |-- amountRange: integer (nullable = true)
    6. |-- label: integer (nullable = true)
    7. |-- pcaVector: vector (nullable = true)
    8. data: org.apache.spark.sql.DataFrame = [time: int, amountRange: int ... 2 more fields]

    我们将使用此数据集的3列:

    • pcaVector:原始交易数据的PCA转换。对于这个例子,我们假设这个PCA转换在数据到达我们之前作为某个数据管道的一部分发生。
    • amountRange:介于0和7之间的值。大致的交易金额。值对应于0-1,1-5,5-10,10-20,20-50,50-100,100-200和200+美元。
    • label:0或1.表示交易是否是欺诈性的。

    我们想要构建一个使用pcaVectoramountRange数据预测标签的模型。我们将通过使用3阶段的管道来实现这一目标。

    1. A OneHotEncoderEstimatoramountRange列构建向量。
    2. 一个Vector汇编程序将我们的合并pcaVectoramountRange矢量到我们的features载体。
    3. A GBTClassifier到服务器作为我们的Estimator

    让我们从创建代表这些阶段的对象开始。

    1. import org.apache.spark.ml.feature.{OneHotEncoderEstimator, VectorAssembler}
    2. import org.apache.spark.ml.classification.GBTClassifier
    3. val oneHot = new OneHotEncoderEstimator()
    4. .setInputCols(Array("amountRange"))
    5. .setOutputCols(Array("amountVect"))
    6. val vectorAssembler = new VectorAssembler()
    7. .setInputCols(Array("amountVect", "pcaVector"))
    8. .setOutputCol("features")
    9. val estimator = new GBTClassifier()
    10. .setLabelCol("label")
    11. .setFeaturesCol("features")
    12. import org.apache.spark.ml.feature.{OneHotEncoderEstimator, VectorAssembler}
    13. import org.apache.spark.ml.classification.GBTClassifier
    14. oneHot: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_d08ee1823dc0
    15. vectorAssembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_1f1b468a900e
    16. estimator: org.apache.spark.ml.classification.GBTClassifier = gbtc_5215e2e5d504

    到目前为止,对于使用过MLlib管道的人来说,这应该是熟悉的,但是因为我们打算在流式上下文中使用这个模型,所以我们应该注意一些事情。
    首先你可能会注意到我们使用的OneHotEncoderEstimator是Spark 2.3.0中新增的,而不是a OneHotEncoder,现在已经弃用了。这个新的估算工具修复了与之相关的几个问题,OneHotEncoder并且还允许您将一个热编码应用于流式DataFrame。
    使用MLlib和Structured Streaming时要注意的第二件事是VectorAssembler在流式上下文中有一些限制。具体来说,VectorAssembler只能在已知大小的Vector列上工作。这不是批处理DataFrames的问题,因为我们可以简单地检查数据帧的内容以确定向量的大小。
    如果我们打算使用我们的管道来转换流式DataFrame,我们可以通过VectorSizeHint在管道中包含一个阶段来明确指定pcaVector列的大小。我们VectorAssembler阶段的另一个输入amountVect也是一个向量列,但由于此列是MLlib转换器的输出,因此它已包含适当的大小信息,因此我们不需要为此列执行任何其他操作。

    1. import org.apache.spark.ml.feature.VectorSizeHint
    2. val vectorSizeHint = new VectorSizeHint()
    3. .setInputCol("pcaVector")
    4. .setSize(28)
    5. import org.apache.spark.ml.feature.VectorSizeHint
    6. vectorSizeHint: org.apache.spark.ml.feature.VectorSizeHint = vectSizeHint_1409512da3d9


    在我们拟合模型之前,让我们将数据集分成两部分进行训练和验证。

    1. val Array(train, test) = data.randomSplit(weights=Array(.8, .2))
    2. train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [time: int, amountRange: int ... 2 more fields]
    3. test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [time: int, amountRange: int ... 2 more fields]

    现在我们已经准备好构建一个管道并适应它。

    1. import org.apache.spark.ml.Pipeline
    2. import org.apache.spark.sql.functions.col
    3. val pipeline = new Pipeline()
    4. .setStages(Array(oneHot, vectorSizeHint, vectorAssembler, estimator))
    5. val pipelineModel = pipeline.fit(train)
    6. import org.apache.spark.ml.Pipeline
    7. import org.apache.spark.sql.functions.col
    8. pipeline: org.apache.spark.ml.Pipeline = pipeline_04b363362758
    9. pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_04b363362758

    由于我们没有可读取的实际流,因此我们可以通过保存测试数据然后使用Spark将其作为流来读取来模拟流。我们将使用此模拟流对模型进行一些验证。为了验证模型,我们将混淆矩阵写入表中,但由于我们正在使用Spark Structured Streaming,因此当从流中读取更多数据时,该表将实时更新。

    1. val testDataPath = "/tmp/credit-card-frauld-test-data"
    2. test.repartition(20).write
    3. .mode("overwrite")
    4. .parquet(testDataPath)
    5. testDataPath: String = /tmp/credit-card-frauld-test-data
    1. import org.apache.spark.sql.types._
    2. import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
    3. val schema = new StructType()
    4. .add(StructField("time", IntegerType))
    5. .add(StructField("amountRange", IntegerType))
    6. .add(StructField("label", IntegerType))
    7. .add(StructField("pcaVector", VectorType))
    8. val streamingData = sqlContext.readStream
    9. .schema(schema)
    10. .option("maxFilesPerTrigger", 1)
    11. .parquet(testDataPath)
    12. import org.apache.spark.sql.types._
    13. import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
    14. schema: org.apache.spark.sql.types.StructType = StructType(StructField(time,IntegerType,true), StructField(amountRange,IntegerType,true), StructField(label,IntegerType,true), StructField(pcaVector,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))
    15. streamingData: org.apache.spark.sql.DataFrame = [time: int, amountRange: int ... 2 more fields]

    为了验证我们的模型,让我们分别计算验证数据的真阳性和真阳性率,通常分别称为敏感性和特异性。因为我们将验证数据集作为流读取,所以当我们从流中读取更多事务时,这些值将更新。

    1. import org.apache.spark.sql.functions.{count, sum, when}
    2. val streamingRates = pipelineModel.transform(streamingData)
    3. .groupBy('label)
    4. .agg(
    5. (sum(when('prediction === 'label, 1)) / count('label)).alias("true prediction rate"),
    6. count('label).alias("count")
    7. )
    8. display(streamingRates)


    此示例显示如何训练MLlib管道以生成可应用于转换流式DataFrame的PipelineModel。在创建旨在用于转换流数据的管道时,我们还将记住几点。
    首先,让我们考虑信用卡公司的情况,该公司希望尽快识别潜在的欺诈性交易,以便他们跟进客户以确认交易是否实际上是欺诈性的。这个假设的信用卡公司有关于交易的历史数据,以及哪些交易被客户报告为欺诈。我们想要根据这些历史数据训练分类器。一旦我们训练了分类器,我们就会将它应用于一系列交易。
    此示例需要Databricks Runtime 4.0或更高版本。

    对于此示例,我们将使用在Worldline和ULB(UniversitéLibrede Bruxelles)的机器学习小组(http://mlg.ulb.ac.be)进行大数据挖掘和研究合作期间收集和分析的数据集。欺诈检测[1]。有关相关主题的当前和过去项目的更多详细信息,请访问http://mlg.ulb.ac.be/BruFencehttp://mlg.ulb.ac.be/ARTML
    首先阅读数据并检查模式。