此示例显示如何训练MLlib管道以生成可应用于转换流式DataFrame的PipelineModel。在创建旨在用于转换流数据的管道时,我们还将记住几点。
首先,让我们考虑信用卡公司的情况,该公司希望尽快识别潜在的欺诈性交易,以便他们跟进客户以确认交易是否实际上是欺诈性的。这个假设的信用卡公司有关于交易的历史数据,以及哪些交易被客户报告为欺诈。我们想要根据这些历史数据训练分类器。一旦我们训练了分类器,我们就会将它应用于一系列交易。
此示例需要Databricks Runtime 4.0或更高版本。
对于此示例,我们将使用在Worldline和ULB(UniversitéLibrede Bruxelles)的机器学习小组(http://mlg.ulb.ac.be)进行大数据挖掘和研究合作期间收集和分析的数据集。欺诈检测[1]。有关相关主题的当前和过去项目的更多详细信息,请访问http://mlg.ulb.ac.be/BruFence和http://mlg.ulb.ac.be/ARTML。
首先阅读数据并检查模式。
val data = spark.read.parquet("/databricks-datasets/credit-card-fraud/data")
data.printSchema()
root
|-- time: integer (nullable = true)
|-- amountRange: integer (nullable = true)
|-- label: integer (nullable = true)
|-- pcaVector: vector (nullable = true)
data: org.apache.spark.sql.DataFrame = [time: int, amountRange: int ... 2 more fields]
我们将使用此数据集的3列:
pcaVector
:原始交易数据的PCA转换。对于这个例子,我们假设这个PCA转换在数据到达我们之前作为某个数据管道的一部分发生。amountRange
:介于0和7之间的值。大致的交易金额。值对应于0-1,1-5,5-10,10-20,20-50,50-100,100-200和200+美元。label
:0或1.表示交易是否是欺诈性的。
我们想要构建一个使用pcaVector
和amountRange
数据预测标签的模型。我们将通过使用3阶段的管道来实现这一目标。
- A
OneHotEncoderEstimator
从amountRange
列构建向量。 - 一个
Vector
汇编程序将我们的合并pcaVector
和amountRange
矢量到我们的features
载体。 - A
GBTClassifier
到服务器作为我们的Estimator
。
让我们从创建代表这些阶段的对象开始。
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, VectorAssembler}
import org.apache.spark.ml.classification.GBTClassifier
val oneHot = new OneHotEncoderEstimator()
.setInputCols(Array("amountRange"))
.setOutputCols(Array("amountVect"))
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("amountVect", "pcaVector"))
.setOutputCol("features")
val estimator = new GBTClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, VectorAssembler}
import org.apache.spark.ml.classification.GBTClassifier
oneHot: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_d08ee1823dc0
vectorAssembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_1f1b468a900e
estimator: org.apache.spark.ml.classification.GBTClassifier = gbtc_5215e2e5d504
到目前为止,对于使用过MLlib管道的人来说,这应该是熟悉的,但是因为我们打算在流式上下文中使用这个模型,所以我们应该注意一些事情。
首先你可能会注意到我们使用的OneHotEncoderEstimator
是Spark 2.3.0中新增的,而不是a OneHotEncoder
,现在已经弃用了。这个新的估算工具修复了与之相关的几个问题,OneHotEncoder
并且还允许您将一个热编码应用于流式DataFrame。
使用MLlib和Structured Streaming时要注意的第二件事是VectorAssembler
在流式上下文中有一些限制。具体来说,VectorAssembler
只能在已知大小的Vector列上工作。这不是批处理DataFrames的问题,因为我们可以简单地检查数据帧的内容以确定向量的大小。
如果我们打算使用我们的管道来转换流式DataFrame,我们可以通过VectorSizeHint
在管道中包含一个阶段来明确指定pcaVector列的大小。我们VectorAssembler
阶段的另一个输入amountVect
也是一个向量列,但由于此列是MLlib转换器的输出,因此它已包含适当的大小信息,因此我们不需要为此列执行任何其他操作。
import org.apache.spark.ml.feature.VectorSizeHint
val vectorSizeHint = new VectorSizeHint()
.setInputCol("pcaVector")
.setSize(28)
import org.apache.spark.ml.feature.VectorSizeHint
vectorSizeHint: org.apache.spark.ml.feature.VectorSizeHint = vectSizeHint_1409512da3d9
在我们拟合模型之前,让我们将数据集分成两部分进行训练和验证。
val Array(train, test) = data.randomSplit(weights=Array(.8, .2))
train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [time: int, amountRange: int ... 2 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [time: int, amountRange: int ... 2 more fields]
现在我们已经准备好构建一个管道并适应它。
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.functions.col
val pipeline = new Pipeline()
.setStages(Array(oneHot, vectorSizeHint, vectorAssembler, estimator))
val pipelineModel = pipeline.fit(train)
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.functions.col
pipeline: org.apache.spark.ml.Pipeline = pipeline_04b363362758
pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_04b363362758
由于我们没有可读取的实际流,因此我们可以通过保存测试数据然后使用Spark将其作为流来读取来模拟流。我们将使用此模拟流对模型进行一些验证。为了验证模型,我们将混淆矩阵写入表中,但由于我们正在使用Spark Structured Streaming,因此当从流中读取更多数据时,该表将实时更新。
val testDataPath = "/tmp/credit-card-frauld-test-data"
test.repartition(20).write
.mode("overwrite")
.parquet(testDataPath)
testDataPath: String = /tmp/credit-card-frauld-test-data
import org.apache.spark.sql.types._
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
val schema = new StructType()
.add(StructField("time", IntegerType))
.add(StructField("amountRange", IntegerType))
.add(StructField("label", IntegerType))
.add(StructField("pcaVector", VectorType))
val streamingData = sqlContext.readStream
.schema(schema)
.option("maxFilesPerTrigger", 1)
.parquet(testDataPath)
import org.apache.spark.sql.types._
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
schema: org.apache.spark.sql.types.StructType = StructType(StructField(time,IntegerType,true), StructField(amountRange,IntegerType,true), StructField(label,IntegerType,true), StructField(pcaVector,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))
streamingData: org.apache.spark.sql.DataFrame = [time: int, amountRange: int ... 2 more fields]
为了验证我们的模型,让我们分别计算验证数据的真阳性和真阳性率,通常分别称为敏感性和特异性。因为我们将验证数据集作为流读取,所以当我们从流中读取更多事务时,这些值将更新。
import org.apache.spark.sql.functions.{count, sum, when}
val streamingRates = pipelineModel.transform(streamingData)
.groupBy('label)
.agg(
(sum(when('prediction === 'label, 1)) / count('label)).alias("true prediction rate"),
count('label).alias("count")
)
display(streamingRates)
此示例显示如何训练MLlib管道以生成可应用于转换流式DataFrame的PipelineModel。在创建旨在用于转换流数据的管道时,我们还将记住几点。
首先,让我们考虑信用卡公司的情况,该公司希望尽快识别潜在的欺诈性交易,以便他们跟进客户以确认交易是否实际上是欺诈性的。这个假设的信用卡公司有关于交易的历史数据,以及哪些交易被客户报告为欺诈。我们想要根据这些历史数据训练分类器。一旦我们训练了分类器,我们就会将它应用于一系列交易。
此示例需要Databricks Runtime 4.0或更高版本。
对于此示例,我们将使用在Worldline和ULB(UniversitéLibrede Bruxelles)的机器学习小组(http://mlg.ulb.ac.be)进行大数据挖掘和研究合作期间收集和分析的数据集。欺诈检测[1]。有关相关主题的当前和过去项目的更多详细信息,请访问http://mlg.ulb.ac.be/BruFence和http://mlg.ulb.ac.be/ARTML。
首先阅读数据并检查模式。