1 SparkSQL概述
1.1 SparkSQL是什么
Spark SQL是Spark用于结构化处理的Spark模块。
1.2 Hive and SparkSQL
- 数据兼容方面 SparkSQL 不但兼容 Hive,还可以从 RDD、parquet 文件、JSON 文件中获取数据,未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据;
- 性能优化方面 除了采取 In-Memory Columnar Storage、byte-code generation 等优化技术外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等;
- 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展
- SparkSQL 可以简化 RDD 的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是 SparkSQL。Spark SQL 为了简化 RDD 的开发,提高开发效率,提供了 2 个编程抽象,类似 Spark Core 中的 RDD
- 易整合
无缝整合了SQL查询和Spark编程
- 统一的数据访问
使用相同的方式连接不同的数据源
- 兼容Hive
在已有的仓库上直接运行SQL或者HiveQL
- 标准数据连接
1.4 DataFrame
在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame所表示的二维表数据集的每一列都带有名称和类型。
DataFrame 是为数据提供了 Schema 的视图。可以把它当做数据库中的一张表来对待 DataFrame 也是懒执行的,但性能上比 RDD 要高,主要原因:优化的执行计划,即查询计划通过 Spark catalyst optimiser 进行优化。比如先对 DataFrame 进行过滤,再 join 过滤后的较小的结果集。
1.5 DataSet
DataSet 是分布式数据集合。DataSet 是 Spark 1.6 中添加的一个新抽象,是 DataFrame的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作 map,flatMap,filter等等)。
- DataSet 是 DataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象
- 用户友好的 API 风格,既具有类型安全检查也具有 DataFrame 的查询优化特性;
- 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射DataSet 中的字段名称;
- DataSet 是强类型的。比如可以有 DataSet[Car],DataSet[Person]。
- DataFrame 是 DataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过 as 方法将DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序
2 SparkSQL核心编程
2.1 查询起始点
SparkSession 是 Spark 最新的 SQL 查询起始点,实质上是 SQLContext 和 HiveContext的组合。SparkSession 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark 的 SparkSession 对 象, 就像以前可以自动获取到一个 sc 来表示 SparkContext 对象一样
2.2 DataFrame
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。2.2.1 创建DataFrame
在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:通过 Spark 的数据源进行创建;从一个存在的 RDD 进行转换;还可以从 Hive Table 进行查询返回。
从Spark数据源进行创建
查看Spark支持创建文件的数据源格式
scala> spark.read.csv format jdbc json load option options orc parquet schema table text textFile
在spark的\bin\input目录中创建user.json
{"username":"zhangsan","age":30}{"username":"lisi","age":20}{"username":"wangwu","age":32}
读取json文件创建DataFrame
scala> spark.read.json("input/user.json")res0: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
从RDD进行转换
-
2.2.2 SQL语法
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。
读取json文件创建DataFrame

- 对DataFrame创建一个临时表

- 通过SQL语句实现查询



