最近一个相当热门的话题是机器学习 - 与计算统计密切相关的跨部门学科,让计算机在没有明确编程的情况下学习。
它已被发现在数据分析领域具有重要意义 - 从估算贷款和保险风险到试图在现实条件下自主驾驶汽车。
在下面的文章中,我想向读者介绍一下 MLlib
- 作为Spark Framework一部分的机器学习库。
关于以下文本的一个重要事项 - 目的是介绍图书馆,而不是机器学习或统计学背后的概念和理论,因此读者可以期待这些主题的至少基本理解。此外,还需要至少基本的Spark知识。
这将基于Apache Spark 2.x API,它采用新的DataFrame API作为旧版本的替代品 RDD
。该DataFrame
方法的主要优点之一 是它比RDD
一个更容易使用和更友好 。尽管如此, RDD
API仍然存在但已进入维护模式(它将不再被扩展,并且当DataFrame
API与其达到特征奇偶校验时将被弃用 )。
MLlib简介
MLlib(机器学习库的缩写)是Apache Spark的机器学习库,在尝试解决机器学习问题时为我们提供了Spark的卓越可扩展性和易用性。在引擎盖下,MLlib使用 Breeze来满足它的线性代数需求。
该库包含一系列非常广泛的功能,现在我将简要介绍。每个功能集的更深入描述将在后面的部分中介绍。
功能
算法
回归
- 线性
- 广义线性
- 决策树
- 随机森林
- 梯度提升树
- 生存
- 等渗
分类
- 物流(二项式和多项式)
- 决策树
- 随机森林
- 梯度提升树
- 多层感知器
- 线性支持向量机
- 一VS-所有
- 朴素贝叶斯
聚类
- K-手段
- 潜在的Dirichlet分配
- 平分k均值
- 高斯混合模型
- 协同过滤
Featurization
- 特征提取
- 转型
- 维度降低
- 选择
管道
- 撰写管道
- 构建,评估和调整机器学习管道
坚持
- 将算法,模型和管道保存到持久数据存储中以供以后使用
- 从持久数据存储加载算法,模型和管道
公用事业
- 线性代数
- 统计
- 数据处理
- 其他
DataFrame
如前所述,这 DataFrame
是Spark版本2.x中使用的新API,它应该是旧RDD
API 的替代品 。A DataFrame
是一个Spark Dataset
(简而言之 - 一个分布式,强类型的数据集合,该接口在Spark 1.6中引入)组织成命名列(代表变量)。
该概念实际上与关系数据库中的表或R / Python中的数据框相同,但具有一组隐式优化。
特点
使用DataFrame
API比使用旧API有 哪些主要卖点和好处 RDD
?这里有几个:
- 熟悉 - 如前所述,该概念类似于更广泛的已知和使用的处理数据的方法,如关系数据库中的表或例如R中的数据框构造。
- 统一API - API在语言中是一致的,因此我们不会浪费时间来适应差异,并且可以专注于重要的事情。
- Spark SQL - 它使我们能够通过SQL查询和类似SQL的特定语言访问和操作数据。
- 优化 - 在一系列优化下实现
Dataset
,在处理数据时为我们提供更多性能。 - 众多可能的来源 - 我们可以构建一个
DataSet
来自外部数据库,现有的RDD
s,CSV文件,JSON和许多其他结构化数据源。
创建DataFrame
如上所述 - 我们有多个可能的来源,我们可以从中创建一个 DataFrame
。要从Dataset
外部源加载流 ,我们将使用该 DataStreamReader
接口。
在下面的例子中,我们假设一个名为的变量 spark
存在于 SparkSession
。在 DataStreamReader
该届会议可以通过调用来获得 read
在该实例方法。
我们可以通过option
在reader实例上调用方法来为底层数据源添加输入选项 。它需要a key
和a value
作为参数(或整体 Map
)。
有两种方法来加载数据:特定格式的方法,如 csv
, jdbc
等与指定的格式明确的 format
方法,然后调用通用的 load
方法。如果未指定格式,则为Parquet
默认格式 。
以下是创建a DataFrame
和使用的方法时最常见的用例 :
实木复合地板
Parquet
是由Apache开发的Hadoop
/ Spark
生态系统项目的柱状存储格式 。
我们通过调用 带有 文件路径的load
或 parquet
方法Parquet
作为参数来 加载它 ,例如:
spark.read.load( “一些/路径/到/ file.parquet”)
CSV
众所周知的逗号分隔值文件。Spark可以自动推断CSV
加载文件的架构 。
我们通过调用csv
带有CSV
文件路径的方法作为参数来 加载它 ,例如:
- spark.read.load(“some/path/to/file.parquet”)
CSV
众所周知的逗号分隔值文件。Spark可以自动推断CSV
加载文件的架构 。
我们通过调用csv
带有CSV
文件路径的方法作为参数来 加载它 ,例如:
- spark.read.csv(“some/path/to/file.csv”)
JSON
Web应用程序最广泛使用的JavaScript Object Notation格式,用于异步前端/后端通信。Spark可以自动推断JSON
加载文件的架构 。
我们通过调用json
带有JSON
文件路径的方法作为参数来 加载它 ,例如:
- spark.read.json(“some/path/to/file.json”)
蜂巢
Apache Hive
是一个数据仓库软件包。对于接口 DataFrame
, Hive
我们需要一个 SparkSession
启用 Hive
支持和类路径中所有需要的依赖项,以便Spark自动加载它们。
我们不会介绍与Hive
数据存储的接口, 因为这需要 Hive
更深入地了解它的工作原理和工作方式。有关该主题的更多信息,请参阅有关该主题的官方 文档 。
数据库
我们可以轻松地与任何类型的数据库接口使用 JDBC
。为了使它成为可能您需要为JDBC
您的类路径中包含的要连接的数据库提供所需的 驱动程序。
我们将使用load
前面提到的 方法,但我们需要将格式从默认的one(Parquet
)更改jdbc
为format
在阅读器上 使用该 方法。我们也可以使用该 jdbc
方法并向其传递一个Properties
将保存连接属性的 类实例。
我们JDBC
通过option
前面提到的方法指定 连接属性 。此处提供了可以传递的可能选项的完整列表及其说明 。
下面是一个快速示例,说明如何DataFrame
从 JDBC
源代码创建 (例如来自官方文档):
- val jdbcDF = spark.read
- .format(“jdbc”)
- .option(“url”, “jdbc:postgresql:dbserver”)
- .option(“dbtable”, “schema.tablename”)
- .option(“user”, “username”)
- .option(“password”, “password”)
- .load()
或者使用 jdbc
方法:
- val connectionProperties = new Properties()
- connectionProperties.put(“user”, “username”)
- connectionProperties.put(“password”, “password”)
- val jdbcDF2 = spark.read
- .jdbc(“jdbc:postgresql:dbserver”, “schema.tablename”, connectionProperties)
RDD
我们可以自动将a转换 RDD
为a DataFrame
。案例类的参数名称将成为列名。它支持嵌套复杂类型,如 Seq
或 Array
。
我们需要做的就是简单地调用 toDF
方法 RDD
,即:
- val dataFrame = someRDD.toDF ()
定义架构
数据的模式通常可以自动推断,但如果我们的数据没有选项,或者我们只想手动定义它,我们有三种主要方法:
铸件
将列从一种类型显式转换为另一种类型。例如:
- val dataFrame = otherDataFrame
- .withColumn(“numericalColumn”, dataFrame(“numericalColumn”).cast(DoubleType))
StructType
使用 StructType
和 StructField
类型显式定义 DataType
每列的内容。例如:
- val schemaStruct =
- StructType(
**StructField**("intColumn", IntegerType, true) ::
**StructField**("longColumn", LongType, true) ::
**StructField**("booleanColumn", BooleanType, true) :: Nil)
- val df = spark.read
- .schema(schemaStruct)
- .option(“header”, true)
- .csv(dataPath)
编码器
这是Spark SQL的序列化和反序列化框架的概念。我们可以通过使用 Encoders
提供架构 case object
。
- case class SchemaClass(intColumn: Int, longColumn: Long, booleanColumn: Boolean)
- val schemaEncoded = Encoders.product[SchemaClass].schema
- val df = spark.read
- .schema(schemaEncoded)
- .option(“header”, true)
- .csv(dataPath)
保存DataFrame
我们可以DataFrame
通过使用DataFrameWriter
我们可以DataFrame
通过简单地调用 write
方法从a获得的接口 来保存 到持久存储 。DataFrame
在大多数情况下,写入 几乎完全相同,我们只是调用前面提到的方法 write
而不是 read
。例如,写 DataFrame
一个 JSON
文件:
- val dataFrame = spark.read.csv(“someFile.csv”)
- dataFrame.write.json(“newFile”)
探索DataFrame
我们有两种主要方法来检查DataFrame
(或任何其他 Dataset
)的内容和结构 - show
和 printSchema
。
该 show
方法有五个版本:
show()
- 以表格形式显示前20行。show(numRows: Int)
-numRows
以表格形式显示顶部 。show(truncate: Boolean)
- 以表格形式显示前20行。如果truncate
是,true
则将截断超过20个字符的字符串,并将所有单元格对齐。show(numRows: Int, truncate: Boolean)
-numRows
以表格形式显示顶 行。如果truncate
是,true
则将截断超过20个字符的字符串,并将所有单元格对齐。show(numRows: Int, truncate: Int)
-numRows
以表格形式显示顶 行。如果truncate
大于0,则将truncate
截断长于字符的字符串, 并将所有单元格对齐。
在 printSchema()
将打印出树的形式架构到控制台。
DataFrame操作
该 Dataset
接口允许我们通过基于SQL的DSL或仅通过编程方式运行SQL查询来对数据执行操作。正如前面提到的 DataFrame
仅仅是一个 Dataset
的 Rows
因此不是强类型。这就是操作无类型的原因。
在 import spark.implicits._
包含了我们在表上操作时使用更丰富的符号implicits。
无类型操作
一个简单的例子,按值过滤 someColumn
,然后选择 anotherColumn
显示的结果:
- val result = dataFrame.filter($”someColumn” > 0).select(“anotherColumn”)
- result.show()
该 $
运营商是金 spark.implicits
包,让我们创建一个 Column
从参考 String
。
可在此处的Dataset
API文档中 找到可用操作的完整列表 。
还有一套非常全面的字符串操作和数学函数。它们的列表可以在这里找到 。
运行SQL查询
我们还可以选择以编程方式运行SQL查询,该 sql
方法采用带有查询字符串作为参数的字符串。
但要做到这一点,我们需要先将其注册 DataFrame
为SQL Temporary View
。这将使 DataFrame
表从SQL查询中可见。这可以通过该createOrReplaceTempView
方法完成 ,例如:
- dataFrame.createOrReplaceTempView(“dataFrameTable”)
现在使用以下sql
方法运行SQL查询 :
- val result = spark.sql(“SELECT * FROM dataFrameTable”)
- result.show()
临时视图是会话范围的,因此会话终止时将消失。我们可以创建一个 Global Temporary View
将在所有会话之间共享并保持活动状态直到应用程序终止。全局临时视图与global_temp
数据库绑定, 因此要访问它们,我们必须使用限定名称通过使用global_temp.
前缀来引用它 。创建和访问此类视图的示例:
- dataFrame.createGlobalTempView(“globalDataFrameTable”)
- val result = spark.sql(“SELECT * FROM global_temp.globalDataFrameTable”)
- result.show()
管道
该 Pipeline
概念围绕提供统一API以创建和组合机器学习数据转换管道以创建单一,简洁的工作流的想法。它还为我们提供了保留它们并使用我们之前创建和保存的已存在的选项。该概念类似于例如流处理 Akka Streams
。
A Pipeline
可以包含以下元素:
- 变形金刚 - 变形金刚的抽象
DataFrame
。由一个transform
函数组成,该函数DataFrame
通过例如添加列,更改特定列的行,根据特征向量预测标签来映射到新的 函数 。 - Estimator - 适合或训练或数据的算法的抽象(例如回归算法)。由的
fit
映射函数DataFrame
成Model
。
此外 Transformer
, Estimator
共享一个用于指定其参数的通用API - Parameter
作为使用setter的替代方法。有关该Parameter
概念的 更多信息,请 点击此处
管道
A Pipeline
本质上是一个有序的阶段阵列。如前所述,舞台是a Transformer
或a Estimator
。当然,我们可以很容易地从两个领域和共同领域看出一个 Pipeline
可以由许多Transform
阶段组成, 但只有一个 Estimator
阶段必须在最后阶段 Pipeline
。Pipeline
一些简单回归任务的示例 :1。将分类特征转换为索引。2.规范化帧中的向量。3.线性回归。
保存/载入
我们可以轻松保存创建 Pipeline
或 Model
以后使用。并非所有 Transform
和 Estimator
类型的支持,以便检查他们的文档的具体信息它是一个好主意。大多数基本变压器和型号都受到支持。方法:
save(path: String)
- 将Model
/ 保存Pipeline
到指向的位置path
load(path: String)
- 从指向的位置 加载aModel
/Pipeline``path
例
这是一个如何创建的简短示例 Pipeline
(请注意该 setStages
方法将 Array
参数作为参数):
- val indexer = new VectorIndexer()
- .setInputCol(“features”)
- .setOutputCol(“indexedFeatures”)
- .setMaxCategories(5)
- val normalizer = new Normalizer()
- .setInputCol(“features”)
- .setOutputCol(“normalizedFeatures”)
- .setP(1.0)
- val lr = new LinearRegression()
- .setMaxIter(100)
- .setRegParam(0.5)
- .setElasticNetParam(0.5)
- val pipeline = new Pipeline()
- .setStages(Array(indexer, normalizer, lr))
Spark中的变形金刚和估算师
MLlib附带了一系列广泛的 Transformer
算法 Estimator
元素,我们可以在机器学习工作流程中使用它们。为每个文档提供的文档非常好,我建议检查一下。您可以在以下链接中找到它:
库中的回归/分类算法对两Double
值向量进行操作 - 特征向量和标签向量。因此,对于分类值,我们需要使用索引器转换列,并且需要将多个特征列值收集到单个向量中(例如,通过使用 VectorAssembler)。
如果提供的内容不够,Spark还为我们提供了定义自己 Transformer
和 Estimator
组件的方法。有关详细信息,我建议阅读 TomaszSosiński撰写的“ 扩展管道”。
例
最后,我想提供一个完整的代码示例,用于对现实世界的数据集进行回归(尽管我们只会查看其中的一小部分)。
我们将尝试根据两个变量来解决预测葡萄酒价格的回归问题 - 这是WineEnthusiast评级和制作葡萄酒的国家。我们将使用 此 数据集来执行此操作。解压缩的文件被重命名 wine-data.csv
并移动到应用程序的工作目录。
的 WineEnthusiast
,如果你看它的价值观和变量描述,但我们会把它当作一个“双”的示范起见变量是定义更接近序变量。Country是一个分类(名义)值,因此需要为特征向量编制索引。然后我们将新列收集到features
使用前面提到的名称的单个向量 中 VectorAssembler
。
- import org.apache.spark.ml.Pipeline
- import org.apache.spark.ml.evaluation.RegressionEvaluator
- import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
- import org.apache.spark.ml.regression.GBTRegressor
- import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
- import org.apache.spark.sql.{Encoders, SparkSession}
- object Main {
def **main**(args: Array[String]) = {
val spark = SparkSession.builder
.**appName**("Wine Price Regression")
.**master**("local")
.**getOrCreate**()
//We'll define a partial schema with the values we are interested in. For the sake of the example points is a Double
val schemaStruct = **StructType**(
**StructField**("points", DoubleType) ::
**StructField**("country", StringType) ::
**StructField**("price", DoubleType) :: Nil
)
//We read the data from the file taking into account there's a header.
//na.drop() will return rows where all values are non-null.
val df = spark.read
.**option**("header", true)
.**schema**(schemaStruct)
.**csv**("wine-data.csv")
.na.**drop**()
//We'll split the set into training and test data
val **Array**(trainingData, testData) = df.**randomSplit**(**Array**(0.8, 0.2))
val labelColumn = "price"
//We define two StringIndexers for the categorical variables
val countryIndexer = new **StringIndexer**()
.**setInputCol**("country")
.**setOutputCol**("countryIndex")
//We define the assembler to collect the columns into a new column with a single vector - "features"
val assembler = new **VectorAssembler**()
.**setInputCols**(**Array**("points", "countryIndex"))
.**setOutputCol**("features")
//For the regression we'll use the Gradient-boosted tree estimator
val gbt = new **GBTRegressor**()
.**setLabelCol**(labelColumn)
.**setFeaturesCol**("features")
.**setPredictionCol**("Predicted " + labelColumn)
.**setMaxIter**(50)
//We define the Array with the stages of the pipeline
val stages = **Array**(
countryIndexer,
assembler,
gbt
)
//Construct the pipeline
val pipeline = new **Pipeline**().**setStages**(stages)
//We fit our DataFrame into the pipeline to generate a model
val model = pipeline.**fit**(trainingData)
//We'll make predictions using the model and the test data
val predictions = model.**transform**(testData)
//This will evaluate the error/deviation of the regression using the Root Mean Squared deviation
val evaluator = new **RegressionEvaluator**()
.**setLabelCol**(labelColumn)
.**setPredictionCol**("Predicted " + labelColumn)
.**setMetricName**("rmse")
//We compute the error using the evaluator
val error = evaluator.**evaluate**(predictions)
**println**(error)
spark.**stop**()
}
- }
后记
我希望这篇文章有助于理解MLlib背后的基础知识以及如何在您的机器学习努力中使用它。正如我们所看到的那样,库(以及一般的Spark)为我们提供了一个精心设计的API和用于进行机器学习的工作流程。当然,这个文本是作为一个介绍,因此不会耗尽主题。但是,如前所述,Spark为我们提供了很好的文档,让我们可以更深入地追求它。
在上一节中,我提供了一些我认为应该证明对于进一步扩展我们的知识非常有用的链接。快乐的编码;)
干杯,马辛