此示例显示如何训练MLlib管道以生成可应用于转换流式DataFrame的PipelineModel。在创建旨在用于转换流数据的管道时,我们还将记住几点。
首先,让我们考虑信用卡公司的情况,该公司希望尽快识别潜在的欺诈性交易,以便他们跟进客户以确认交易是否实际上是欺诈性的。这个假设的信用卡公司有关于交易的历史数据,以及哪些交易被客户报告为欺诈。我们想要根据这些历史数据训练分类器。一旦我们训练了分类器,我们就会将它应用于一系列交易。
此示例需要Databricks Runtime 4.0或更高版本。
对于此示例,我们将使用在Worldline和ULB(UniversitéLibrede Bruxelles)的机器学习小组(http://mlg.ulb.ac.be)进行大数据挖掘和研究合作期间收集和分析的数据集。欺诈检测[1]。有关相关主题的当前和过去项目的更多详细信息,请访问http://mlg.ulb.ac.be/BruFence和http://mlg.ulb.ac.be/ARTML。
首先阅读数据并检查模式。
val data = spark.read.parquet("/databricks-datasets/credit-card-fraud/data")data.printSchema()root|-- time: integer (nullable = true)|-- amountRange: integer (nullable = true)|-- label: integer (nullable = true)|-- pcaVector: vector (nullable = true)data: org.apache.spark.sql.DataFrame = [time: int, amountRange: int ... 2 more fields]
我们将使用此数据集的3列:
pcaVector:原始交易数据的PCA转换。对于这个例子,我们假设这个PCA转换在数据到达我们之前作为某个数据管道的一部分发生。amountRange:介于0和7之间的值。大致的交易金额。值对应于0-1,1-5,5-10,10-20,20-50,50-100,100-200和200+美元。label:0或1.表示交易是否是欺诈性的。
我们想要构建一个使用pcaVector和amountRange数据预测标签的模型。我们将通过使用3阶段的管道来实现这一目标。
- A
OneHotEncoderEstimator从amountRange列构建向量。 - 一个
Vector汇编程序将我们的合并pcaVector和amountRange矢量到我们的features载体。 - A
GBTClassifier到服务器作为我们的Estimator。
让我们从创建代表这些阶段的对象开始。
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, VectorAssembler}import org.apache.spark.ml.classification.GBTClassifierval oneHot = new OneHotEncoderEstimator().setInputCols(Array("amountRange")).setOutputCols(Array("amountVect"))val vectorAssembler = new VectorAssembler().setInputCols(Array("amountVect", "pcaVector")).setOutputCol("features")val estimator = new GBTClassifier().setLabelCol("label").setFeaturesCol("features")import org.apache.spark.ml.feature.{OneHotEncoderEstimator, VectorAssembler}import org.apache.spark.ml.classification.GBTClassifieroneHot: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_d08ee1823dc0vectorAssembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_1f1b468a900eestimator: org.apache.spark.ml.classification.GBTClassifier = gbtc_5215e2e5d504
到目前为止,对于使用过MLlib管道的人来说,这应该是熟悉的,但是因为我们打算在流式上下文中使用这个模型,所以我们应该注意一些事情。
首先你可能会注意到我们使用的OneHotEncoderEstimator是Spark 2.3.0中新增的,而不是a OneHotEncoder,现在已经弃用了。这个新的估算工具修复了与之相关的几个问题,OneHotEncoder并且还允许您将一个热编码应用于流式DataFrame。
使用MLlib和Structured Streaming时要注意的第二件事是VectorAssembler在流式上下文中有一些限制。具体来说,VectorAssembler只能在已知大小的Vector列上工作。这不是批处理DataFrames的问题,因为我们可以简单地检查数据帧的内容以确定向量的大小。
如果我们打算使用我们的管道来转换流式DataFrame,我们可以通过VectorSizeHint在管道中包含一个阶段来明确指定pcaVector列的大小。我们VectorAssembler阶段的另一个输入amountVect也是一个向量列,但由于此列是MLlib转换器的输出,因此它已包含适当的大小信息,因此我们不需要为此列执行任何其他操作。
import org.apache.spark.ml.feature.VectorSizeHintval vectorSizeHint = new VectorSizeHint().setInputCol("pcaVector").setSize(28)import org.apache.spark.ml.feature.VectorSizeHintvectorSizeHint: org.apache.spark.ml.feature.VectorSizeHint = vectSizeHint_1409512da3d9
在我们拟合模型之前,让我们将数据集分成两部分进行训练和验证。
val Array(train, test) = data.randomSplit(weights=Array(.8, .2))train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [time: int, amountRange: int ... 2 more fields]test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [time: int, amountRange: int ... 2 more fields]
现在我们已经准备好构建一个管道并适应它。
import org.apache.spark.ml.Pipelineimport org.apache.spark.sql.functions.colval pipeline = new Pipeline().setStages(Array(oneHot, vectorSizeHint, vectorAssembler, estimator))val pipelineModel = pipeline.fit(train)import org.apache.spark.ml.Pipelineimport org.apache.spark.sql.functions.colpipeline: org.apache.spark.ml.Pipeline = pipeline_04b363362758pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_04b363362758
由于我们没有可读取的实际流,因此我们可以通过保存测试数据然后使用Spark将其作为流来读取来模拟流。我们将使用此模拟流对模型进行一些验证。为了验证模型,我们将混淆矩阵写入表中,但由于我们正在使用Spark Structured Streaming,因此当从流中读取更多数据时,该表将实时更新。
val testDataPath = "/tmp/credit-card-frauld-test-data"test.repartition(20).write.mode("overwrite").parquet(testDataPath)testDataPath: String = /tmp/credit-card-frauld-test-data
import org.apache.spark.sql.types._import org.apache.spark.ml.linalg.SQLDataTypes.VectorTypeval schema = new StructType().add(StructField("time", IntegerType)).add(StructField("amountRange", IntegerType)).add(StructField("label", IntegerType)).add(StructField("pcaVector", VectorType))val streamingData = sqlContext.readStream.schema(schema).option("maxFilesPerTrigger", 1).parquet(testDataPath)import org.apache.spark.sql.types._import org.apache.spark.ml.linalg.SQLDataTypes.VectorTypeschema: org.apache.spark.sql.types.StructType = StructType(StructField(time,IntegerType,true), StructField(amountRange,IntegerType,true), StructField(label,IntegerType,true), StructField(pcaVector,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))streamingData: org.apache.spark.sql.DataFrame = [time: int, amountRange: int ... 2 more fields]
为了验证我们的模型,让我们分别计算验证数据的真阳性和真阳性率,通常分别称为敏感性和特异性。因为我们将验证数据集作为流读取,所以当我们从流中读取更多事务时,这些值将更新。
import org.apache.spark.sql.functions.{count, sum, when}val streamingRates = pipelineModel.transform(streamingData).groupBy('label).agg((sum(when('prediction === 'label, 1)) / count('label)).alias("true prediction rate"),count('label).alias("count"))display(streamingRates)
此示例显示如何训练MLlib管道以生成可应用于转换流式DataFrame的PipelineModel。在创建旨在用于转换流数据的管道时,我们还将记住几点。
首先,让我们考虑信用卡公司的情况,该公司希望尽快识别潜在的欺诈性交易,以便他们跟进客户以确认交易是否实际上是欺诈性的。这个假设的信用卡公司有关于交易的历史数据,以及哪些交易被客户报告为欺诈。我们想要根据这些历史数据训练分类器。一旦我们训练了分类器,我们就会将它应用于一系列交易。
此示例需要Databricks Runtime 4.0或更高版本。
对于此示例,我们将使用在Worldline和ULB(UniversitéLibrede Bruxelles)的机器学习小组(http://mlg.ulb.ac.be)进行大数据挖掘和研究合作期间收集和分析的数据集。欺诈检测[1]。有关相关主题的当前和过去项目的更多详细信息,请访问http://mlg.ulb.ac.be/BruFence和http://mlg.ulb.ac.be/ARTML。
首先阅读数据并检查模式。