- 对于DataFrame创建一个全局表
普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.user
scala> df.createOrReplaceGlobalTempView("emp")
- 通过SQL语句实现查询全表
2.2.3 DSL语法
DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必创建临时视图。
查看DataFrame的Schema信息
scala> df.printSchemaroot|-- age: long (nullable = true)|-- username: string (nullable = true)
只查看”username”列数据
scala> df.select("username").show()+--------+|username|+--------+|zhangsan|| lisi|| wangwu|+--------+
查看age+1
scala> df.select($"age"+1).show+---------+|(age + 1)|+---------+| 31|| 21|| 33|+---------+scala> df.select('age + 1).show+---------+|(age + 1)|+---------+| 31|| 21|| 33|+---------+
查看age大于20的数据
scala> df.filter('age > 20).show+---+--------+|age|username|+---+--------+| 30|zhangsan|| 32| wangwu|+---+--------+
按照age分组,查看数据条数
scala> df.groupBy("age").count.show+---+-----+|age|count|+---+-----+| 32| 1|| 30| 1|| 20| 1|+---+-----+
2.2.4 RDD和DataFrame转换
在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入import spark.implicits._
spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持val 修饰的对象的引入。
spark-shell 中无需导入,自动完成此操作。 ```scala scala> val rdd = sc.makeRDD(List(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[53] at makeRDD at:24
scala> val df = rdd.toDF(“id”) df: org.apache.spark.sql.DataFrame = [id: int]
scala> df.show +—-+ | id| +—-+ | 1| | 2| | 3| | 4| +—-+
scala> df.rdd
res16: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[61] at rdd at
<a name="UK5cy"></a>## 2.3 DataSetDataSet是具有强类型的数据集合,需要提供对应得类型信息<a name="ugsP7"></a>### 2.3.1 创建DataSet1. 使用样例类序列创建DataSet```scalascala> case class Person(name: String, age: Long)defined class Personscala> val list = List(Person("zhangsan",30),Person("lisi",40))list: List[Person] = List(Person(zhangsan,30), Person(lisi,40))scala> list.toDSres17: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]scala> val ds = list.toDSds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]scala> ds.show+--------+---+| name|age|+--------+---+|zhangsan| 30|| lisi| 40|+--------+---+
- 使用基本类型得序列创建DataSet ```scala scala> val ds = Seq(1,2,3,4).toDS ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show +——-+ |value| +——-+ | 1| | 2| | 3| | 4| +——-+
<a name="lap0A"></a>### 2.3.2 RDD和DataSet转换SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。```scalascala> case class User(name:String,age:Int)defined class Userscala> sc.makeRDD(List(("zhangsan",30),("lisi",40))).map(t=>User(t._1,t._2)).toDSres3: org.apache.spark.sql.Dataset[User] = [name: string, age: int]scala> val rdd = res3.rddrdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[5] at rdd at <console>:25scala> rdd.collectres4: Array[User] = Array(User(zhangsan,30), User(lisi,40))
2.4 DataFrame和DataSet转换
scala> case class User(name:String,age:Int)defined class Userscala> val df = sc.makeRDD(List(("zhangsan",30),("lisi",40))).toDF("name","age")df: org.apache.spark.sql.DataFrame = [name: string, age: int]scala> val ds = df.as[User]ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]scala> val df = ds.toDFdf: org.apache.spark.sql.DataFrame = [name: string, age: int]
2.5 RDD、DataFrame、DataSet三者的关系
如果同样的数据都给到这三个数据结构,分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。
2.5.1 三者的共性
- RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数据提供便利
- 三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到Action 如 foreach 时,三者才会开始遍历运算
- 三者有许多共同的函数,如 filter,排序等; ➢ 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在创建好 SparkSession 对象后尽量直接导入)
- 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
- 三者都有 partition 的概念
DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型
2.5.2 三者的区别
RDD
- RDD 一般和 spark mllib 同时使用
- RDD 不支持 sparksql 操作
- DataFrame
- 与 RDD 和 Dataset 不同,DataFrame 每一行的类型固定为 Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
- DataFrame 与 DataSet 一般不与 spark mllib 同时使用
- DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作
- DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头
- DataSet
- Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
- DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。而 Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息
2.6 IDEA开发SparkSQL
2.6.1 添加依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.0.3</version></dependency>
2.6.2 环境
object Spark01_SparkSQL_Basic {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(sparkConf).getOrCreate()// TODO 执行逻辑操作// TODO 关闭环境spark.close()}}
2.6.3 代码
object Spark01_SparkSQL_Basic {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._// TODO 执行逻辑操作// TODO DataFrame// val df: DataFrame = spark.read.json("datas/user.json")// df.show()// DataFrame => SQL// df.createOrReplaceTempView("user")// spark.sql("select * from user").show()// spark.sql("select age, username from user").show()// spark.sql("select avg(age) from user").show()// DataFrame => DSL// 在使用DataFrame时,如果涉及到转换操作,要引入转换规则// df.select("age","username").show// df.select($"age" + 1).show// df.select('age + 1).show// TODO DataSet// DataFrame是特定泛型的DataSetval seq = Seq(1, 2, 3, 4)// val ds: Dataset[Int] = seq.toDS()// ds.show()// RDD <=> DataFrameval rdd = spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 40)))val df: DataFrame = rdd.toDF("id", "name", "age")val rowRDD: RDD[Row] = df.rdd// DataFrame <=> DataSetval ds: Dataset[User] = df.as[User]val df1: DataFrame = ds.toDF()// RDD <=> DataSetval ds1: Dataset[User] = rdd.map {case (id, name, age) => {User(id, name, age)}}.toDS()val userRDD: RDD[User] = ds1.rdd// TODO 关闭环境spark.close()}case class User(id: Int, name: String, age: Int)}
2.7 用户自定义函数
2.7.1 UDF
object Spark02_SparkSQL_UDF {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(sparkConf).getOrCreate()val df = spark.read.json("datas/user.json")df.createOrReplaceTempView("user")spark.udf.register("prefixName", (name: String) => {"Name: " + name})spark.sql("select age, prefixName(username) from user").show// TODO 关闭环境spark.close()}case class User(id: Int, name: String, age: Int)}
2.7.2 UDAF
自定义聚合函数。通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数,已经不再推荐使用。可以统一采用强类型聚合函数Aggregator。
弱类型
object Spark03_SparkSQL_UDAF {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(sparkConf).getOrCreate()val df = spark.read.json("datas/user.json")df.createOrReplaceTempView("user")spark.udf.register("ageAvg", new MyAvgUDAF())spark.sql("select ageAvg(age) from user").show// TODO 关闭环境spark.close()}/*自定义聚合函数类:计算年龄的平均值1. 继承UserDefinedAggregateFunction2. 重写方法*/class MyAvgUDAF extends UserDefinedAggregateFunction {// 输入数据的结构:Inoverride def inputSchema: StructType = {StructType(Array(StructField("age", LongType)))}// 缓冲区数据的结构:Bufferoverride def bufferSchema: StructType = {StructType(Array(StructField("total", LongType),StructField("count", LongType)))}// 函数计算结果的数据类型:Outoverride def dataType: DataType = LongType// 函数的稳定性,即传入相同的参数结果是否相同override def deterministic: Boolean = true// 缓冲区初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {// 下面两种写法一样// buffer(0) = 0L// buffer(1) = 0Lbuffer.update(0, 0L)buffer.update(1, 0L)}// 根据输入的值更新缓冲区数据override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer.update(0, buffer.getLong(0) + input.getLong(0))buffer.update(1, buffer.getLong(1) + 1)}// 缓冲区数据合并override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))}// 计算override def evaluate(buffer: Row): Any = {buffer.getLong(0) / buffer.getLong(1)}}}
强类型
object Spark03_SparkSQL_UDAF1 {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(sparkConf).getOrCreate()val df = spark.read.json("datas/user.json")df.createOrReplaceTempView("user")spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF()))spark.sql("select ageAvg(age) from user").show// TODO 关闭环境spark.close()}/*自定义聚合函数类:计算年龄的平均值1. 继承org.apache.spark.sql.expressions.Aggregator,定义泛型IN:输入的数据类型 LongBUF:缓冲区的数据类型OUT:输出的数据类型 Long2. 重写方法*/case class Buff(var total: Long, var count: Long)class MyAvgUDAF extends Aggregator[Long, Buff, Long] {// 初始值或零值,缓冲区的初始化override def zero: Buff = {Buff(0L, 0L)}// 根据输入的数据更新缓冲区的数据override def reduce(buff: Buff, in: Long): Buff = {buff.total = buff.total + inbuff.count = buff.count + 1buff}// 合并缓冲区override def merge(buff1: Buff, buff2: Buff): Buff = {buff1.total = buff1.total + buff2.totalbuff1.count = buff1.count + buff2.countbuff1}// 计算结果override def finish(buff: Buff): Long = {buff.total / buff.count}// 缓冲区的编码操作override def bufferEncoder: Encoder[Buff] = Encoders.product// 输出的编码操作override def outputEncoder: Encoder[Long] = Encoders.scalaLong}}
object Spark03_SparkSQL_UDAF2 {def main(args: Array[String]): Unit = {// TODO 创建SparkSQL的运行环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._val df = spark.read.json("datas/user.json")df.createOrReplaceTempView("user")// 早期版本中,spark不能在sql中使用强类型UDAF操作// 早期的UDAF强类型聚合函数使用DSL语法操作val ds: Dataset[User] = df.as[User]// 将UDAF转换为查询的列对象val udafCol: TypedColumn[User, Long] = new MyAvgUDAF().toColumnds.select(udafCol).show// TODO 关闭环境spark.close()}/*自定义聚合函数类:计算年龄的平均值1. 继承org.apache.spark.sql.expressions.Aggregator,定义泛型IN:输入的数据类型 UserBUF:缓冲区的数据类型OUT:输出的数据类型 Long2. 重写方法*/case class User(username: String, age: Long)case class Buff(var total: Long, var count: Long)class MyAvgUDAF extends Aggregator[User, Buff, Long] {// 初始值或零值,缓冲区的初始化override def zero: Buff = {Buff(0L, 0L)}// 根据输入的数据更新缓冲区的数据override def reduce(buff: Buff, in: User): Buff = {buff.total = buff.total + in.agebuff.count = buff.count + 1buff}// 合并缓冲区override def merge(buff1: Buff, buff2: Buff): Buff = {buff1.total = buff1.total + buff2.totalbuff1.count = buff1.count + buff2.countbuff1}// 计算结果override def finish(buff: Buff): Long = {buff.total / buff.count}// 缓冲区的编码操作override def bufferEncoder: Encoder[Buff] = Encoders.product// 输出的编码操作override def outputEncoder: Encoder[Long] = Encoders.scalaLong}}
2.8 数据的加载和保存
2.8.1 通用的加载和保存方式
SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式为 parquet。
- 加载数据
spark.read.load 是加载数据的通用方法
scala> spark.read.csv format jdbc json load option options orc parquet schematable text textFile
scala> spark.read.format("…")[.option("…")].load("…")// format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"// load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载数据的路径// option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
- 直接在文件上进行查询:文件格式.
文件路径scala>spark.sql("select * from json.`/opt/module/data/user.json`").show
- 保存数据
df.write.save是保存数据的通用方法
scala>df.write.csv jdbc json orc parquet textFile…
保存不同格式的数据,可以对不同的数据格式进行设定
scala>df.write.format("…")[.option("…")].save("…")// format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"// save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径// option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置
SaveMode是一个枚举类
| Scala/Java | Any Language | Meaning |
|---|---|---|
| SaveMode.ErrorIfExists(default) | “error”(default) | 如果文件已经存在则抛出异常 |
| SaveMode.Append | “append” | 如果文件已经存在则追加 |
| SaveMode.Overwrite | “overwrite” | 如果文件已经存在则覆盖 |
| SaveMode.Ignore | “ignore” | 如果文件已经存在则忽略 |
df.write.mode("append").json("/opt/module/data/output")
2.8.2 Parquet
SparkSQL的默认数据源为Parquet格式,是一种能够有效存储嵌套数据的列式存储格式。
数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。修改配置项 spark.sql.sources.default,可修改默认数据源格式。
加载数据
scala> val df = spark.read.load("examples/src/main/resources/users.parquet")scala> df.show
保存数据
scala> var df = spark.read.json("/opt/module/data/input/people.json")//保存为 parquet 格式scala> df.write.mode("append").save("/opt/module/data/output")
2.8.3 JSON
Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row].。可以通过 SparkSession.read.json()去加载 JSON 文件。
注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串导入隐式转换
import spark.implicits._
加载JSON文件
val path = "/opt/module/spark-local/people.json"val peopleDF = spark.read.json(path)
创建临时表
peopleDF.createOrReplaceTempView("people")
数据查询
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13AND 19")teenagerNamesDF.show()+------+| name|+------+|Justin|+------+
2.8.4 CSV
Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为数据列
spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/user.csv")2.8.5 MySQL
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。如果使用 spark-shell 操作,可在启动 shell 时指定相关的数据库驱动路径或者将相关的数据库驱动放到 spark 的类路径下。
在IDEA中通过JDBC对MySQL进行操作
导入依赖
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>读取和保存数据
object Spark04_SparkSQL_JDBC { def main(args: Array[String]): Unit = { // TODO 创建SparkSQL的运行环境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL") val spark = SparkSession.builder().config(sparkConf).getOrCreate() // 读取MySQL数据 val df = spark.read .format("jdbc") .option("url", "jdbc:mysql://localhost:3306/spark-sql") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root") .option("password", "123456") .option("dbtable", "user") .load() df.show // 保存数据 df.write .format("jdbc") .option("url", "jdbc:mysql://localhost:3306/spark-sql") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root") .option("password", "123456") .option("dbtable", "user1") .mode(SaveMode.Append) .save() // TODO 关闭环境 spark.close() } }2.8.6 Hive
Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL 编译时可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译 Spark SQL 时引入 Hive支持,这样就可以使用这些特性了。如果下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。
若要把 Spark SQL 连接到一个部署好的 Hive 上,必须把 hive-site.xml 复制到Spark 的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果没有部署好 Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。此外,如果尝试使用 HiveQL 中的CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在默认的文件系统中的 /user/hive/warehouse 目录中(如果 classpath 中有配好的hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。
spark-shell 默认是 Hive 支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。
- 内嵌的HIVE
Hive 的元数据存储在 derby 中, 默认仓库地址在spark的/spark-warehouse
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| | user| true|
+--------+---------+-----------+
向表加载本地数据
scala> spark.sql("create table aaa(id int)")
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| aaa| false|
| | user| true|
+--------+---------+-----------+
scala> spark.sql("select * from aaa").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
| 5|
+---+
外部的HIVE
- Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
- 把 Mysql 的驱动 copy 到 jars/目录下
- 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
- 重启 spark-shell
scala> spark.sql("show tables").show
运行Spark SQL CLI
Spark SQL CLI 可以很方便的在本地运行 Hive 元数据服务以及从命令行执行查询任务。在Spark 目录下执行如下命令启动 Spark SQL CLI,直接执行 SQL 语句,类似一 Hive 窗口bin/spark-sql
- 运行Spark beeline
Spark Thrift Server 是 Spark 社区基于 HiveServer2 实现的一个 Thrift 服务。旨在无缝兼容HiveServer2。因为 Spark Thrift Server 的接口和协议都和 HiveServer2 完全一致,因此我们部署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 访问 Spark Thrift Server 执行相关语句。Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依旧可以和 Hive Metastore进行交互,获取到 hive 的元数据。
连接 Thrift Server,需要通过以下几个步骤:
- Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
- 把 Mysql 的驱动 copy 到 jars/目录下
- 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
启动 Thrift Server
sbin/start-thriftserver.sh使用beeline连接Thrift Server
bin/beeline -u jdbc:hive2://hadoop102:10000 -n root
代码
导入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现
//创建 SparkSession val spark: SparkSession = SparkSession .builder() .enableHiveSupport() .master("local[*]") .appName("sql") .getOrCreate()在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址:
config(“spark.sql.warehouse.dir”, “hdfs://hadoop102:8020/user/hive/warehouse”)
