Spark SQL介绍
Spark SQL和我们之前分享Hive的时候说的hive on spark是不一样的。hive on spark是表示把底层的mapreduce引擎替换为spark引擎。而Spark SQL是Spark自己实现的一套SQL处理引擎。
SparkSession
要使用Spark SQL,首先需要创建一个SpakSession对象,SparkSession中包含了SparkContext和SqlContext
所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContextSqlContext是使用sparkSQL操作hive的时候会用到的。
DataFrame介绍
Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。DataFrame=RDD+Schema。
它其实和关系型数据库中的表非常类似,RDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD
Spark1.3出现的DataFrame,Spark1.6出现了DataSet,在Spark2.0中两者统一,DataFrame等于DataSet[Row]
创建DataFrame
使用SparkSession,可以从RDD、HIve表或者其它数据源创建DataFrame
那下面我们来使用JSON文student.json件来创建一个DataFrame
student.json
{"name":"jack","age":19,"sex":"male"}{"name":"tom","age":18,"sex":"male"}{"name":"jessic","age":27,"sex":"female"}{"name":"hehe","age":18,"sex":"female"}{"name":"haha","age":15,"sex":"male"}
引入spark-sql的依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder().appName("SqlDemoScala").config(conf).getOrCreate()
//读取json文件,获取DataFrame
val stuDf = sparkSession.read.json("D:\\test\\student.json")
//查看DataFrame中的数据
stuDf.show()
//查看表结构
stuDf.printSchema()
sparkSession.stop()
}
运行结果

由于DataFrame等于DataSet[Row],它们两个可以互相转换,所以创建哪个都是一样的
前面的scala代码默认创建的是DataFrame,java代码默认创建的是DataSet
尝试对他们进行转换
在Scala代码中将DataFrame转换为DataSet[Row],对后面的操作没有影响
//将DataFrame转换为DataSet[Row]
val stuDf = sparkSession.read.json("D:\\test\\student.json").as("stu")
在Java代码中将DataSet[Row]转换为DataFrame
//将Dataset<Row>转换为DataFrame
Dataset<Row> stuDf = sparkSession.read().json("D:\\test\\student.json").toDF();
DataFrame和RDD的对比
RDD是Spark的内核
RDD弹性的分布式数据集五大特性
1、他有一系列的Partition组成的
2、每一个算子作用在每一个partition上
3、rdd之间是有依赖关系的
宽依赖:父RDD的分区被子RDD的多个分区使用 例如 groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle
窄依赖:父RDD的每个分区都只被子RDD的一个分区使用 例如map、filter、union等操作会产生窄依赖
4、可选项:分区器作用在KV格式的RDD上 (1)分区器是在shuffle阶段起作用(2) GroupByKey, reduceBykey, join, sortByKey等这些算子会产生shuffle(3)这些算子必须作用在KV格式的RDD
5、RDD会提供一系列最佳计算位置,说白了就是暴露每一个partitior的位置这是数据本地化的基础
- RDD:
java/scala ==> jvm
python ==> python runtime - DataFrame:
java/scala/python ==> Logic PlanDataFrame常见算子操作
先看一下官方文档

student.json
{"name":"jack","age":19,"sex":"male"}
{"name":"tom","age":18,"sex":"male"}
{"name":"jessic","age":27,"sex":"female"}
{"name":"hehe","age":18,"sex":"female"}
{"name":"haha","age":15,"sex":"male"}
printSchema()
打印schema信息
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder().appName("DataFrameOpScala").config(conf).getOrCreate()
//读取json文件,获取DataFrame
val df = sparkSession.read.json("D:\\test\\student.json")
//打印schema信息
df.printSchema()
sparkSession.stop()
}
show()
默认显示所有数据,可以通过参数控制显示多少条
df.show(2)
select()
查询数据中的指定字段信息
df.select("name", "age").show()

//在使用select的时候可以对数据做一些操作,需要添加隐式转换函数,否则语法报错
import sparkSession.implicits._
df.select($"name", $"age" + 1).show()
filter()、where()
对数据进行过滤,需要添加隐式转换函数,否则报错
import sparkSession.implicits._
df.filter($"age" > 18).show()

//where底层调用的就是filter
df.where($"age" > 18).show()
groupBy()和count()
//对数据进行分组求和
df.groupBy("age").count().show()

