一、概述

1. 定义

Spark SQL 是 Spark 用于处理结构化数据(structured data)处理的 Spark 模块

2. Hive and SparkSQL

hive 值早期唯一运行在 Hadoop 上的 SQL-on-hadoop工具,但是MapReduce计算过程中大量的中间磁盘落地消耗了大量IO,降低了运行效率。为了提高 SQL-on-hadoop 的效率,大量的SQL-on-hadoop 工具开始产生,其中表现较为突出的有 :

  • Drill
  • Impata
  • Shark

其中 shark 是 伯克利实验室 Spark 生态环境组件之一,是基于hive 开发的工具,是 Spark SQL 的前身。它修改了下图所示的 内存管理、物理计划、执行三大模块,使得 SQL-on-hadoop 的执行速度 比 hive 高了10—100倍。
image.png
但是随着Spark 的发展,对于野心勃勃的 Spark 团队来说,Shark 对于hive 的太多依赖(比如 语法解析器、查询优化器等),制约了 Spark 的 One Stack Rule Them All 的既定方针,制约了 Spark 各个组件的相互集成,所以提出了 SparkSQL 项目。SparkSQL 抛弃原有 Shark 的代码,汲取了 Shark 的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性 SparkSQL 无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步,海阔天
空”。

  • 数据兼容方面 SparkSQL 不但兼容 Hive,还可以从 RDD、parquet 文件、JSON 文件中获取数据,未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据;
  • 性能优化方面 除了采取 In-Memory Columnar Storage、byte-code generation 等优化技术外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等;
  • 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展。

2014 年 6 月 1 日 Shark 项目和 SparkSQL 项目的主持人 Reynold Xin 宣布:停止对 Shark 的开发,团队将所有资源放 SparkSQL 项目上,至此,Shark 的发展画上了句话,但也因此发展出两个支线:SparkSQL 和 Hive on Spark。其中 SparkSQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive,只是兼容 Hive;而Hive on Spark 是一个 Hive 的发展计划,该计划将 Spark 作为 Hive 的底层引擎之一,也就是说,Hive 将不再受限于一个引擎,可以采用 Map-Reduce、Tez、Spark 等引擎。
对于开发人员来讲,SparkSQL 可以简化 RDD 的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是 SparkSQL。Spark SQL 为了简化 RDD 的开发,提高开发效率,提供了 2 个编程抽象,类似 Spark Core 中的 RDD:DataFrame 和 Dataset

3. Spark SQL 特点

  • 易整合: 无缝的整合了 SQL 查询和 Spark 编程

image.png

  • 统一的数据访问 : 使用相同的方式连接不同的数据源

image.png

  • 兼容 Hive : 在已有的仓库上直接运行 SQL 或者 HiveQL

image.png

  • 标准数据连接image.png

    4. DataFrame 是什么

    DataFrame 是以 Rdd为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame 与RDD 之间的区别在于,DataFrame 带有 schema 元信息,简单说就是 DataFrame 存储了它所表示的二维表数据集的每一列的名称和类型,进而可以对作用在 DataFrame 上的变换进行针对性的优化。
    同时,和hive 相似, DataFrame 也支持嵌套数据类型(struct,array,map),另外提供了一套高层的关系操作,比函数式的RDD API 要更友好
    image.png
    左侧的 RDD[Person]虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 DataFrame 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame 是为数据提供了 Schema 的视图。可以把它当做数据库中的一张表来对待。DataFrame 也是懒执行的,但性能上比 RDD 要高,主要原因:优化的执行计划,即查询计划通过 Spark catalyst optimiser 进行优化。

    5. DataSet

    DataSet是 Spark1.6 中新添加的,是一个抽象分布式数据集合,是DataFrame 的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作 map,flatMap,filter等等)。

  • DataSet 是 DataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象

  • 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet 中的字段名称;
  • DataSet 是强类型的。比如可以有 DataSet[Car],DataSet[Person]。
  • DataFrame 是 DataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过 as 方法将DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序

    二、SparkSQL 核心编程

    1. 新的起点

    Spark SQL 可以理解为 对 Spark Core 的一种封装,不仅仅是在模型上进行封装,还在上下文对象中进行了封装。在Spark core 中,首先要构建上下文对象 SparkContext (实际上是对 SQL Context、HIve Context的组合),而 SparkSession 则是在 Spark SQL 新的上下文对象,它的内部封装了 Spark Context,所以计算实际上是 Spark Context完成的。

    2. DataFrame

    Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。

    1. 创建 DataFrame

    在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:通过 Spark 的数据源进行创建;从一个存在的 RDD 进行转换;还可以从 Hive Table 进行查询返回。

  • 从 Spark 数据源进行创建

    • 查看 支持的数据源格式

