Spark SQL介绍

Spark SQL和我们之前分享Hive的时候说的hive on spark是不一样的。hive on spark是表示把底层的mapreduce引擎替换为spark引擎。而Spark SQL是Spark自己实现的一套SQL处理引擎。

SparkSession

要使用Spark SQL,首先需要创建一个SpakSession对象,SparkSession中包含了SparkContextSqlContext
所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContext
SqlContext是使用sparkSQL操作hive的时候会用到的。

DataFrame介绍

Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame
DataFrame=RDD+Schema

它其实和关系型数据库中的表非常类似,RDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD

Spark1.3出现的DataFrame,Spark1.6出现了DataSet,在Spark2.0中两者统一,DataFrame等于DataSet[Row]

创建DataFrame

使用SparkSession,可以从RDD、HIve表或者其它数据源创建DataFrame
那下面我们来使用JSON文student.json件来创建一个DataFrame
student.json

  1. {"name":"jack","age":19,"sex":"male"}
  2. {"name":"tom","age":18,"sex":"male"}
  3. {"name":"jessic","age":27,"sex":"female"}
  4. {"name":"hehe","age":18,"sex":"female"}
  5. {"name":"haha","age":15,"sex":"male"}

引入spark-sql的依赖

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")

    //创建SparkSession对象,里面包含SparkContext和SqlContext
    val sparkSession = SparkSession.builder().appName("SqlDemoScala").config(conf).getOrCreate()

    //读取json文件,获取DataFrame
    val stuDf = sparkSession.read.json("D:\\test\\student.json")

    //查看DataFrame中的数据
    stuDf.show()

    //查看表结构
    stuDf.printSchema()

    sparkSession.stop()

  }

运行结果
image.pngimage.png
由于DataFrame等于DataSet[Row],它们两个可以互相转换,所以创建哪个都是一样的
前面的scala代码默认创建的是DataFrame,java代码默认创建的是DataSet
尝试对他们进行转换
在Scala代码中将DataFrame转换为DataSet[Row],对后面的操作没有影响

//将DataFrame转换为DataSet[Row]     
val stuDf = sparkSession.read.json("D:\\test\\student.json").as("stu")

在Java代码中将DataSet[Row]转换为DataFrame

//将Dataset<Row>转换为DataFrame     
Dataset<Row> stuDf = sparkSession.read().json("D:\\test\\student.json").toDF();

DataFrame和RDD的对比

RDD是Spark的内核
RDD弹性的分布式数据集五大特性
1、他有一系列的Partition组成的
2、每一个算子作用在每一个partition上
3、rdd之间是有依赖关系的
宽依赖:父RDD的分区被子RDD的多个分区使用 例如 groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle
窄依赖:父RDD的每个分区都只被子RDD的一个分区使用 例如map、filter、union等操作会产生窄依赖image.png
4、可选项:分区器作用在KV格式的RDD上 (1)分区器是在shuffle阶段起作用(2) GroupByKey, reduceBykey, join, sortByKey等这些算子会产生shuffle(3)这些算子必须作用在KV格式的RDD
5、RDD会提供一系列最佳计算位置,说白了就是暴露每一个partitior的位置这是数据本地化的基础
image.png

  • RDD:
    java/scala ==> jvm
    python ==> python runtime
  • DataFrame:
    java/scala/python ==> Logic Plan

    DataFrame常见算子操作

    先看一下官方文档
    image.png

image.png
student.json

{"name":"jack","age":19,"sex":"male"}
{"name":"tom","age":18,"sex":"male"}
{"name":"jessic","age":27,"sex":"female"}
{"name":"hehe","age":18,"sex":"female"}
{"name":"haha","age":15,"sex":"male"}

printSchema()

打印schema信息

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")

    //创建SparkSession对象,里面包含SparkContext和SqlContext
    val sparkSession = SparkSession.builder().appName("DataFrameOpScala").config(conf).getOrCreate()

    //读取json文件,获取DataFrame
    val df = sparkSession.read.json("D:\\test\\student.json")

    //打印schema信息
    df.printSchema()

    sparkSession.stop()
  }

image.png

show()

默认显示所有数据,可以通过参数控制显示多少条

df.show(2)

image.png

select()

查询数据中的指定字段信息

df.select("name", "age").show()

image.png

    //在使用select的时候可以对数据做一些操作,需要添加隐式转换函数,否则语法报错
    import sparkSession.implicits._
    df.select($"name", $"age" + 1).show()

image.png

filter()、where()

对数据进行过滤,需要添加隐式转换函数,否则报错

    import sparkSession.implicits._
    df.filter($"age" > 18).show()

image.png

    //where底层调用的就是filter
    df.where($"age" > 18).show()

image.png
where底层调用的就是filter
image.png

groupBy()和count()

    //对数据进行分组求和
    df.groupBy("age").count().show()

image.png

