SparkSQL是Spark用于处理结构化数据的模块,前身为Shark。

特点:

  • 易整合
    • 无缝的整合了SQL查询和Spark编程
  • 统一的数据访问
    • 相同方式连接不同数据源
  • 兼容Hive
    • 支持SQL或者HiveQL
  • 标准数据连接
    • 通过JDBC或ODBC连接

      数据模型

      1、DataFrame

      在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得 Spark SQL 得以洞察更多的结构信息,从而对藏于 DataFrame 背后的数据源以及作用于 DataFrame 之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观 RDD,由于无从得知所存数据元素的具体内部结构,Spark Core 只能在 stage 层面进行简单、通用的流水线优化。
      同时,与 Hive 类似,DataFrame 也支持嵌套数据类型(struct、array 和 map)。从 API易用性的角度上看,DataFrame API 提供的是一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低。

和RDD的区别:
image.png
左侧的 RDD[Person]虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 DataFrame 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么

DataFrame 也是懒执行的,但性能上比 RDD 要高,主要原因:优化的执行计划,即查询计划通过 Spark catalyst optimiser 进行优化。

2、DataSet

DataSet 是分布式数据集合。DataSet 是 Spark 1.6 中添加的一个新抽象,是 DataFrame的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作 map,flatMap,filter等等)

  • DataSet 是 DataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象
  • 用户友好的 API 风格,既具有类型安全检查也具有 DataFrame 的查询优化特性;
  • 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet 中的字段名称;
  • DataSet 是强类型的。比如可以有 DataSet[Car],DataSet[Person]。
  • DataFrame 是 DataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过 as 方法将
  • DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序

SparkSQL编程

在Spark-Core里面,环境对象为上下文对象:SparkContext,而在SparkSQL里面老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫 SQLContext,用于 Spark自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。

SparkSession 是 Spark 最新的 SQL 查询起始点,实质上是 SQLContext 和 HiveContext的组合,所以在 SQLContex 和HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。

启动spark-shell,会准备好SparkSession对象spark
image.png
有一个read命令,可以读取文件,现在在bin/data/创建一个json文件,叫user.json,写入如下数据:
image.png

  1. {"username":"zhangsan","age":30}
  2. {"username":"lisi","age":20}
  3. {"username":"wangwu","age":40}

使用spark.read.json()读取文件,会得到一个DataFrame对象,结构会按照字典顺序排序,使用show来展示该对象
image.png

使用DataFrame对象创建临时视图,可以使用sql语句进行查询了。
df.createTempView()

spark.sql(sql语句).show

image.png

1、DataFrame

在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:

  • 通过 Spark 的数据源进行创建;
  • 从一个存在的 RDD 进行转换;
  • 还可以从 Hive Table 进行查询返回。

1、SQL语法

1、读取文件创建DataFrame
spark.read.xxx格式(文件名)

2、创建一个临时表
df.createOrReplaceTempView(表名)

可以使用df.createOrReplaceGlobalTempView()创建全局临时表

注意:普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,加上前缀global_temp如:global_temp.people

spark.newSession.xxx,新建一个session进行操作。

2、DSL语法

DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了

查看用法:df.printSchema
image.png

可以写的所有函数:
image.png

1、只查看“username”列:
select()选择某列
image.png
2、查看username列并且将所有age+1
可以使用$"字段"引用该字段,但是如果有一个引用的字段,所有字段都需要使用$引用
image.png
提供了一种简便的方式:使用单引号字段名即可引用该字段。

image.png
3、过滤字段
filter()过滤条件
image.png

4、分组并计数
分组使用groupBy()
计数使用count()
image.png

3、与RDD之间的转换

  • RDD转换为DataFrame
    • 使用rdd.toDF(字段名1,字段名2)
  • DataFrame转换为RDD
    • df.rdd()

注意:此时得到的 RDD 存储类型为 Row
image.png

2、DataSet

DataSet 是具有强类型的数据集合,需要提供对应的类型信息。

1、创建DataSet

使用样例类创建DataSet

case class Person(name: String, age: Long)
val ds = Seq(Person(“zhangsan”,2)).toDS()
ds.show
image.png
使用基本序列创建DataSet
image.png

注意:在实际使用的时候,很少用到把序列转换成 DataSet,更多的是通过RDD来得到 DataSet
**