image.png

  • 在 spark 的 bin/data 目录中创建 user.json 文件

{"username":"zhangsan","age":20}

  • 读取 json 文件创建 DataFrame

    spark.read.json("data/demo1.json")
    image.png
    如果是从 内存中 读取数据,Spark 是可以知道数据类型的具体是什么,如果是数值,默认是作为 Int 处理,单数从文件中读取的数据,不能确定是什么类型,一般用 bigInt 接受,可以做Long类型转换,但是不能转化为Int。

  • 从 Rdd 中进行创建
  • 从 hive Table 查询结果中创建

    2. SQL 语法

    SQL 语法风格是我们查询数据的时候使用的 SQL 语句来查询,这种风格查询必须要有临时视图或者全局视图来辅助。

  • 读取Json 文件 创建 DataFrame

scala> val df = spark.read.json("data/demo1.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

  • 对 DataFrame 创建一个语法视图

    1. `scala> df.createOrReplaceTempView("people")`
  • 通过 SQL 查询全表

    1. `scala> val sqlDF = spark.sql("select * from people ") `<br />`sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, username: string] `
  • 结果展示

    1. scala> sqlDF.show
    2. +---+--------+
    3. |age|username|
    4. +---+--------+
    5. | 20|zhangsan|
    6. +---+--------+

    注意 : 普通表 是Session 范围内的,如果想范围外有效,可以使用全局表,使用的时候需要全路径

  • 创建 全局表

    df.createGlobalTempView("people1")

  • 查询全局表

spark.sql("SELECT * FROM global_temp.people").show()

3. DSL 语法

DataFrame 提供了一个特定领域的语言 domain-specific language (DSL)去管理结构化数据。我们可以在 SCala 、java、Python 和 R 中使用 DSL ,使用的时候 不再需要创建临时视图。

  • 创建 DataFrame

scala> val df = spark.read.json("data/demo1.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

  • 查看 DataFrame 的 Schema 信息

    1. scala> df.printSchema
    2. root
    3. |-- age: long (nullable = true)
    4. |-- username: string (nullable = true)
  • 只看 username 的信息

    1. scala> df.select("username").show
    2. +--------+
    3. |username|
    4. +--------+
    5. |zhangsan|
    6. +--------+
  • 查看”username”列数据以及”age+1”数据

注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名

  1. scala> df.select($"username",$"age" + 1).show
  2. scala> df.select('username, 'age + 1).show()
  3. scala> df.select('username, 'age + 1 as "newage").show()
  • 查看 age > 30 的数据

scala> df.filter($"age">30).show

  • 按照 age 进行分组

    1. ` scala> df.groupBy("age").count.show`

    4. Rdd 转化为 DataFrame

    注意: 在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入
    import spark.implicits._
    这里的 Spark 并不是 SCala 的包名,而是创建 SparkSession 对象的变量名称,所以必须先创建 SparkSession 对象后在引入。同时 Spark 对象 不能使用 var 声明 ,只能使用 val
    在 Spark-shell 中 无需引入,自动化完成此操作 ```scala

scala> val idRDD = sc.textFile(“data/1.txt”) idRDD: org.apache.spark.rdd.RDD[String] = data/1.txt MapPartitionsRDD[43] at textFile at :24

scala> idRDD.toDF(“id”).show +—-+ | id| +—-+ | 1| | 2| | 3| | 4| | 5| | 6| | 7| | 8| | 9| +—-+

  1. 实际开发中,一般通过样例类将 RDD 转换为 DataFrame
  2. ```scala
  3. scala> case class User(name:String, age:Int)
  4. defined class User
  5. scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show
  6. +--------+---+
  7. | name|age|
  8. +--------+---+
  9. |zhangsan| 30|
  10. | lisi| 40|
  11. +--------+---+

5. DataFrame 转化为 RDD

因为 DataFrame 就是对 RDD 的封装,所以可以直接获取内部的 RDD

  1. scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1,| t._2)).toDF
  2. df: org.apache.spark.sql.DataFrame = [name: string, age: int]
  3. scala> val rdd = df.rdd
  4. rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[58] at rdd at <console>:25
  5. scala> rdd.collect
  6. res18: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])