这些就是针对DataFrame的一些常见的操作。
但是现在这种方式其实用起来还是不方便,只是提供了一些类似于可以操作表的算子,很对一些简单的查询还是可以的,但是针对一些复杂的操作,使用算子写起来就很麻烦了,所以我们希望能够直接支持用sql的方式执行,Spark SQL也是支持的

DataFrame的sql操作

想要实现直接支持sql语句查询DataFrame中的数据
需要两步操作
1、将DataFrame注册为一个临时表df.createOrReplaceTempView()
2、使用sparkSession中的sql函数执行sql语句sparkSession.sql()

object DataFrameSqlScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")

    //创建SparkSession对象,里面包含SparkContext和SqlContext
    val sparkSession = SparkSession.builder().appName("DataFrameSqlScala").config(conf).getOrCreate()

    //读取json文件,获取DataFrame
    val df = sparkSession.read.json("D:\\test\\student.json")

    //将DataFrame注册为一个临时表
    df.createOrReplaceTempView("student")

    //使用sql查询临时表中的数据
    sparkSession.sql("select age,count(1) as num from student group by age").show()

    sparkSession.stop()
  }
}

image.png

RDD转换为DataFrame

为什么要将RDD转换为DataFrame?
在实际工作中我们可能会先把hdfs上的一些日志数据加载进来,然后进行一些处理,最终变成结构化的数据,希望对这些数据做一些统计分析,当然了我们可以使用spark中提供的transformation算子来实现,只不过会有一些麻烦,毕竟是需要写代码的,如果能够使用sql实现,其实是更加方便的。
所以可以针对我们前面创建的RDD,将它转换为DataFrame,这样就可以使用dataFrame中的一些算子或者直接写sql来操作数据了。
Spark SQL支持这两种方式将RDD转换为DataFrame

  1. 反射方式
  2. 编程方式

    反射方式(rdd.toDF())

    基于反射的方式,代码比较简洁,需要提前知道RDD中数据的格式
    Scala具有隐式转换的特性,所以spark sql的scala接口是支持自动将包含了case class的RDD转换为DataFrame的 ```scala object RddToDataFrameByReflectScala {

    def main(args: Array[String]): Unit = {

    val conf = new SparkConf() conf.setMaster(“local”)

    //创建SparkSession对象,里面包含SparkContext和SqlContext val sparkSession = SparkSession.builder().appName(“RddToDataFrameByReflectScala”).config(conf).getOrCreate() //获取SparkContext val sc = sparkSession.sparkContext

    val rdd = sc.parallelize(Array((“jack”, 18), (“tom”, 20), (“jessic”, 30)))

    //基于反射直接将包含Student对象的dataRDD转换为dataFrame //需要导入隐式转换 import sparkSession.implicits._ val stuDf = rdd.map(tup => Student(tup._1, tup._2)).toDF()

    //1、显示所有数据 stuDf.show()

    stuDf.createOrReplaceTempView(“student”)

    //2、查询age<25岁的,并展示 sparkSession.sql(“select name,age from student where age<25”).show()

    //查询age>18岁的 val resDf = sparkSession.sql(“select name,age from student where age>18”)

    //将DataFrame转化为RDD val resRDD = resDf.rdd

    //3、从row中取数据,封装成student,打印到控制台 resRDD.map(row => Student(row(0).toString, row(1).toString.toInt)) .collect() .foreach(println(_))

    //4、使用row的getAs方法,获取指定列名的值 resRDD.map(row => Student(row.getAsString, row.getAsInt)) .collect() .foreach(println(_))

    sparkSession.stop()

    }

}

//定义一个Student case class Student(name: String, age: Int)

![image.png](https://cdn.nlark.com/yuque/0/2022/png/5363330/1644377232472-b6bbca4f-6cdc-4368-b822-1a1e128e7501.png#clientId=u5ac112ca-b012-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=203&id=u83f295f2&margin=%5Bobject%20Object%5D&name=image.png&originHeight=203&originWidth=154&originalType=binary&ratio=1&rotation=0&showTitle=true&size=8067&status=done&style=stroke&taskId=u26cdbbad-390d-4be4-97bc-b9de1ee7079&title=1&width=154 "1")![image.png](https://cdn.nlark.com/yuque/0/2022/png/5363330/1644377241075-0cc4e0dc-e926-4d0e-840b-29de4aaa2ea8.png#clientId=u5ac112ca-b012-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=179&id=uea2c0d40&margin=%5Bobject%20Object%5D&name=image.png&originHeight=179&originWidth=140&originalType=binary&ratio=1&rotation=0&showTitle=true&size=6382&status=done&style=stroke&taskId=ua246a316-7391-4b7e-9bae-3a92cf0b50b&title=2&width=140 "2")![image.png](https://cdn.nlark.com/yuque/0/2022/png/5363330/1644377256737-4df484d6-fe86-4115-b1e4-9f7004cec538.png#clientId=u5ac112ca-b012-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=69&id=u0154b42d&margin=%5Bobject%20Object%5D&name=image.png&originHeight=69&originWidth=208&originalType=binary&ratio=1&rotation=0&showTitle=true&size=5133&status=done&style=stroke&taskId=u11822666-81c0-4dd9-aa29-f28d654eee1&title=3&width=208 "3")![image.png](https://cdn.nlark.com/yuque/0/2022/png/5363330/1644377266062-0022b832-6a4a-4575-baad-8a7f3281f695.png#clientId=u5ac112ca-b012-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=65&id=ub32c6ee8&margin=%5Bobject%20Object%5D&name=image.png&originHeight=65&originWidth=217&originalType=binary&ratio=1&rotation=0&showTitle=true&size=4437&status=done&style=stroke&taskId=u59162c97-d71a-4e3b-95d3-b7221a29cb0&title=4&width=217 "4")
<a name="tOs2o"></a>
## 编程方式
这种方式是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,就是Schema,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。<br />也就是说当case calss中的字段无法预先定义的时候,就只能用编程方式动态指定元数据了
```scala
object RddToDataFrameByProgramScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")

    //创建SparkSession对象,里面包含SparkContext和SqlContext
    val sparkSession = SparkSession.builder().appName("RddToDataFrameByProgramScala").config(conf).getOrCreate()
    //获取SparkContext
    val sc = sparkSession.sparkContext

    val rdd = sc.parallelize(Array(("jack", 18), ("tom", 20), ("jessic", 30)))

    //组装rowRDD
    val rowRDD = rdd.map(tup => Row(tup._1, tup._2))

    //指定元数据信息【这个元数据信息就可以动态从外部获取了,比较灵活】
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))

    //组装DataFrame
    val stuDf = sparkSession.createDataFrame(rowRDD, schema)

    //下面就可以通过DataFrame的方式操作dataRDD中的数据了
    stuDf.createOrReplaceTempView("student")

    //执行sql查询
    sparkSession.sql("select name,age from student where age > 18").show()
    val resDf = sparkSession.sql("select name,age from student where age > 18")

    //将DataFrame转化为RDD
    val resRDD = resDf.rdd
    resRDD.map(row => (row(0).toString, row(1).toString.toInt))
      .collect()
      .foreach(println(_))

    sparkSession.stop()
  }

}

