学习链接:https://www.bilibili.com/video/BV11A411L7CK?p=153&spm_id_from=333.1007.top_right_bar_window_history.content.click&vd_source=b9e4f35102d61e6d02e0a5e1bbfea480


1 SparkSQL概述

1.1 SparkSQL是什么

Spark SQL是Spark用于结构化处理的Spark模块。

1.2 Hive and SparkSQL

  • 数据兼容方面 SparkSQL 不但兼容 Hive,还可以从 RDD、parquet 文件、JSON 文件中获取数据,未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据;
  • 性能优化方面 除了采取 In-Memory Columnar Storage、byte-code generation 等优化技术外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等;
  • 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展
  • SparkSQL 可以简化 RDD 的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是 SparkSQL。Spark SQL 为了简化 RDD 的开发,提高开发效率,提供了 2 个编程抽象,类似 Spark Core 中的 RDD
    • DataFrame
    • DataSet

      1.3 SparkSQL特点

  1. 易整合

无缝整合了SQL查询和Spark编程

  1. 统一的数据访问

使用相同的方式连接不同的数据源

  1. 兼容Hive

在已有的仓库上直接运行SQL或者HiveQL

  1. 标准数据连接

通过JDBC或ODBC连接

1.4 DataFrame

在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame所表示的二维表数据集的每一列都带有名称和类型。
DataFrame 是为数据提供了 Schema 的视图。可以把它当做数据库中的一张表来对待 DataFrame 也是懒执行的,但性能上比 RDD 要高,主要原因:优化的执行计划,即查询计划通过 Spark catalyst optimiser 进行优化。比如先对 DataFrame 进行过滤,再 join 过滤后的较小的结果集。
图片1.png

1.5 DataSet

DataSet 是分布式数据集合。DataSet 是 Spark 1.6 中添加的一个新抽象,是 DataFrame的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作 map,flatMap,filter等等)。

  • DataSet 是 DataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象
  • 用户友好的 API 风格,既具有类型安全检查也具有 DataFrame 的查询优化特性;
  • 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射DataSet 中的字段名称;
  • DataSet 是强类型的。比如可以有 DataSet[Car],DataSet[Person]。
  • DataFrame 是 DataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过 as 方法将DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序

    2 SparkSQL核心编程

    2.1 查询起始点

    SparkSession 是 Spark 最新的 SQL 查询起始点,实质上是 SQLContext 和 HiveContext的组合。SparkSession 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark 的 SparkSession 对 象, 就像以前可以自动获取到一个 sc 来表示 SparkContext 对象一样
    QQ截图20220624105828.png

    2.2 DataFrame

    Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。

    2.2.1 创建DataFrame

    在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:通过 Spark 的数据源进行创建;从一个存在的 RDD 进行转换;还可以从 Hive Table 进行查询返回。
  1. 从Spark数据源进行创建

    1. 查看Spark支持创建文件的数据源格式

      1. scala> spark.read.
      2. csv format jdbc json load option options orc parquet schema table text textFile
    2. 在spark的\bin\input目录中创建user.json

      1. {"username":"zhangsan","age":30}
      2. {"username":"lisi","age":20}
      3. {"username":"wangwu","age":32}
    3. 读取json文件创建DataFrame

      1. scala> spark.read.json("input/user.json")
      2. res0: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
  2. 从RDD进行转换

  3. 从 Hive Table 进行查询返回

    2.2.2 SQL语法

    SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。

  4. 读取json文件创建DataFrame

QQ截图20220624110859.png

  1. 对DataFrame创建一个临时表

QQ截图20220624111001.png

  1. 通过SQL语句实现查询

QQ截图20220624111124.png
QQ截图20220624111411.png
QQ截图20220624111428.png

  1. 对于DataFrame创建一个全局表

    普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.user

  1. scala> df.createOrReplaceGlobalTempView("emp")
  1. 通过SQL语句实现查询全表

QQ截图20220624111334.png

2.2.3 DSL语法

DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必创建临时视图。

  1. 查看DataFrame的Schema信息

    1. scala> df.printSchema
    2. root
    3. |-- age: long (nullable = true)
    4. |-- username: string (nullable = true)
  2. 只查看”username”列数据

    1. scala> df.select("username").show()
    2. +--------+
    3. |username|
    4. +--------+
    5. |zhangsan|
    6. | lisi|
    7. | wangwu|
    8. +--------+
  3. 查看age+1

    1. scala> df.select($"age"+1).show
    2. +---------+
    3. |(age + 1)|
    4. +---------+
    5. | 31|
    6. | 21|
    7. | 33|
    8. +---------+
    9. scala> df.select('age + 1).show
    10. +---------+
    11. |(age + 1)|
    12. +---------+
    13. | 31|
    14. | 21|
    15. | 33|
    16. +---------+
  4. 查看age大于20的数据

    1. scala> df.filter('age > 20).show
    2. +---+--------+
    3. |age|username|
    4. +---+--------+
    5. | 30|zhangsan|
    6. | 32| wangwu|
    7. +---+--------+
  5. 按照age分组,查看数据条数

    1. scala> df.groupBy("age").count.show
    2. +---+-----+
    3. |age|count|
    4. +---+-----+
    5. | 32| 1|
    6. | 30| 1|
    7. | 20| 1|
    8. +---+-----+

    2.2.4 RDD和DataFrame转换

    在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入import spark.implicits._
    spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持val 修饰的对象的引入。
    spark-shell 中无需导入,自动完成此操作。 ```scala scala> val rdd = sc.makeRDD(List(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[53] at makeRDD at :24

scala> val df = rdd.toDF(“id”) df: org.apache.spark.sql.DataFrame = [id: int]

scala> df.show +—-+ | id| +—-+ | 1| | 2| | 3| | 4| +—-+

scala> df.rdd res16: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[61] at rdd at :26

  1. <a name="UK5cy"></a>
  2. ## 2.3 DataSet
  3. DataSet是具有强类型的数据集合,需要提供对应得类型信息
  4. <a name="ugsP7"></a>
  5. ### 2.3.1 创建DataSet
  6. 1. 使用样例类序列创建DataSet
  7. ```scala
  8. scala> case class Person(name: String, age: Long)
  9. defined class Person
  10. scala> val list = List(Person("zhangsan",30),Person("lisi",40))
  11. list: List[Person] = List(Person(zhangsan,30), Person(lisi,40))
  12. scala> list.toDS
  13. res17: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
  14. scala> val ds = list.toDS
  15. ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
  16. scala> ds.show
  17. +--------+---+
  18. | name|age|
  19. +--------+---+
  20. |zhangsan| 30|
  21. | lisi| 40|
  22. +--------+---+
  1. 使用基本类型得序列创建DataSet ```scala scala> val ds = Seq(1,2,3,4).toDS ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show +——-+ |value| +——-+ | 1| | 2| | 3| | 4| +——-+

  1. <a name="lap0A"></a>
  2. ### 2.3.2 RDD和DataSet转换
  3. SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。
  4. ```scala
  5. scala> case class User(name:String,age:Int)
  6. defined class User
  7. scala> sc.makeRDD(List(("zhangsan",30),("lisi",40))).map(t=>User(t._1,t._2)).toDS
  8. res3: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
  9. scala> val rdd = res3.rdd
  10. rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[5] at rdd at <console>:25
  11. scala> rdd.collect
  12. res4: Array[User] = Array(User(zhangsan,30), User(lisi,40))

2.4 DataFrame和DataSet转换

  1. scala> case class User(name:String,age:Int)
  2. defined class User
  3. scala> val df = sc.makeRDD(List(("zhangsan",30),("lisi",40))).toDF("name","age")
  4. df: org.apache.spark.sql.DataFrame = [name: string, age: int]
  5. scala> val ds = df.as[User]
  6. ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
  7. scala> val df = ds.toDF
  8. df: org.apache.spark.sql.DataFrame = [name: string, age: int]

2.5 RDD、DataFrame、DataSet三者的关系

如果同样的数据都给到这三个数据结构,分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。

2.5.1 三者的共性

  • RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数据提供便利
  • 三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到Action 如 foreach 时,三者才会开始遍历运算
  • 三者有许多共同的函数,如 filter,排序等; ➢ 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在创建好 SparkSession 对象后尽量直接导入)
  • 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
  • 三者都有 partition 的概念
  • DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型

    2.5.2 三者的区别

  • RDD

    • RDD 一般和 spark mllib 同时使用
    • RDD 不支持 sparksql 操作
  • DataFrame
    • 与 RDD 和 Dataset 不同,DataFrame 每一行的类型固定为 Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
    • DataFrame 与 DataSet 一般不与 spark mllib 同时使用
    • DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作
    • DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头
  • DataSet
    • Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
    • DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。而 Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息

QQ截图20220624144212.png

2.6 IDEA开发SparkSQL

2.6.1 添加依赖

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-sql_2.12</artifactId>
  4. <version>3.0.3</version>
  5. </dependency>

2.6.2 环境

  1. object Spark01_SparkSQL_Basic {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 创建SparkSQL的运行环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  5. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  6. // TODO 执行逻辑操作
  7. // TODO 关闭环境
  8. spark.close()
  9. }
  10. }