2、与DataFrame的转换

  • DataSet to DataFrame
    • ds.toDF
  • DataFrame to DataSet
    • 定义一个case class,内部属性和DataSet一致
    • df.as[case class]

image.png

image.png

3、与RDD的直接转换

前提:一个RDD有其类型信息,可以是一个样例类

  • 样例类rdd.toDS

  • ds.rdd

image.png

3、RDD、DataFrame、DataSet三者的关系

  • Spark1.0 => Rdd
  • Spark1.3 => DataFrame
  • Spark1.6 => DataSet

1、共性

  • RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数据提供便利;
  • 三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到Action 如 foreach 时,三者才会开始遍历运算;
  • 三者有许多共同的函数,如 filter,排序等;
  • 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在创建好 SparkSession 对象后尽量直接导入)
  • 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
  • 三者都有 partition 的概念
  • DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型

2、不同

  • RDD
    • RDD 一般和 spark mllib 同时使用
    • RDD 不支持 sparksql 操作
  • DataFrame
    • 与 RDD 和 Dataset 不同,DataFrame 每一行的类型固定为 Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
    • DataFrame 与 DataSet 一般不与 spark mllib 同时使用
    • DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作
    • DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)
  • DataSet
    • Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
    • DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。而 Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息

4、Idea开发

1、创建SparkSQL的运行环境