注意:此时得到的 RDD 存储类型为 Row

  1. scala> val array=rdd.collect
  2. array: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])
  3. scala> array(0)
  4. res19: org.apache.spark.sql.Row = [zhangsan,30]
  5. scala> array(0)(0)
  6. res20: Any = zhangsan
  7. scala> array(0).getAs[String]("name")
  8. res23: String = zhangsan

3. DataSet

DataSet 是 具有强类型的数据集合,需要提供对应的信息

1. 创建 DataSet

  • 使用样例类创建 DataSet ```scala scala> case class Person(name: String, age: Long) defined class Person

scala> val caseClassDS = Seq(Person(“zhangsan”,2)).toDS() caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show +————+—-+ | name|age| +————+—-+ |zhangsan| 2| +————+—-+

  1. - 使用基本类型创建 DataSet
  2. ```scala
  3. scala> val ds = Seq(1,2,3,4,5).toDS
  4. ds: org.apache.spark.sql.Dataset[Int] = [value: int]
  5. scala> ds.show
  6. +-----+
  7. |value|
  8. +-----+
  9. | 1|
  10. | 2|
  11. | 3|
  12. | 4|
  13. | 5|
  14. +-----+

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

2. RDD 转换为 DataSet

SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。

  1. scala> case class User(name:String,age:Int)
  2. defined class User
  3. scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
  4. res26: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

3. DataSet 转化为 RDD

DataSet 也是 由RDD 封装而来,可以直接获取内部的RDD

  1. scala> val ds = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
  2. ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
  3. scala> ds.rdd
  4. res27: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[65] at rdd at <console>:26

4. DataSet 和 DataFrame 转换

DataFrame 其实是DataSet 的特例 ,他们之间是可以相互转化的

  1. scala> case class User(name:String, age:Int)
  2. defined class User
  3. scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
  4. df: org.apache.spark.sql.DataFrame = [name: string, age: int]
  5. scala> val ds = df.as[User]
  6. ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
  7. scala> val df = ds.toDF
  8. df: org.apache.spark.sql.DataFrame = [name: string, age: int]
  9. scala>

5. RDD DataFrame DataSet 之间的关系

1. 共性

  • 三者都是 spark 平台下分布式弹性数据集,为处理超大数据提供便利
  • 三者都有惰性机制 ,在执行转换算子(map flatmap等)不会立即执行,只有在遇到行动算子的时候才会执行
  • 三者之间有许多共同的函数 filter 过滤等
  • 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
  • 三者都有分区的概念
  • 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在创建好 SparkSession 对象后尽量直接导入)
  • DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型

    2. 区别

  • 从版本上看

    • Spark 1.0 ==> RDD
    • Spark 1.3 ==> DataFrame
    • Spark 1.6 ==> DataSet