这些就是针对DataFrame的一些常见的操作。
但是现在这种方式其实用起来还是不方便,只是提供了一些类似于可以操作表的算子,很对一些简单的查询还是可以的,但是针对一些复杂的操作,使用算子写起来就很麻烦了,所以我们希望能够直接支持用sql的方式执行,Spark SQL也是支持的
DataFrame的sql操作
想要实现直接支持sql语句查询DataFrame中的数据
需要两步操作
1、将DataFrame注册为一个临时表df.createOrReplaceTempView()
2、使用sparkSession中的sql函数执行sql语句sparkSession.sql()
object DataFrameSqlScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder().appName("DataFrameSqlScala").config(conf).getOrCreate()
//读取json文件,获取DataFrame
val df = sparkSession.read.json("D:\\test\\student.json")
//将DataFrame注册为一个临时表
df.createOrReplaceTempView("student")
//使用sql查询临时表中的数据
sparkSession.sql("select age,count(1) as num from student group by age").show()
sparkSession.stop()
}
}
RDD转换为DataFrame
为什么要将RDD转换为DataFrame?
在实际工作中我们可能会先把hdfs上的一些日志数据加载进来,然后进行一些处理,最终变成结构化的数据,希望对这些数据做一些统计分析,当然了我们可以使用spark中提供的transformation算子来实现,只不过会有一些麻烦,毕竟是需要写代码的,如果能够使用sql实现,其实是更加方便的。
所以可以针对我们前面创建的RDD,将它转换为DataFrame,这样就可以使用dataFrame中的一些算子或者直接写sql来操作数据了。
Spark SQL支持这两种方式将RDD转换为DataFrame
- 反射方式
-
反射方式(rdd.toDF())
基于反射的方式,代码比较简洁,需要提前知道RDD中数据的格式
Scala具有隐式转换的特性,所以spark sql的scala接口是支持自动将包含了case class的RDD转换为DataFrame的 ```scala object RddToDataFrameByReflectScala {def main(args: Array[String]): Unit = {
val conf = new SparkConf() conf.setMaster(“local”)
//创建SparkSession对象,里面包含SparkContext和SqlContext val sparkSession = SparkSession.builder().appName(“RddToDataFrameByReflectScala”).config(conf).getOrCreate() //获取SparkContext val sc = sparkSession.sparkContext
val rdd = sc.parallelize(Array((“jack”, 18), (“tom”, 20), (“jessic”, 30)))
//基于反射直接将包含Student对象的dataRDD转换为dataFrame //需要导入隐式转换 import sparkSession.implicits._ val stuDf = rdd.map(tup => Student(tup._1, tup._2)).toDF()
//1、显示所有数据 stuDf.show()
stuDf.createOrReplaceTempView(“student”)
//2、查询age<25岁的,并展示 sparkSession.sql(“select name,age from student where age<25”).show()
//查询age>18岁的 val resDf = sparkSession.sql(“select name,age from student where age>18”)
//将DataFrame转化为RDD val resRDD = resDf.rdd
//3、从row中取数据,封装成student,打印到控制台 resRDD.map(row => Student(row(0).toString, row(1).toString.toInt)) .collect() .foreach(println(_))
//4、使用row的getAs方法,获取指定列名的值 resRDD.map(row => Student(row.getAsString, row.getAsInt)) .collect() .foreach(println(_))
sparkSession.stop()
}
}
//定义一个Student case class Student(name: String, age: Int)

<a name="tOs2o"></a>
## 编程方式
这种方式是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,就是Schema,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。<br />也就是说当case calss中的字段无法预先定义的时候,就只能用编程方式动态指定元数据了
```scala
object RddToDataFrameByProgramScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder().appName("RddToDataFrameByProgramScala").config(conf).getOrCreate()
//获取SparkContext
val sc = sparkSession.sparkContext
val rdd = sc.parallelize(Array(("jack", 18), ("tom", 20), ("jessic", 30)))
//组装rowRDD
val rowRDD = rdd.map(tup => Row(tup._1, tup._2))
//指定元数据信息【这个元数据信息就可以动态从外部获取了,比较灵活】
val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
//组装DataFrame
val stuDf = sparkSession.createDataFrame(rowRDD, schema)
//下面就可以通过DataFrame的方式操作dataRDD中的数据了
stuDf.createOrReplaceTempView("student")
//执行sql查询
sparkSession.sql("select name,age from student where age > 18").show()
val resDf = sparkSession.sql("select name,age from student where age > 18")
//将DataFrame转化为RDD
val resRDD = resDf.rdd
resRDD.map(row => (row(0).toString, row(1).toString.toInt))
.collect()
.foreach(println(_))
sparkSession.stop()
}
}
DataFrame的load和save操作
load
save
主要用于将DataFrame中的数据保存到文件中。
前面操作json格式的数据的方法
//读取json文件,获取DataFrame
val stuDf = sparkSession.read.json("D:\\test\\student.json")
//查看DataFrame中的数据
stuDf.show()
查看.json()方法的源码
def json(paths: String*): DataFrame = format("json").load(paths : _*)
如果不指定format,则默认读取的数据源格式是parquet
Spark SQL内置了一些常见的数据源类型,比如json, parquet, jdbc, orc, csv, text
案例
object LoadAndSaveOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
val sparkSession = SparkSession.builder().appName("LoadAndSaveOpScala").config(conf).getOrCreate()
//读取json文件,获取DataFrame
//val df = sparkSession.read.json("D:\\test\\student.json")
val df = sparkSession.read.format("json").load("D:\\test\\student.json")
//查看数据
df.show()
//保存数据
df.select("name", "age") //查询指定字段
.write
.format("csv")
.save("hdfs://bigdata1:9000/out-save001")
sparkSession.stop()
}
}
SaveMode
Spark SQL对于save操作,提供了不同的save mode。
主要用来处理,当目标位置已经有数据时应该如何处理。save操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。
| SaveMode | 解释 |
|---|---|
| SaveMode.ErrorIfExists (默认) | 如果目标位置已经存在数据,那么抛出一个异常 |
| SaveMode.Append | 如果目标位置已经存在数据,那么将数据追加进去 |
| SaveMode.Overwrite | 如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖 |
| SaveMode.Ignore | 如果目标位置已经存在数据,那么就忽略,不做任何操作 |
df.select("name", "age") //查询指定字段
.write
.format("csv")
.mode(SaveMode.Append) //追加
.save("hdfs://bigdata1:9000/out-save001")
内置函数
Spark中提供了很多内置的函数
种类 函数
聚合函数 avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
集合函数 array_contains, explode, size
日期/时间函数 datediff, date_add, date_sub, add_months, last_day, next_day, months_between, current_date, current_timestamp, date_format
数学函数 abs, ceil, floor, round
混合函数 if, isnull, md5, not, rand, when
字符串函数 concat, get_json_object, length, reverse, split, upper
窗口函数 denseRank, rank, rowNumber
其实这里面的函数和hive中的函数是类似的
注意:SparkSQL中的SQL函数文档不全,其实在使用这些函数的时候,大家完全可以去查看hive中sql的文档,使用的时候都是一样的。