SparkSession由于构造器是private私有的,所以不可以直接new出来,可以考虑通过伴生对象或者工厂方法等产生。
添加spark-sql的依赖

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-sql_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>
  1. class SparkSession private(
  2. private[sql] def this(sc: SparkContext) {
  3. this(sc, None, None,
  4. SparkSession.applyExtensions(
  5. sc.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
  6. new SparkSessionExtensions))
  7. }

image.png

  1. def main(args: Array[String]): Unit = {
  2. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
  3. // 私有的构造器,使用伴生对象的工厂方法创建
  4. val spark = SparkSession.builder().config(conf).getOrCreate()
  5. spark.close()
  6. }

2、DataFrame基本操作

唯一需要注意的是,要对数据进行转换时,需要导入sparkSession对象的隐式转换规则。
import spark.implicits._

  1. def main(args: Array[String]): Unit = {
  2. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
  3. // 私有的构造器,使用伴生对象的工厂方法创建
  4. val spark = SparkSession.builder().config(conf).getOrCreate()
  5. // 逻辑
  6. // DataFrame
  7. val df: DataFrame = spark.read.json("datas/user.json")
  8. df.show()
  9. df.createTempView("user")
  10. df.select("username").show()
  11. spark.sql("select * from user").show()
  12. // 需要引入隐式转换。是sparkSession对象里面的!!!
  13. import spark.implicits._
  14. df.select($"age"+1).show()
  15. df.filter('age > 20).show()
  16. // 关闭
  17. spark.close()
  18. }

3、DataSet基本操作

DataFrame其实就是特定泛型的DataSet。

  1. type DataFrame = Dataset[Row]
  1. // DataSet
  2. val seq = Seq(1,2,3,4)
  3. val ds: Dataset[Int] = seq.toDS()
  4. ds.show()

4、DataFrame和DataSet转换

  1. // RDD => DataFrame
  2. val originRdd: RDD[(Int, String)] = spark.sparkContext.makeRDD(List((1, "zhangsan"), (2, "lisi"), (3, "wang5")))
  3. val rddDf: DataFrame = originRdd.toDF("id", "name")
  4. rddDf.show()
  5. // DataFrame => RDD
  6. val dfRdd: RDD[Row] = rddDf.rdd
  7. dfRdd.collect.foreach(println)
  8. // RDD => DataSet
  9. val userRdd: RDD[User] = spark.sparkContext.makeRDD(List(User(1, "zhangsan"), User(2, "lisi"), User(3, "wang5")))
  10. val rddDs: Dataset[User] = userRdd.toDS()
  11. rddDs.show()
  12. // DataSet => RDD
  13. val dsRdd: RDD[User] = rddDs.rdd
  14. dsRdd.collect.foreach(println)
  15. // DataFrame => DataSet
  16. val dff: DataFrame = userRdd.toDF("id", "name")
  17. val dfDs: Dataset[User] = dff.as[User]
  18. dfDs.show()
  19. // DataSet => DataFrame
  20. val dsDf: DataFrame = dfDs.toDF()
  21. dsDf.show()
  22. // 关闭
  23. spark.close()
  24. }
  25. case class User(id: Int, name: String)

5、UDF函数

用户自定义函数(User Defined Function),可以用来在spark.sql()中执行自定义函数

  1. // 注册函数
  2. // 注册的函数名;自定义函数逻辑
  3. spark.udf.register("prefix",(username:String) => {
  4. "Name: " + username
  5. })
  6. // 自定义函数UDF,实现用户自定义逻辑
  7. spark.sql("select prefix(username) from user").show()

6、UDAF函数

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator

1、弱类型函数实现

计算年龄平均值

  1. object TestUDFA {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
  4. // 私有的构造器,使用伴生对象的工厂方法创建
  5. val spark = SparkSession.builder().config(conf).getOrCreate()
  6. // 逻辑
  7. // DataFrame
  8. val df: DataFrame = spark.read.json("datas/user.json")
  9. df.show()
  10. df.createTempView("user")
  11. df.select("username").show()
  12. spark.sql("select * from user").show()
  13. // 注册函数
  14. // 注册的函数名;自定义函数逻辑
  15. spark.udf.register("avgAge",new MyAvgUDFA)
  16. // 自定义函数UDF,实现用户自定义逻辑
  17. spark.sql("select avgAge(age) from user").show()
  18. // 关闭
  19. spark.close()
  20. }
  21. /**
  22. * 自定义输入函数
  23. */
  24. class MyAvgUDFA extends UserDefinedAggregateFunction{
  25. // 输入数据的结构
  26. override def inputSchema: StructType = {
  27. // case class StructType(fields: Array[StructField])
  28. StructType(
  29. Array(
  30. //case class StructField(
  31. // name: String,
  32. // dataType: DataType,
  33. // nullable: Boolean = true,
  34. // metadata: Metadata = Metadata.empty)
  35. StructField("age",LongType)
  36. )
  37. )
  38. }
  39. // 缓冲区结构
  40. override def bufferSchema: StructType = {
  41. StructType(Array(
  42. StructField("total",LongType), // 计算总和
  43. StructField("count",LongType) // 所有个数
  44. ))
  45. }
  46. // 输出的数据类型
  47. override def dataType: DataType = LongType
  48. // 函数的稳定性
  49. override def deterministic: Boolean = true
  50. // 缓冲区初始化
  51. override def initialize(buffer: MutableAggregationBuffer): Unit = {
  52. // 初始化总和和个数为0
  53. buffer(0) = 0L // buffer(0) = Row.apply(0) = buffer.get(0)
  54. buffer(1) = 0L
  55. // 或者可以使用update
  56. // buffer.update(0,0L)
  57. // buffer.update(1,0L)
  58. }
  59. // 更新缓冲区的值
  60. override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
  61. // 每来一条数据,更新一次记录
  62. buffer.update(0,buffer.getLong(0) + input.getLong(0))
  63. buffer.update(1,buffer.getLong(1) + 1)
  64. }
  65. // 合并缓冲区
  66. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  67. buffer1.update(0,buffer1.getLong(0) + buffer2.getLong(0))
  68. buffer1.update(1,buffer1.getLong(1) + buffer2.getLong(1))
  69. }
  70. // 计算
  71. override def evaluate(buffer: Row): Any = {
  72. // 我们求平均值,缓冲区第一个是总和,第二个是总量,相除即可获取平均值
  73. buffer.getLong(0) / buffer.getLong(1)
  74. }
  75. }
  76. }

2、强类型函数实现

继承Aggregator,定义泛型,序列化的时候使用Encoders.product序列化缓冲区,使用Encoders.scalaxxx序列化基本数据类型