在后期的 Spark 版本中,DataSet 有可能会逐步取代 RDD和 DataFrame 成为唯一的 API 接口。

  • Rdd
    • 一般和 Spark Mlib 同时使用
    • 不支持 SQL 操作
  • DataFrame
    • 每一行的类型固定 都是 Row ,每一列无法直接访问,只能通过解析才能获取字段的值
    • DataFrame 与 DataSet 一般不与 spark mllib 同时使用
    • DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作
    • DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)
  • DataSet

    • Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
    • Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息

      3. 三者之间的相互转换

      image.png

      6. IDEA 操作

      1. 添加依赖

      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.0.0</version>
      </dependency>
      

      2. 代码实现

      ```scala object Spark01_demo { def main(args: Array[String]): Unit = { // 1. 创建上下文环境配置对象 val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“Spark01_demo”)

      // 2. 创建 SparkSession 对象 val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

      //RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换 // 2.1 spark 不是包名,是上下文环境对象名 import sparkSession.implicits._

      // 3. 读取 json 文件 创建 DataFrame // json 格式 一行一个json格式 val df: DataFrame = sparkSession.read.json(“input/session.json”) df.show()

      // 4. SQL 风格语法 df.createOrReplaceTempView(“user”) sparkSession.sql(“select avg(age) from user”) .show()

      // 5. DSL 风格语法 df.select(“username”,”age”).show

// 6. *****RDD=>DataFrame=>DataSet*****
// 6.1 RDD=>DataFrame
val rdd1: RDD[(Int, String, Int)] = sparkSession.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu", 20)))
val dataFrame:DataFrame = rdd1.toDF("id", "name", "age")
dataFrame.show()

// 6.2 DataFrame=>DataSet
val dataSet:Dataset[User] = dataFrame.as[User]
dataSet.show()

// 7. *****DataSet=>DataFrame=>RDD*****
// 7.1 DataSet=>DataFrame
val DataFrame:DataFrame = dataSet.toDF()

// 7.2 DataFrame=>RDD
val rdd:RDD[Row] = dataFrame.rdd
//RDD 返回的 RDD 类型为 Row,里面提供的 getXXX 方法可以获取字段值,类似 jdbc 处理结果集,但是索引从 0 开始
rdd.foreach(x=>println(x.getString(1)))


// 8.1  *****RDD=>DataSet*****
val rdd2: RDD[(Int, String, Int)] = sparkSession.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu", 20)))
val dataSet1:Dataset[User] = rdd2.map(x => User(x._1, x._2, x._3)).toDS()
// 8.2 *****DataSet=>=>RDD*****
val rdd3 : RDD[User] = dataSet1.rdd

//9. 释放资源
sparkSession.stop()

} } case class User(id:Int,name:String,age:Int)

<a name="nhegt"></a>
### 7. 用户自定义函数
可以通过 spark.udf 功能添加自定义函数,实现自定义功能。
<a name="qWhRd"></a>
#### 1. UDF

   1. 创建 DataFrame
   1. 注册 UDF
   1. 创建临时表
   1. 应用 UDF
- 案例 : 修改输出
```scala
  def main(args: Array[String]): Unit = {
    // 1. 创建上下文环境配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_demo")

    // 2. 创建 SparkSession 对象
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    val df: DataFrame = sparkSession.read.json("datas/session.json")

    df.createTempView("persion")
    sparkSession.udf.register("udfFunction",(age:Int) =>{
      "年龄 :" + age.toString
    })
    sparkSession.sql("select name ,udfFunction(age) from persion").show()
    sparkSession.close()
  }

2. UDAF

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator

  • 案例 :计算平均工资
    • 实现方式 - UDAF - 弱类型 ```scala import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}

object Spark03_SparkSQL_UDAF {

def main(args: Array[String]): Unit = {

    // TODO 创建SparkSQL的运行环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    val df = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    spark.udf.register("ageAvg", new MyAvgUDAF())

    spark.sql("select ageAvg(age) from user").show


    // TODO 关闭环境
    spark.close()
}
/*
 自定义聚合函数类:计算年龄的平均值
 1. 继承UserDefinedAggregateFunction
 2. 重写方法(8)
 */
class MyAvgUDAF extends UserDefinedAggregateFunction{
    // 输入数据的结构 : Int
    override def inputSchema: StructType = {
        StructType(
            Array(
                StructField("age", LongType)
            )
        )
    }
    // 缓冲区数据的结构 : Buffer
    override def bufferSchema: StructType = {
        StructType(
            Array(
                StructField("total", LongType),
                StructField("count", LongType)
            )
        )
    }

    // 函数计算结果的数据类型:Out
    override def dataType: DataType = LongType

    // 函数的稳定性
    override def deterministic: Boolean = true

    // 缓冲区初始化
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        //buffer(0) = 0L
        //buffer(1) = 0L

        buffer.update(0, 0L)
        buffer.update(1, 0L)
    }

    // 根据输入的值更新缓冲区数据
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer.update(0, buffer.getLong(0)+input.getLong(0))
        buffer.update(1, buffer.getLong(1)+1)
    }

    // 缓冲区数据合并
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
        buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
    }

    // 计算平均值
    override def evaluate(buffer: Row): Any = {
        buffer.getLong(0)/buffer.getLong(1)
    }
}

}


   - 实现方式 - UDAF - 强类型
```scala

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession, functions}