image.pngimage.png

DataFrame的load和save操作

load

主要用于加载数据,创建出DataFrame;

save

主要用于将DataFrame中的数据保存到文件中。

前面操作json格式的数据的方法

    //读取json文件,获取DataFrame
    val stuDf = sparkSession.read.json("D:\\test\\student.json")

    //查看DataFrame中的数据
    stuDf.show()

查看.json()方法的源码

def json(paths: String*): DataFrame = format("json").load(paths : _*)

如果不指定format,则默认读取的数据源格式是parquet
Spark SQL内置了一些常见的数据源类型,比如json, parquet, jdbc, orc, csv, text

案例

object LoadAndSaveOpScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")

    val sparkSession = SparkSession.builder().appName("LoadAndSaveOpScala").config(conf).getOrCreate()

    //读取json文件,获取DataFrame
    //val df = sparkSession.read.json("D:\\test\\student.json")
    val df = sparkSession.read.format("json").load("D:\\test\\student.json")

    //查看数据
    df.show()

    //保存数据
    df.select("name", "age") //查询指定字段
      .write
      .format("csv")
      .save("hdfs://bigdata1:9000/out-save001")

    sparkSession.stop()
  }

}

console:
image.png
HDFS:
image.png
image.png

SaveMode

Spark SQL对于save操作,提供了不同的save mode。
主要用来处理,当目标位置已经有数据时应该如何处理。save操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。

SaveMode 解释
SaveMode.ErrorIfExists (默认) 如果目标位置已经存在数据,那么抛出一个异常
SaveMode.Append 如果目标位置已经存在数据,那么将数据追加进去
SaveMode.Overwrite 如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
SaveMode.Ignore 如果目标位置已经存在数据,那么就忽略,不做任何操作
    df.select("name", "age") //查询指定字段
      .write
      .format("csv")
      .mode(SaveMode.Append) //追加
      .save("hdfs://bigdata1:9000/out-save001")

执行之后的结果确实是追加到之前的结果目录中了
image.png

内置函数

Spark中提供了很多内置的函数

种类            函数     
聚合函数            avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct     
集合函数            array_contains, explode, size     
日期/时间函数        datediff, date_add, date_sub, add_months, last_day, next_day, months_between, current_date, current_timestamp, date_format     
数学函数            abs, ceil, floor, round     
混合函数            if, isnull, md5, not, rand, when     
字符串函数        concat, get_json_object, length, reverse, split, upper     
窗口函数            denseRank, rank, rowNumber

其实这里面的函数和hive中的函数是类似的

注意:SparkSQL中的SQL函数文档不全,其实在使用这些函数的时候,大家完全可以去查看hive中sql的文档,使用的时候都是一样的。