在注册自定义函数的时候,使用functions.udaf()包裹自定义函数类

  1. object TestUDFA_StrongType {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
  4. // 私有的构造器,使用伴生对象的工厂方法创建
  5. val spark = SparkSession.builder().config(conf).getOrCreate()
  6. // 逻辑
  7. // DataFrame
  8. val df: DataFrame = spark.read.json("datas/user.json")
  9. df.show()
  10. df.createTempView("user")
  11. df.select("username").show()
  12. spark.sql("select * from user").show()
  13. // 注册函数
  14. // 注册的函数名;自定义函数逻辑
  15. // 使用functions.udaf(new MyAvgUDFA) 包裹
  16. spark.udf.register("avgAge",functions.udaf(new MyAvgUDFA))
  17. // 自定义函数UDF,实现用户自定义逻辑
  18. spark.sql("select avgAge(age) from user").show()
  19. // 关闭
  20. spark.close()
  21. }
  22. /**
  23. * 自定义输入函数
  24. */
  25. // abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  26. case class Buff(var total:Long,var count:Long)
  27. class MyAvgUDFA extends Aggregator[Long,Buff,Long]{
  28. // 初始值
  29. override def zero: Buff = {
  30. Buff(0L,0L)
  31. }
  32. // 计算
  33. override def reduce(b: Buff, a: Long): Buff = {
  34. b.total += a
  35. b.count += 1
  36. b
  37. }
  38. // 合并
  39. override def merge(b1: Buff, b2: Buff): Buff = {
  40. b1.total += b2.total
  41. b1.count += b2.count
  42. b1
  43. }
  44. // 完成结果
  45. override def finish(reduction: Buff): Long = {
  46. reduction.total / reduction.count
  47. }
  48. // 编码
  49. // 如果是缓冲区,使用Encoders.produce
  50. // 如果是基本数据类型,使用Encoders.scalaxxxx
  51. override def bufferEncoder: Encoder[Buff] = Encoders.product
  52. override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  53. }
  54. }
object TestUDFA_StrongType {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
    // 私有的构造器,使用伴生对象的工厂方法创建
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // 逻辑

    // DataFrame
    val df: DataFrame = spark.read.json("datas/user.json")
    df.show()

    df.createTempView("user")
    df.select("username").show()

    spark.sql("select * from user").show()

    // 注册函数
    // 注册的函数名;自定义函数逻辑
    // 使用functions.udaf(new MyAvgUDFA) 包裹
    spark.udf.register("avgAge",functions.udaf(new MyAvgUDFA))
    // 自定义函数UDF,实现用户自定义逻辑
    spark.sql("select avgAge(age) from user").show()

    // 关闭
    spark.close()
  }

  /**
   * 自定义输入函数
   */
  // abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  case class Buff(var total:Long,var count:Long)
  class MyAvgUDFA extends Aggregator[Long,Buff,Long]{
    // 初始值
    override def zero: Buff = {
      Buff(0L,0L)
    }

    // 计算
    override def reduce(b: Buff, a: Long): Buff = {
      b.total += a
      b.count += 1
      b
    }

    // 合并
    override def merge(b1: Buff, b2: Buff): Buff = {

      b1.total += b2.total
      b1.count += b2.count
      b1
    }

    // 完成结果
    override def finish(reduction: Buff): Long = {
      reduction.total / reduction.count

    }

    // 编码
    // 如果是缓冲区,使用Encoders.produce
    // 如果是基本数据类型,使用Encoders.scalaxxxx
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }
}

3、早期强类型聚合函数实现

Aggregator是Spark3.0的时候才出现的,早期如何在弱类型的sql中使用强类型的聚合函数呢?

1、需要一个ds对象确定类型,自然要有一个样例类
2、将df转为ds,并执行查询操作
3、将UDAF对象转换为查询的列对象:toColumn()
4、ds.select(column)

object TestUDFA_Far_StrongType {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
    // 私有的构造器,使用伴生对象的工厂方法创建
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // 逻辑

    // DataFrame
    val df: DataFrame = spark.read.json("datas/user.json")
    df.show()

    df.createTempView("user")

    import spark.implicits._
    val ds: Dataset[User] = df.as[User]

    // 将UDAF转成列对象
    val column: TypedColumn[User, Long] = new MyAvgUDFA().toColumn
    // 使用ds查询该列
    ds.select(column).show()


    // 关闭
    spark.close()
  }

  case class User(username:String,age:Long)

  /**
   * 自定义输入函数
   */
  // abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  case class Buff(var total:Long,var count:Long)
  class MyAvgUDFA extends Aggregator[User,Buff,Long]{
    // 初始值
    override def zero: Buff = {
      Buff(0L,0L)
    }

    // 计算
    override def reduce(b: Buff, a: User): Buff = {
      b.total += a.age
      b.count += 1
      b
    }

    // 合并
    override def merge(b1: Buff, b2: Buff): Buff = {

      b1.total += b2.total
      b1.count += b2.count
      b1
    }

    // 完成结果
    override def finish(reduction: Buff): Long = {
      reduction.total / reduction.count

    }

    // 编码
    // 如果是缓冲区,使用Encoders.produce
    // 如果是基本数据类型,使用Encoders.scalaxxxx
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }
}