2.6.3 代码

  1. object Spark01_SparkSQL_Basic {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 创建SparkSQL的运行环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  5. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  6. import spark.implicits._
  7. // TODO 执行逻辑操作
  8. // TODO DataFrame
  9. // val df: DataFrame = spark.read.json("datas/user.json")
  10. // df.show()
  11. // DataFrame => SQL
  12. // df.createOrReplaceTempView("user")
  13. // spark.sql("select * from user").show()
  14. // spark.sql("select age, username from user").show()
  15. // spark.sql("select avg(age) from user").show()
  16. // DataFrame => DSL
  17. // 在使用DataFrame时,如果涉及到转换操作,要引入转换规则
  18. // df.select("age","username").show
  19. // df.select($"age" + 1).show
  20. // df.select('age + 1).show
  21. // TODO DataSet
  22. // DataFrame是特定泛型的DataSet
  23. val seq = Seq(1, 2, 3, 4)
  24. // val ds: Dataset[Int] = seq.toDS()
  25. // ds.show()
  26. // RDD <=> DataFrame
  27. val rdd = spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 40)))
  28. val df: DataFrame = rdd.toDF("id", "name", "age")
  29. val rowRDD: RDD[Row] = df.rdd
  30. // DataFrame <=> DataSet
  31. val ds: Dataset[User] = df.as[User]
  32. val df1: DataFrame = ds.toDF()
  33. // RDD <=> DataSet
  34. val ds1: Dataset[User] = rdd.map {
  35. case (id, name, age) => {
  36. User(id, name, age)
  37. }
  38. }.toDS()
  39. val userRDD: RDD[User] = ds1.rdd
  40. // TODO 关闭环境
  41. spark.close()
  42. }
  43. case class User(id: Int, name: String, age: Int)
  44. }

2.7 用户自定义函数

2.7.1 UDF

  1. object Spark02_SparkSQL_UDF {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 创建SparkSQL的运行环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  5. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  6. val df = spark.read.json("datas/user.json")
  7. df.createOrReplaceTempView("user")
  8. spark.udf.register("prefixName", (name: String) => {
  9. "Name: " + name
  10. })
  11. spark.sql("select age, prefixName(username) from user").show
  12. // TODO 关闭环境
  13. spark.close()
  14. }
  15. case class User(id: Int, name: String, age: Int)
  16. }

QQ截图20220625184215.png

2.7.2 UDAF