object Spark03_SparkSQL_UDAF1 {

    def main(args: Array[String]): Unit = {

        // TODO 创建SparkSQL的运行环境
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        val df = spark.read.json("datas/user.json")
        df.createOrReplaceTempView("user")

        spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF()))

        spark.sql("select ageAvg(age) from user").show


        // TODO 关闭环境
        spark.close()
    }
    /*
     自定义聚合函数类:计算年龄的平均值
     1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
         IN : 输入的数据类型 Long
         BUF : 缓冲区的数据类型 Buff
         OUT : 输出的数据类型 Long
     2. 重写方法(6)
     */
    case class Buff( var total:Long, var count:Long )
    class MyAvgUDAF extends Aggregator[Long, Buff, Long]{
        // z & zero : 初始值或零值
        // 缓冲区的初始化
        override def zero: Buff = {
            Buff(0L,0L)
        }

        // 根据输入的数据更新缓冲区的数据
        override def reduce(buff: Buff, in: Long): Buff = {
            buff.total = buff.total + in
            buff.count = buff.count + 1
            buff
        }

        // 合并缓冲区
        override def merge(buff1: Buff, buff2: Buff): Buff = {
            buff1.total = buff1.total + buff2.total
            buff1.count = buff1.count + buff2.count
            buff1
        }

        //计算结果
        override def finish(buff: Buff): Long = {
            buff.total / buff.count
        }

        // 缓冲区的编码操作
        override def bufferEncoder: Encoder[Buff] = Encoders.product

        // 输出的编码操作
        override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }
}
  • Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction ```scala

import org.apache.spark.SparkConf import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession, TypedColumn, functions}

object Spark03_SparkSQL_UDAF2 {

def main(args: Array[String]): Unit = {

    // TODO 创建SparkSQL的运行环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    val df = spark.read.json("datas/user.json")

    // 早期版本中,spark不能在sql中使用强类型UDAF操作
    // SQL & DSL
    // 早期的UDAF强类型聚合函数使用DSL语法操作
    val ds: Dataset[User] = df.as[User]

    // 将UDAF函数转换为查询的列对象
    val udafCol: TypedColumn[User, Long] = new MyAvgUDAF().toColumn

    ds.select(udafCol).show


    // TODO 关闭环境
    spark.close()
}
/*
 自定义聚合函数类:计算年龄的平均值
 1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型
     IN : 输入的数据类型 User
     BUF : 缓冲区的数据类型 Buff
     OUT : 输出的数据类型 Long
 2. 重写方法(6)
 */
case class User(username:String, age:Long)
case class Buff( var total:Long, var count:Long )
class MyAvgUDAF extends Aggregator[User, Buff, Long]{
    // z & zero : 初始值或零值
    // 缓冲区的初始化
    override def zero: Buff = {
        Buff(0L,0L)
    }

    // 根据输入的数据更新缓冲区的数据
    override def reduce(buff: Buff, in: User): Buff = {
        buff.total = buff.total + in.age
        buff.count = buff.count + 1
        buff
    }

    // 合并缓冲区
    override def merge(buff1: Buff, buff2: Buff): Buff = {
        buff1.total = buff1.total + buff2.total
        buff1.count = buff1.count + buff2.count
        buff1
    }

    //计算结果
    override def finish(buff: Buff): Long = {
        buff.total / buff.count
    }

    // 缓冲区的编码操作
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    // 输出的编码操作
    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

}

<a name="zzllB"></a>
### 8. 数据的加载与保存
<a name="EVE0O"></a>
#### 1.通用的数据加载保存方式
SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式为 parquet
<a name="ZNnxH"></a>
##### 1. 加载数据
     ` spark.read.load  `是加载数据的通用方法
```scala
scala> spark.read.
csv format jdbc json load option options orc parquet schema 
table text textFile
  • 如果读取不同格式的数据,可以对不同的数据格式进行设定

