一、概述
1. 定义
Spark SQL 是 Spark 用于处理结构化数据(structured data)处理的 Spark 模块
2. Hive and SparkSQL
hive 值早期唯一运行在 Hadoop 上的 SQL-on-hadoop工具,但是MapReduce计算过程中大量的中间磁盘落地消耗了大量IO,降低了运行效率。为了提高 SQL-on-hadoop 的效率,大量的SQL-on-hadoop 工具开始产生,其中表现较为突出的有 :
- Drill
- Impata
- Shark
其中 shark 是 伯克利实验室 Spark 生态环境组件之一,是基于hive 开发的工具,是 Spark SQL 的前身。它修改了下图所示的 内存管理、物理计划、执行三大模块,使得 SQL-on-hadoop 的执行速度 比 hive 高了10—100倍。
但是随着Spark 的发展,对于野心勃勃的 Spark 团队来说,Shark 对于hive 的太多依赖(比如 语法解析器、查询优化器等),制约了 Spark 的 One Stack Rule Them All 的既定方针,制约了 Spark 各个组件的相互集成,所以提出了 SparkSQL 项目。SparkSQL 抛弃原有 Shark 的代码,汲取了 Shark 的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性 SparkSQL 无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步,海阔天
空”。
- 数据兼容方面 SparkSQL 不但兼容 Hive,还可以从 RDD、parquet 文件、JSON 文件中获取数据,未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据;
- 性能优化方面 除了采取 In-Memory Columnar Storage、byte-code generation 等优化技术外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等;
- 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展。
2014 年 6 月 1 日 Shark 项目和 SparkSQL 项目的主持人 Reynold Xin 宣布:停止对 Shark 的开发,团队将所有资源放 SparkSQL 项目上,至此,Shark 的发展画上了句话,但也因此发展出两个支线:SparkSQL 和 Hive on Spark。其中 SparkSQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive,只是兼容 Hive;而Hive on Spark 是一个 Hive 的发展计划,该计划将 Spark 作为 Hive 的底层引擎之一,也就是说,Hive 将不再受限于一个引擎,可以采用 Map-Reduce、Tez、Spark 等引擎。
对于开发人员来讲,SparkSQL 可以简化 RDD 的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是 SparkSQL。Spark SQL 为了简化 RDD 的开发,提高开发效率,提供了 2 个编程抽象,类似 Spark Core 中的 RDD:DataFrame 和 Dataset
3. Spark SQL 特点
- 易整合: 无缝的整合了 SQL 查询和 Spark 编程

- 统一的数据访问 : 使用相同的方式连接不同的数据源

- 兼容 Hive : 在已有的仓库上直接运行 SQL 或者 HiveQL

-
4. DataFrame 是什么
DataFrame 是以 Rdd为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame 与RDD 之间的区别在于,DataFrame 带有 schema 元信息,简单说就是 DataFrame 存储了它所表示的二维表数据集的每一列的名称和类型,进而可以对作用在 DataFrame 上的变换进行针对性的优化。
同时,和hive 相似, DataFrame 也支持嵌套数据类型(struct,array,map),另外提供了一套高层的关系操作,比函数式的RDD API 要更友好
左侧的 RDD[Person]虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 DataFrame 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame 是为数据提供了 Schema 的视图。可以把它当做数据库中的一张表来对待。DataFrame 也是懒执行的,但性能上比 RDD 要高,主要原因:优化的执行计划,即查询计划通过 Spark catalyst optimiser 进行优化。5. DataSet
DataSet是 Spark1.6 中新添加的,是一个抽象分布式数据集合,是DataFrame 的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作 map,flatMap,filter等等)。
DataSet 是 DataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象
- 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet 中的字段名称;
- DataSet 是强类型的。比如可以有 DataSet[Car],DataSet[Person]。
DataFrame 是 DataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过 as 方法将DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序
二、SparkSQL 核心编程
1. 新的起点
Spark SQL 可以理解为 对 Spark Core 的一种封装,不仅仅是在模型上进行封装,还在上下文对象中进行了封装。在Spark core 中,首先要构建上下文对象 SparkContext (实际上是对 SQL Context、HIve Context的组合),而 SparkSession 则是在 Spark SQL 新的上下文对象,它的内部封装了 Spark Context,所以计算实际上是 Spark Context完成的。
2. DataFrame
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。
1. 创建 DataFrame
在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:通过 Spark 的数据源进行创建;从一个存在的 RDD 进行转换;还可以从 Hive Table 进行查询返回。
从 Spark 数据源进行创建
- 查看 支持的数据源格式