自定义聚合函数。通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数,已经不再推荐使用。可以统一采用强类型聚合函数Aggregator。

  • 弱类型

    1. object Spark03_SparkSQL_UDAF {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建SparkSQL的运行环境
    4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    5. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    6. val df = spark.read.json("datas/user.json")
    7. df.createOrReplaceTempView("user")
    8. spark.udf.register("ageAvg", new MyAvgUDAF())
    9. spark.sql("select ageAvg(age) from user").show
    10. // TODO 关闭环境
    11. spark.close()
    12. }
    13. /*
    14. 自定义聚合函数类:计算年龄的平均值
    15. 1. 继承UserDefinedAggregateFunction
    16. 2. 重写方法
    17. */
    18. class MyAvgUDAF extends UserDefinedAggregateFunction {
    19. // 输入数据的结构:In
    20. override def inputSchema: StructType = {
    21. StructType(
    22. Array(
    23. StructField("age", LongType)
    24. )
    25. )
    26. }
    27. // 缓冲区数据的结构:Buffer
    28. override def bufferSchema: StructType = {
    29. StructType(Array(
    30. StructField("total", LongType),
    31. StructField("count", LongType)
    32. ))
    33. }
    34. // 函数计算结果的数据类型:Out
    35. override def dataType: DataType = LongType
    36. // 函数的稳定性,即传入相同的参数结果是否相同
    37. override def deterministic: Boolean = true
    38. // 缓冲区初始化
    39. override def initialize(buffer: MutableAggregationBuffer): Unit = {
    40. // 下面两种写法一样
    41. // buffer(0) = 0L
    42. // buffer(1) = 0L
    43. buffer.update(0, 0L)
    44. buffer.update(1, 0L)
    45. }
    46. // 根据输入的值更新缓冲区数据
    47. override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    48. buffer.update(0, buffer.getLong(0) + input.getLong(0))
    49. buffer.update(1, buffer.getLong(1) + 1)
    50. }
    51. // 缓冲区数据合并
    52. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    53. buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
    54. buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
    55. }
    56. // 计算
    57. override def evaluate(buffer: Row): Any = {
    58. buffer.getLong(0) / buffer.getLong(1)
    59. }
    60. }
    61. }
  • 强类型

    1. object Spark03_SparkSQL_UDAF1 {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建SparkSQL的运行环境
    4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    5. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    6. val df = spark.read.json("datas/user.json")
    7. df.createOrReplaceTempView("user")
    8. spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF()))
    9. spark.sql("select ageAvg(age) from user").show
    10. // TODO 关闭环境
    11. spark.close()
    12. }
    13. /*
    14. 自定义聚合函数类:计算年龄的平均值
    15. 1. 继承org.apache.spark.sql.expressions.Aggregator,定义泛型
    16. IN:输入的数据类型 Long
    17. BUF:缓冲区的数据类型
    18. OUT:输出的数据类型 Long
    19. 2. 重写方法
    20. */
    21. case class Buff(var total: Long, var count: Long)
    22. class MyAvgUDAF extends Aggregator[Long, Buff, Long] {
    23. // 初始值或零值,缓冲区的初始化
    24. override def zero: Buff = {
    25. Buff(0L, 0L)
    26. }
    27. // 根据输入的数据更新缓冲区的数据
    28. override def reduce(buff: Buff, in: Long): Buff = {
    29. buff.total = buff.total + in
    30. buff.count = buff.count + 1
    31. buff
    32. }
    33. // 合并缓冲区
    34. override def merge(buff1: Buff, buff2: Buff): Buff = {
    35. buff1.total = buff1.total + buff2.total
    36. buff1.count = buff1.count + buff2.count
    37. buff1
    38. }
    39. // 计算结果
    40. override def finish(buff: Buff): Long = {
    41. buff.total / buff.count
    42. }
    43. // 缓冲区的编码操作
    44. override def bufferEncoder: Encoder[Buff] = Encoders.product
    45. // 输出的编码操作
    46. override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    47. }
    48. }
    1. object Spark03_SparkSQL_UDAF2 {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建SparkSQL的运行环境
    4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    5. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    6. import spark.implicits._
    7. val df = spark.read.json("datas/user.json")
    8. df.createOrReplaceTempView("user")
    9. // 早期版本中,spark不能在sql中使用强类型UDAF操作
    10. // 早期的UDAF强类型聚合函数使用DSL语法操作
    11. val ds: Dataset[User] = df.as[User]
    12. // 将UDAF转换为查询的列对象
    13. val udafCol: TypedColumn[User, Long] = new MyAvgUDAF().toColumn
    14. ds.select(udafCol).show
    15. // TODO 关闭环境
    16. spark.close()
    17. }
    18. /*
    19. 自定义聚合函数类:计算年龄的平均值
    20. 1. 继承org.apache.spark.sql.expressions.Aggregator,定义泛型
    21. IN:输入的数据类型 User
    22. BUF:缓冲区的数据类型
    23. OUT:输出的数据类型 Long
    24. 2. 重写方法
    25. */
    26. case class User(username: String, age: Long)
    27. case class Buff(var total: Long, var count: Long)
    28. class MyAvgUDAF extends Aggregator[User, Buff, Long] {
    29. // 初始值或零值,缓冲区的初始化
    30. override def zero: Buff = {
    31. Buff(0L, 0L)
    32. }
    33. // 根据输入的数据更新缓冲区的数据
    34. override def reduce(buff: Buff, in: User): Buff = {
    35. buff.total = buff.total + in.age
    36. buff.count = buff.count + 1
    37. buff
    38. }
    39. // 合并缓冲区
    40. override def merge(buff1: Buff, buff2: Buff): Buff = {
    41. buff1.total = buff1.total + buff2.total
    42. buff1.count = buff1.count + buff2.count
    43. buff1
    44. }
    45. // 计算结果
    46. override def finish(buff: Buff): Long = {
    47. buff.total / buff.count
    48. }
    49. // 缓冲区的编码操作
    50. override def bufferEncoder: Encoder[Buff] = Encoders.product
    51. // 输出的编码操作
    52. override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    53. }
    54. }

    2.8 数据的加载和保存

    2.8.1 通用的加载和保存方式

    SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式为 parquet。

  1. 加载数据