spark.read.format("…")[.option("…")].load("…")

  • format(“…”):指定加载的数据类型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”。
  • load(“…”):在”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”格式下需要传入加载数据的路径。
  • option(“…”):在”jdbc”格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
    2. 保存数据
    df.write.save 是保存数据的通用方法
    scala>df.write.
    csv jdbc json orc parquet textFile… …
    
  • 如果保存不同格式的数据,可以对不同的数据格式进行设定

df.write.format("…")[.option("…")].save("…")

  • format(“…”):指定保存的数据类型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”。
  • save (“…”):在”csv”、”orc”、”parquet”和”textFile”格式下需要传入保存数据的路径。
  • option(“…”):在”jdbc”格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。SaveMode 是一个枚举类,其中的常量包括:

image.png

2. Parquet

Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式存储格式。
数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。修改配置项 spark.sql.sources.default,可修改默认数据源格式。

scala> val df = spark.read.load("../examples/src/main/resources/users.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> df.show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

scala> df.write.mode("append").save("./data/demo1")

1. JSON

Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row]. 可以通过 SparkSession.read.json()去加载 JSON 文件。
注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串。例如:

{"name":"Michael"}
{"name":"Andy", "age":30}
[{"name":"Justin", "age":19},{"name":"Justin", "age":19}]
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("allen").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read.json("datas/resources/people.json")   // json 格式  一行一个json格式
    df.select("*").show()
    df.write.mode("append").json("datas/resources/people" )
  }

2. CSV
    val conf: SparkConf = new SparkConf().setAppName("allen").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val dataFrame = spark.read.csv("datas/resources/people.csv")
    dataFrame.select("*").show()
    dataFrame.write.mode("append").csv("datas/resources/people1")

3. mysql

Spark SQL 通过 JDBC 从关系型数据库中读取数据创建 DataFrame。对 DataFrame 进行一系列操作后,还可以将数据再次写入到关系型数据库中。

  1. 引入依赖 ```xml mysql mysql-connector-java 5.1.27
```scala
object spark02 {
  def main(args: Array[String]): Unit = {
    // 1. 创建上下文环境配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_demo")

    // 2. 创建 SparkSession 对象
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import sparkSession.implicits._

    // 读取MySQL数据
    val df = sparkSession.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/test1")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "user")
      .load()
    df.show

    val writeDf = sparkSession.sparkContext.makeRDD(List(user("user6", 6), user("user7", 7))).toDS()

    // 保存数据
    writeDf.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/test1")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "user")
      .mode(SaveMode.Append)
      .save()

    sparkSession.close()
  }
}
case  class user(name:String,age:Int)

4. Hive

Hive 是hadoop 上的 SQL引擎,分为两种 内置 hive 和 外置 hive 。一般使用外置hive,使用步骤如下

  1. hive-site.xml 拷贝到 conf/目录下
  2. 把 Mysql 的驱动 copy 到 jars/目录下
  3. 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
  4. 启动 hadoop hive (包括 Metastore thriftserver )
  5. 重启 spark-shell ```xml scala> spark.sql(“show tables”).show 2022-03-28 13:57:03,601 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 2022-03-28 13:57:03,601 WARN conf.HiveConf: HiveConf of name hive.server2.active.passive.ha.enable does not exist +————+—————————-+—————-+ |database| tableName|isTemporary| +————+—————————-+—————-+ | default| demo| false| | default| demo1| false| | default| emp_table| false| | default| hive_emp_table| false| | default|relevance_hbase_emp| false| | default| test| false| | default| test1| false| +————+—————————-+—————-+

scala> :q


- 代码操作
   1. 引入依赖
```xml
<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-hive_2.12</artifactId>
 <version>3.0.0</version>
</dependency>
<dependency>
 <groupId>org.apache.hive</groupId>
 <artifactId>hive-exec</artifactId>
 <version>1.2.1</version>
</dependency>
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.27</version>
</dependency>
  1. 将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现 ```scala System.setProperty(“HADOOP_USER_NAME”, “root”) // TODO 创建SparkSQL的运行环境 val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“sparkSQL”) val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

// 使用SparkSQL连接外置的Hive // 1. 拷贝Hive-size.xml文件到classpath下 // 2. 启用Hive的支持 // 3. 增加对应的依赖关系(包含MySQL驱动) spark.sql(“show tables”).show

// TODO 关闭环境 spark.close() ```