- 在 spark 的 bin/data 目录中创建 user.json 文件
{"username":"zhangsan","age":20}
读取 json 文件创建 DataFrame
spark.read.json("data/demo1.json")
如果是从 内存中 读取数据,Spark 是可以知道数据类型的具体是什么,如果是数值,默认是作为 Int 处理,单数从文件中读取的数据,不能确定是什么类型,一般用 bigInt 接受,可以做Long类型转换,但是不能转化为Int。
- 从 Rdd 中进行创建
-
2. SQL 语法
SQL 语法风格是我们查询数据的时候使用的 SQL 语句来查询,这种风格查询必须要有临时视图或者全局视图来辅助。
读取Json 文件 创建 DataFrame
scala> val df = spark.read.json("data/demo1.json")df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
对 DataFrame 创建一个语法视图
`scala> df.createOrReplaceTempView("people")`
通过 SQL 查询全表
`scala> val sqlDF = spark.sql("select * from people ") `<br />`sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, username: string] `
结果展示
scala> sqlDF.show+---+--------+|age|username|+---+--------+| 20|zhangsan|+---+--------+
注意 : 普通表 是Session 范围内的,如果想范围外有效,可以使用全局表,使用的时候需要全路径
创建 全局表
df.createGlobalTempView("people1")查询全局表
spark.sql("SELECT * FROM global_temp.people").show()
3. DSL 语法
DataFrame 提供了一个特定领域的语言 domain-specific language (DSL)去管理结构化数据。我们可以在 SCala 、java、Python 和 R 中使用 DSL ,使用的时候 不再需要创建临时视图。
- 创建 DataFrame
scala> val df = spark.read.json("data/demo1.json")df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
查看 DataFrame 的 Schema 信息
scala> df.printSchemaroot|-- age: long (nullable = true)|-- username: string (nullable = true)
只看 username 的信息
scala> df.select("username").show+--------+|username|+--------+|zhangsan|+--------+
查看”username”列数据以及”age+1”数据
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
scala> df.select($"username",$"age" + 1).showscala> df.select('username, 'age + 1).show()scala> df.select('username, 'age + 1 as "newage").show()
- 查看 age > 30 的数据
scala> df.filter($"age">30).show
按照 age 进行分组
` scala> df.groupBy("age").count.show`
4. Rdd 转化为 DataFrame
注意: 在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入
import spark.implicits._
这里的 Spark 并不是 SCala 的包名,而是创建 SparkSession 对象的变量名称,所以必须先创建 SparkSession 对象后在引入。同时 Spark 对象 不能使用 var 声明 ,只能使用 val
在 Spark-shell 中 无需引入,自动化完成此操作 ```scala
scala> val idRDD = sc.textFile(“data/1.txt”)
idRDD: org.apache.spark.rdd.RDD[String] = data/1.txt MapPartitionsRDD[43] at textFile at
scala> idRDD.toDF(“id”).show +—-+ | id| +—-+ | 1| | 2| | 3| | 4| | 5| | 6| | 7| | 8| | 9| +—-+
实际开发中,一般通过样例类将 RDD 转换为 DataFrame```scalascala> case class User(name:String, age:Int)defined class Userscala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show+--------+---+| name|age|+--------+---+|zhangsan| 30|| lisi| 40|+--------+---+
5. DataFrame 转化为 RDD
因为 DataFrame 就是对 RDD 的封装,所以可以直接获取内部的 RDD
scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1,| t._2)).toDFdf: org.apache.spark.sql.DataFrame = [name: string, age: int]scala> val rdd = df.rddrdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[58] at rdd at <console>:25scala> rdd.collectres18: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])
注意:此时得到的 RDD 存储类型为 Row
scala> val array=rdd.collectarray: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])scala> array(0)res19: org.apache.spark.sql.Row = [zhangsan,30]scala> array(0)(0)res20: Any = zhangsanscala> array(0).getAs[String]("name")res23: String = zhangsan
3. DataSet
DataSet 是 具有强类型的数据集合,需要提供对应的信息
1. 创建 DataSet
- 使用样例类创建 DataSet ```scala scala> case class Person(name: String, age: Long) defined class Person
scala> val caseClassDS = Seq(Person(“zhangsan”,2)).toDS() caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> caseClassDS.show +————+—-+ | name|age| +————+—-+ |zhangsan| 2| +————+—-+
- 使用基本类型创建 DataSet```scalascala> val ds = Seq(1,2,3,4,5).toDSds: org.apache.spark.sql.Dataset[Int] = [value: int]scala> ds.show+-----+|value|+-----+| 1|| 2|| 3|| 4|| 5|+-----+
注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet
2. RDD 转换为 DataSet
SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。
scala> case class User(name:String,age:Int)defined class Userscala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDSres26: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
3. DataSet 转化为 RDD
DataSet 也是 由RDD 封装而来,可以直接获取内部的RDD
scala> val ds = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDSds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]scala> ds.rddres27: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[65] at rdd at <console>:26
4. DataSet 和 DataFrame 转换
DataFrame 其实是DataSet 的特例 ,他们之间是可以相互转化的
scala> case class User(name:String, age:Int)defined class Userscala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")df: org.apache.spark.sql.DataFrame = [name: string, age: int]scala> val ds = df.as[User]ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]scala> val df = ds.toDFdf: org.apache.spark.sql.DataFrame = [name: string, age: int]scala>
5. RDD DataFrame DataSet 之间的关系
1. 共性
- 三者都是 spark 平台下分布式弹性数据集,为处理超大数据提供便利
- 三者都有惰性机制 ,在执行转换算子(map flatmap等)不会立即执行,只有在遇到行动算子的时候才会执行
- 三者之间有许多共同的函数 filter 过滤等
- 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
- 三者都有分区的概念
- 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在创建好 SparkSession 对象后尽量直接导入)
DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型
2. 区别
从版本上看
- Spark 1.0 ==> RDD
- Spark 1.3 ==> DataFrame
- Spark 1.6 ==> DataSet
在后期的 Spark 版本中,DataSet 有可能会逐步取代 RDD和 DataFrame 成为唯一的 API 接口。
- Rdd
- 一般和 Spark Mlib 同时使用
- 不支持 SQL 操作
- DataFrame
- 每一行的类型固定 都是 Row ,每一列无法直接访问,只能通过解析才能获取字段的值
- DataFrame 与 DataSet 一般不与 spark mllib 同时使用
- DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作
- DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)
DataSet
- Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息
3. 三者之间的相互转换
6. IDEA 操作
1. 添加依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.0</version> </dependency>2. 代码实现
```scala object Spark01_demo { def main(args: Array[String]): Unit = { // 1. 创建上下文环境配置对象 val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“Spark01_demo”)
// 2. 创建 SparkSession 对象 val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换 // 2.1 spark 不是包名,是上下文环境对象名 import sparkSession.implicits._
// 3. 读取 json 文件 创建 DataFrame // json 格式 一行一个json格式 val df: DataFrame = sparkSession.read.json(“input/session.json”) df.show()
// 4. SQL 风格语法 df.createOrReplaceTempView(“user”) sparkSession.sql(“select avg(age) from user”) .show()
// 5. DSL 风格语法 df.select(“username”,”age”).show
// 6. *****RDD=>DataFrame=>DataSet*****
// 6.1 RDD=>DataFrame
val rdd1: RDD[(Int, String, Int)] = sparkSession.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu", 20)))
val dataFrame:DataFrame = rdd1.toDF("id", "name", "age")
dataFrame.show()
// 6.2 DataFrame=>DataSet
val dataSet:Dataset[User] = dataFrame.as[User]
dataSet.show()
// 7. *****DataSet=>DataFrame=>RDD*****
// 7.1 DataSet=>DataFrame
val DataFrame:DataFrame = dataSet.toDF()
// 7.2 DataFrame=>RDD
val rdd:RDD[Row] = dataFrame.rdd
//RDD 返回的 RDD 类型为 Row,里面提供的 getXXX 方法可以获取字段值,类似 jdbc 处理结果集,但是索引从 0 开始
rdd.foreach(x=>println(x.getString(1)))
// 8.1 *****RDD=>DataSet*****
val rdd2: RDD[(Int, String, Int)] = sparkSession.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu", 20)))
val dataSet1:Dataset[User] = rdd2.map(x => User(x._1, x._2, x._3)).toDS()
// 8.2 *****DataSet=>=>RDD*****
val rdd3 : RDD[User] = dataSet1.rdd
//9. 释放资源
sparkSession.stop()
} } case class User(id:Int,name:String,age:Int)
<a name="nhegt"></a>
### 7. 用户自定义函数
可以通过 spark.udf 功能添加自定义函数,实现自定义功能。
<a name="qWhRd"></a>
#### 1. UDF
1. 创建 DataFrame
1. 注册 UDF
1. 创建临时表
1. 应用 UDF
- 案例 : 修改输出
```scala
def main(args: Array[String]): Unit = {
// 1. 创建上下文环境配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_demo")
// 2. 创建 SparkSession 对象
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val df: DataFrame = sparkSession.read.json("datas/session.json")
df.createTempView("persion")
sparkSession.udf.register("udfFunction",(age:Int) =>{
"年龄 :" + age.toString
})
sparkSession.sql("select name ,udfFunction(age) from persion").show()
sparkSession.close()
}
2. UDAF
强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator
- 案例 :计算平均工资
- 实现方式 - UDAF - 弱类型 ```scala import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
object Spark03_SparkSQL_UDAF {
def main(args: Array[String]): Unit = {
// TODO 创建SparkSQL的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val df = spark.read.json("datas/user.json")
df.createOrReplaceTempView("user")
spark.udf.register("ageAvg", new MyAvgUDAF())
spark.sql("select ageAvg(age) from user").show
// TODO 关闭环境
spark.close()
}
/*
自定义聚合函数类:计算年龄的平均值
1. 继承UserDefinedAggregateFunction
2. 重写方法(8)
*/
class MyAvgUDAF extends UserDefinedAggregateFunction{
// 输入数据的结构 : Int
override def inputSchema: StructType = {
StructType(
Array(
StructField("age", LongType)
)
)
}
// 缓冲区数据的结构 : Buffer
override def bufferSchema: StructType = {
StructType(
Array(
StructField("total", LongType),
StructField("count", LongType)
)
)
}
// 函数计算结果的数据类型:Out
override def dataType: DataType = LongType
// 函数的稳定性
override def deterministic: Boolean = true
// 缓冲区初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
//buffer(0) = 0L
//buffer(1) = 0L
buffer.update(0, 0L)
buffer.update(1, 0L)
}
// 根据输入的值更新缓冲区数据
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer.update(0, buffer.getLong(0)+input.getLong(0))
buffer.update(1, buffer.getLong(1)+1)
}
// 缓冲区数据合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
}
// 计算平均值
override def evaluate(buffer: Row): Any = {
buffer.getLong(0)/buffer.getLong(1)
}
}
}
- 实现方式 - UDAF - 强类型
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession, functions}
object Spark03_SparkSQL_UDAF1 {
def main(args: Array[String]): Unit = {
// TODO 创建SparkSQL的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val df = spark.read.json("datas/user.json")
df.createOrReplaceTempView("user")
spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF()))
spark.sql("select ageAvg(age) from user").show
// TODO 关闭环境
spark.close()
}
/*
自定义聚合函数类:计算年龄的平均值
1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
IN : 输入的数据类型 Long
BUF : 缓冲区的数据类型 Buff
OUT : 输出的数据类型 Long
2. 重写方法(6)
*/
case class Buff( var total:Long, var count:Long )
class MyAvgUDAF extends Aggregator[Long, Buff, Long]{
// z & zero : 初始值或零值
// 缓冲区的初始化
override def zero: Buff = {
Buff(0L,0L)
}
// 根据输入的数据更新缓冲区的数据
override def reduce(buff: Buff, in: Long): Buff = {
buff.total = buff.total + in
buff.count = buff.count + 1
buff
}
// 合并缓冲区
override def merge(buff1: Buff, buff2: Buff): Buff = {
buff1.total = buff1.total + buff2.total
buff1.count = buff1.count + buff2.count
buff1
}
//计算结果
override def finish(buff: Buff): Long = {
buff.total / buff.count
}
// 缓冲区的编码操作
override def bufferEncoder: Encoder[Buff] = Encoders.product
// 输出的编码操作
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
}
- Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction ```scala
import org.apache.spark.SparkConf import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession, TypedColumn, functions}
object Spark03_SparkSQL_UDAF2 {
def main(args: Array[String]): Unit = {
// TODO 创建SparkSQL的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val df = spark.read.json("datas/user.json")
// 早期版本中,spark不能在sql中使用强类型UDAF操作
// SQL & DSL
// 早期的UDAF强类型聚合函数使用DSL语法操作
val ds: Dataset[User] = df.as[User]
// 将UDAF函数转换为查询的列对象
val udafCol: TypedColumn[User, Long] = new MyAvgUDAF().toColumn
ds.select(udafCol).show
// TODO 关闭环境
spark.close()
}
/*
自定义聚合函数类:计算年龄的平均值
1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
IN : 输入的数据类型 User
BUF : 缓冲区的数据类型 Buff
OUT : 输出的数据类型 Long
2. 重写方法(6)
*/
case class User(username:String, age:Long)
case class Buff( var total:Long, var count:Long )
class MyAvgUDAF extends Aggregator[User, Buff, Long]{
// z & zero : 初始值或零值
// 缓冲区的初始化
override def zero: Buff = {
Buff(0L,0L)
}
// 根据输入的数据更新缓冲区的数据
override def reduce(buff: Buff, in: User): Buff = {
buff.total = buff.total + in.age
buff.count = buff.count + 1
buff
}
// 合并缓冲区
override def merge(buff1: Buff, buff2: Buff): Buff = {
buff1.total = buff1.total + buff2.total
buff1.count = buff1.count + buff2.count
buff1
}
//计算结果
override def finish(buff: Buff): Long = {
buff.total / buff.count
}
// 缓冲区的编码操作
override def bufferEncoder: Encoder[Buff] = Encoders.product
// 输出的编码操作
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
}
<a name="zzllB"></a>
### 8. 数据的加载与保存
<a name="EVE0O"></a>
#### 1.通用的数据加载保存方式
SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式为 parquet
<a name="ZNnxH"></a>
##### 1. 加载数据
` spark.read.load `是加载数据的通用方法
```scala
scala> spark.read.
csv format jdbc json load option options orc parquet schema
table text textFile
- 如果读取不同格式的数据,可以对不同的数据格式进行设定
spark.read.format("…")[.option("…")].load("…")
- format(“…”):指定加载的数据类型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”。
- load(“…”):在”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”格式下需要传入加载数据的路径。
- option(“…”):在”jdbc”格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
2. 保存数据
df.write.save是保存数据的通用方法scala>df.write. csv jdbc json orc parquet textFile… …
- 如果保存不同格式的数据,可以对不同的数据格式进行设定
df.write.format("…")[.option("…")].save("…")
- format(“…”):指定保存的数据类型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”。
- save (“…”):在”csv”、”orc”、”parquet”和”textFile”格式下需要传入保存数据的路径。
- option(“…”):在”jdbc”格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。SaveMode 是一个枚举类,其中的常量包括:
2. Parquet
Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式存储格式。
数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。修改配置项 spark.sql.sources.default,可修改默认数据源格式。
scala> val df = spark.read.load("../examples/src/main/resources/users.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
scala> df.write.mode("append").save("./data/demo1")
1. JSON
Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row]. 可以通过 SparkSession.read.json()去加载 JSON 文件。
注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串。例如:
{"name":"Michael"}
{"name":"Andy", "age":30}
[{"name":"Justin", "age":19},{"name":"Justin", "age":19}]
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("allen").setMaster("local[*]")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val df: DataFrame = spark.read.json("datas/resources/people.json") // json 格式 一行一个json格式
df.select("*").show()
df.write.mode("append").json("datas/resources/people" )
}
2. CSV
val conf: SparkConf = new SparkConf().setAppName("allen").setMaster("local[*]")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val dataFrame = spark.read.csv("datas/resources/people.csv")
dataFrame.select("*").show()
dataFrame.write.mode("append").csv("datas/resources/people1")
3. mysql
Spark SQL 通过 JDBC 从关系型数据库中读取数据创建 DataFrame。对 DataFrame 进行一系列操作后,还可以将数据再次写入到关系型数据库中。
- 引入依赖
```xml
mysql mysql-connector-java 5.1.27
```scala
object spark02 {
def main(args: Array[String]): Unit = {
// 1. 创建上下文环境配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_demo")
// 2. 创建 SparkSession 对象
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._
// 读取MySQL数据
val df = sparkSession.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test1")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "user")
.load()
df.show
val writeDf = sparkSession.sparkContext.makeRDD(List(user("user6", 6), user("user7", 7))).toDS()
// 保存数据
writeDf.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test1")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "user")
.mode(SaveMode.Append)
.save()
sparkSession.close()
}
}
case class user(name:String,age:Int)
4. Hive
Hive 是hadoop 上的 SQL引擎,分为两种 内置 hive 和 外置 hive 。一般使用外置hive,使用步骤如下
- hive-site.xml 拷贝到 conf/目录下
- 把 Mysql 的驱动 copy 到 jars/目录下
- 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
- 启动 hadoop hive (包括 Metastore thriftserver )
- 重启 spark-shell ```xml scala> spark.sql(“show tables”).show 2022-03-28 13:57:03,601 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 2022-03-28 13:57:03,601 WARN conf.HiveConf: HiveConf of name hive.server2.active.passive.ha.enable does not exist +————+—————————-+—————-+ |database| tableName|isTemporary| +————+—————————-+—————-+ | default| demo| false| | default| demo1| false| | default| emp_table| false| | default| hive_emp_table| false| | default|relevance_hbase_emp| false| | default| test| false| | default| test1| false| +————+—————————-+—————-+
scala> :q
- 代码操作
1. 引入依赖
```xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
- 将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现 ```scala System.setProperty(“HADOOP_USER_NAME”, “root”) // TODO 创建SparkSQL的运行环境 val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“sparkSQL”) val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
// 使用SparkSQL连接外置的Hive // 1. 拷贝Hive-size.xml文件到classpath下 // 2. 启用Hive的支持 // 3. 增加对应的依赖关系(包含MySQL驱动) spark.sql(“show tables”).show
// TODO 关闭环境 spark.close() ```