spark.read.load 是加载数据的通用方法

  1. scala> spark.read.
  2. csv format jdbc json load option options orc parquet schema
  3. table text textFile
  1. scala> spark.read.format("…")[.option("…")].load("…")
  2. // format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"
  3. // load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径
  4. // option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
  • 直接在文件上进行查询:文件格式.文件路径
    1. scala>spark.sql("select * from json.`/opt/module/data/user.json`").show
  1. 保存数据

df.write.save是保存数据的通用方法

  1. scala>df.write.
  2. csv jdbc json orc parquet textFile

保存不同格式的数据,可以对不同的数据格式进行设定

  1. scala>df.write.format("…")[.option("…")].save("…")
  2. // format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"
  3. // save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径
  4. // option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置

SaveMode是一个枚举类

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常
SaveMode.Append “append” 如果文件已经存在则追加
SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖
SaveMode.Ignore “ignore” 如果文件已经存在则忽略
  1. df.write.mode("append").json("/opt/module/data/output")

2.8.2 Parquet

SparkSQL的默认数据源为Parquet格式,是一种能够有效存储嵌套数据的列式存储格式。
数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。修改配置项 spark.sql.sources.default,可修改默认数据源格式。

  1. 加载数据

    1. scala> val df = spark.read.load("examples/src/main/resources/users.parquet")
    2. scala> df.show
  2. 保存数据

    1. scala> var df = spark.read.json("/opt/module/data/input/people.json")
    2. //保存为 parquet 格式
    3. scala> df.write.mode("append").save("/opt/module/data/output")

    2.8.3 JSON

    Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row].。可以通过 SparkSession.read.json()去加载 JSON 文件。
    注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串

  3. 导入隐式转换

    1. import spark.implicits._
  4. 加载JSON文件

    1. val path = "/opt/module/spark-local/people.json"
    2. val peopleDF = spark.read.json(path)
  5. 创建临时表

    1. peopleDF.createOrReplaceTempView("people")
  6. 数据查询

    1. val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13
    2. AND 19")
    3. teenagerNamesDF.show()
    4. +------+
    5. | name|
    6. +------+
    7. |Justin|
    8. +------+

    2.8.4 CSV

    Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为数据列

    spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/user.csv")
    

    2.8.5 MySQL

    Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。如果使用 spark-shell 操作,可在启动 shell 时指定相关的数据库驱动路径或者将相关的数据库驱动放到 spark 的类路径下。

  • 在IDEA中通过JDBC对MySQL进行操作

    • 导入依赖

      <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
      </dependency>
      
    • 读取和保存数据

      object Spark04_SparkSQL_JDBC {
      def main(args: Array[String]): Unit = {
      // TODO 创建SparkSQL的运行环境
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
      val spark = SparkSession.builder().config(sparkConf).getOrCreate()
      
      // 读取MySQL数据
      val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark-sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "user")
      .load()
      df.show
      
      // 保存数据
      df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark-sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "user1")
      .mode(SaveMode.Append)
      .save()
      
      // TODO 关闭环境
      spark.close()
      }
      }
      

      2.8.6 Hive

      Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL 编译时可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译 Spark SQL 时引入 Hive支持,这样就可以使用这些特性了。如果下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。
      若要把 Spark SQL 连接到一个部署好的 Hive 上,必须把 hive-site.xml 复制到Spark 的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果没有部署好 Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。此外,如果尝试使用 HiveQL 中的CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在默认的文件系统中的 /user/hive/warehouse 目录中(如果 classpath 中有配好的hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。
      spark-shell 默认是 Hive 支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。

  1. 内嵌的HIVE

Hive 的元数据存储在 derby 中, 默认仓库地址在spark的/spark-warehouse

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |     user|       true|
+--------+---------+-----------+

向表加载本地数据

scala> spark.sql("create table aaa(id int)")
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|      aaa|      false|
|        |     user|       true|
+--------+---------+-----------+


scala> spark.sql("select * from aaa").show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+
  1. 外部的HIVE

    1. Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
    2. 把 Mysql 的驱动 copy 到 jars/目录下
    3. 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
    4. 重启 spark-shell
      scala> spark.sql("show tables").show
      
  2. 运行Spark SQL CLI

Spark SQL CLI 可以很方便的在本地运行 Hive 元数据服务以及从命令行执行查询任务。在Spark 目录下执行如下命令启动 Spark SQL CLI,直接执行 SQL 语句,类似一 Hive 窗口bin/spark-sql

  1. 运行Spark beeline

Spark Thrift Server 是 Spark 社区基于 HiveServer2 实现的一个 Thrift 服务。旨在无缝兼容HiveServer2。因为 Spark Thrift Server 的接口和协议都和 HiveServer2 完全一致,因此我们部署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 访问 Spark Thrift Server 执行相关语句。Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依旧可以和 Hive Metastore进行交互,获取到 hive 的元数据。
连接 Thrift Server,需要通过以下几个步骤:

  • Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
  • 把 Mysql 的驱动 copy 到 jars/目录下
  • 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
  • 启动 Thrift Server

    sbin/start-thriftserver.sh
    
  • 使用beeline连接Thrift Server

    bin/beeline -u jdbc:hive2://hadoop102:10000 -n root
    
  1. 代码

    1. 导入依赖

      <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>3.0.0</version>
      </dependency>
      <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>1.2.1</version>
      </dependency>
      <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
      </dependency>
      
    2. 将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现

      //创建 SparkSession
      val spark: SparkSession = SparkSession
      .builder()
      .enableHiveSupport()
      .master("local[*]")
      .appName("sql")
      .getOrCreate()
      

      在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址:
      config(“spark.sql.warehouse.dir”, “hdfs://hadoop102:8020/user/hive/warehouse”)