5、数据的加载和保存

1、通用方法

加载数据:

spark.read.load
默认加载和保存的都是parquet文件,

保存数据:

spark.write.save

保存数据还涉及到了一个保存模式,SaveMode

spark.write.format(“json”).mode(xxx).save

  • error(default):如果文件存在,直接抛异常
  • append:追加
  • overwrite:覆盖
  • ignore:忽略本次操作

如果我们就想要读取json文件,该如何?
spark.read.format(“json”).load(xxx)
spark.write.format(“json”).save(xx)
spark封装了json,spark.read.json

2、操作json和csv

如果我们想要进行查询操作的时候,都是先要创建一张表,然后才能spark.sql(x)查询。

其实我们也可以直接在文件上查询,这其实还是创建了一张表,不过是由spark给我们做了。

spark.sql(“select * from json.user.json“).show()

使用````将文件引起来,前面加上要读取的格式

数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。
修改配置项 spark.sql.sources.default,可修改默认数据源格式。

注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串。

读取csv文件:
Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为数据列

spark.read.format("csv")
.option("sep",";") # 以分号为分隔符
.option("inferSchema",true) # 内部结构
.option("header",true) # 是否将第一行作为表头
.load("data/user.csv")

3、操作MySQL

1、导入mysql驱动

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>

2、使用jdbc方式读取数据

object TestMySQL {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
    // 私有的构造器,使用伴生对象的工厂方法创建
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    // 读
    val df = spark.read.format("jdbc")
      .option("url","jdbc:mysql://localhost:3306/guliedu")
      .option("driver","com.mysql.jdbc.Driver")
      .option("user","root")
      .option("password","root")
      .option("dbtable","acl_permission")
      .load()
    df.show()


    // 写

    df.write.format("jdbc").option("url","jdbc:mysql://localhost:3306/guliedu")
      .option("driver","com.mysql.jdbc.Driver")
      .option("user","root")
      .option("password","root")
      .option("dbtable","acl_permission")
      .mode(SaveMode.Overwrite)
      .save
  }

}

4、操作Hive

若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到Spark 的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果你没有部署好 Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse目录中(如果你的 classpath 中有配好的hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。spark-shell 默认是 Hive 支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。

1、操作内置Hive

查询数据表:
spark.sql(“show tables”).show

创建数据表:
spark.sql(“create table aa(id int)”)

向表加载本地数据:
spark.sql(“load data local inpath ‘input/ids.txt’ into table aa”)

spark.sql(“select * from aa”)

在实际使用中, 几乎没有任何人会使用内置的 Hive
**

2、操作外置Hive

  • Spark接管hive,将hive-site.xml拷贝到conf目录下
  • 把MySQL的驱动拷贝到spark的jars中
  • 如果访问不到hdfs,需要把core-site.xml和hdfs-site.xml拷贝到conf下
  • 重启spark-shell

代码中操作Hive:
1、添加hive-site.xml到类路径下
2、启用Hive支持 :enableHiveSupport
3、添加依赖

添加依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>1.2.1</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>

碰到jar包在中央仓库找不到,下载下来安装到本地仓库
jar包:

<!-- https://mvnrepository.com/artifact/org.pentaho/pentaho-aggdesigner-algorithm -->
<dependency>
  <groupId>org.pentaho</groupId>
  <artifactId>pentaho-aggdesigner-algorithm</artifactId>
  <version>5.1.5-jhyde</version>
  <scope>test</scope>
</dependency>

使用命令安装到本地仓库

mvn install:install-file -DgroupId=org.pentaho -DartifactId=pentaho-aggdesigner-algorithm -Dversion=5.1.5-jhyde -Dpackaging=jar -Dfile=pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar

配置hive的地址、目录等

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

如果遇到文件权限问题,可以在系统配置中添加当前用户为操作hadoop集群的用户

System.setProperty("HADOOP_USER_NAME",root);

3、beeline操作Hive

如果想连接 Thrift Server,需要通过以下几个步骤:

  • Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
  • 把 Mysql 的驱动 copy 到 jars/目录下
  • 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
  • 启动 Thrift Server
bin/beeline \
-u jdbc:hive2://linux1:10000 \ # 连接到的hive地址
-n root #以什么用户运行

6、案例-操作Hive(略)