Chapter 1. SparkSQL Overview
1. What is SparkSQL
SparkSQL is Spark's module for processing structured data.
![$07[SparkSQL(概述_编程)] - 图1](/uploads/projects/liuye-6lcqc@gws1uf/c521c538ce96f666e42508e9b6baa5a0.png)
2. Why SparkSQL
![$07[SparkSQL(概述_编程)] - 图2](/uploads/projects/liuye-6lcqc@gws1uf/69d19d552ccadfec7ee2703c82604693.png)
3. How SparkSQL Works
![$07[SparkSQL(概述_编程)] - 图3](/uploads/projects/liuye-6lcqc@gws1uf/c9d43cd6b554ef9e63df819179f1ef7e.png)
I. What is a DataFrame
1. A DataFrame is a distributed dataset built on top of the RDD, similar to a two-dimensional table in a traditional database.
2. The main difference between a DataFrame and an RDD is that the former carries schema metadata: every column of the two-dimensional dataset a DataFrame represents has a name and a type.
3. SparkSQL performs better than plain RDDs because it understands the internal structure of the data, so it can apply targeted optimizations to the data sources behind a DataFrame and to the transformations applied on top of it, greatly improving runtime efficiency. By contrast, Spark Core has no way of knowing the internal structure of the elements stored in an RDD, so it can only perform simple, generic pipeline optimization at the stage level.
![$07[SparkSQL(概述_编程)] - 图4](/uploads/projects/liuye-6lcqc@gws1uf/0c7d448630d8018b8f026aa6e9fdd48d.png)
Execution time comparison:
![$07[SparkSQL(概述_编程)] - 图5](/uploads/projects/liuye-6lcqc@gws1uf/ce975e89e1d48a6191851b8e74df8ca7.png)
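A minimal sketch (sample data made up) of the schema point above: an RDD of tuples carries no column information, while the DataFrame built from it knows the name and type of every column, which is what the optimizer works against:

```scala
import org.apache.spark.sql.SparkSession

object SchemaDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("schema-demo").getOrCreate()
    import spark.implicits._

    // Spark Core only sees opaque (Int, String, Int) tuples here.
    val rdd = spark.sparkContext.parallelize(List((1, "lilei", 21), (2, "hanmei", 22)))

    // The DataFrame adds schema metadata: every column has a name and a type.
    val df = rdd.toDF("id", "name", "age")
    df.printSchema() // id: integer, name: string, age: integer
    df.show()

    spark.stop()
  }
}
```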
II. What is a DataSet
- A DataSet is a distributed collection of data.
- A DataSet is strongly typed, e.g. DataSet[Car] or DataSet[User], so it comes with compile-time type-safety checks.
- A DataFrame is a special case of DataSet: type DataFrame = Dataset[Row]. Row is a type, just like Car or User; all table-structure information is represented with Row (a short sketch follows this list).
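A minimal sketch (Person is a made-up case class, the same shape as in the examples later in this document) contrasting the typed Dataset API with the untyped DataFrame (= Dataset[Row]) API:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class Person(id: Int, name: String, age: Int, address: String)

object TypedVsUntyped {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("typed-demo").getOrCreate()
    import spark.implicits._

    // Strongly typed: the compiler checks field names and types.
    val ds: Dataset[Person] = List(Person(1, "lilei", 21, "shenzhen")).toDS()
    ds.map(p => p.age + 1).show()

    // A DataFrame is just Dataset[Row]: columns are looked up by name at runtime.
    val df: DataFrame = ds.toDF()
    df.map(row => row.getAs[Int]("age") + 1).show()

    spark.stop()
  }
}
```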
III. The Relationship Between RDD, DataFrame, and DataSet
- History
  - RDD (Spark 1.0) => DataFrame (Spark 1.3) => DataSet (Spark 1.6)
  - If the same data is handed to each of these three data structures, they all produce the same result after computation; what differs is their execution efficiency and execution style. In later Spark versions, DataSet may gradually replace RDD and DataFrame as the single API.
- What the three have in common
  - RDD, DataFrame, and DataSet are all distributed datasets on the Spark platform, built to make processing very large data convenient.
  - All three are lazily evaluated: creation and transformations (such as map) do not execute immediately; they are only traversed and computed when an action operator such as foreach is encountered (see the sketch after this list).
  - All three share many common functions, such as filter and sorting.
  - All three automatically cache computed results depending on Spark's memory situation.
  - All three have the concept of partitions.
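A minimal sketch (the data and names are made up) of these common traits and of how the three structures convert into one another; nothing is computed until an action such as show or foreach runs:

```scala
import org.apache.spark.sql.SparkSession

case class Person(id: Int, name: String, age: Int, address: String)

object RddDfDsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("rdd-df-ds").getOrCreate()
    import spark.implicits._

    val people = List(Person(1, "lilei", 21, "shenzhen"), Person(2, "hanmei", 26, "beijing"))

    val rdd = spark.sparkContext.parallelize(people) // RDD[Person]
    val df  = rdd.toDF()                             // RDD -> DataFrame
    val ds  = df.as[Person]                          // DataFrame -> Dataset[Person]

    // Transformations are lazy on all three; this line does not trigger any job.
    val adults = ds.filter(_.age > 25)

    // Only an action (show/foreach/collect) triggers the actual computation.
    adults.show()

    // Going back: Dataset -> DataFrame -> RDD
    val backToRdd = adults.toDF().rdd
    backToRdd.foreach(println)

    spark.stop()
  }
}
```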
IV. Advantages of SparkSQL
- Easy integration: SQL queries and Spark programs mix seamlessly.
![$07[SparkSQL(概述_编程)] - 图6](/uploads/projects/liuye-6lcqc@gws1uf/0f73ae2824f11cf071a035f2ae1519ee.png)
- Unified data access: connect to different data sources in the same way (see the sketch after this list).
![$07[SparkSQL(概述_编程)] - 图7](/uploads/projects/liuye-6lcqc@gws1uf/8ca2533a4c7f64cc446ee4665600a068.png)
- Hive compatibility: run SQL or HQL directly on existing warehouses.
![$07[SparkSQL(概述_编程)] - 图8](/uploads/projects/liuye-6lcqc@gws1uf/b2f59ee3444bc1c7a4dd5c704e82379f.png)
- Standard connectivity: connect through JDBC or ODBC.
![$07[SparkSQL(概述_编程)] - 图9](/uploads/projects/liuye-6lcqc@gws1uf/734c7425ae9c84e05d8057b354d3c144.png)
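A minimal sketch of the unified data access point above; the file paths are placeholders, not files that ship with this course:

```scala
import org.apache.spark.sql.SparkSession

object UnifiedAccessDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("unified-access").getOrCreate()

    // Different sources, one spark.read entry point.
    val fromJson    = spark.read.json("datas/people.json")
    val fromCsv     = spark.read.option("header", "true").csv("datas/people.csv")
    val fromParquet = spark.read.parquet("datas/people.parquet")

    fromJson.show()
    fromCsv.show()
    fromParquet.show()

    spark.stop()
  }
}
```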
Chapter 2. SparkSQL Programming
1. Creating a SparkSession
- About SparkSession
  - In older versions, SparkSQL provided two entry points for SQL queries:
    1. SQLContext, for the SQL queries provided by Spark itself
    2. HiveContext, for queries that connect to Hive
  - SparkSession is the newest SQL entry point. It is essentially the combination of SQLContext and HiveContext, so the APIs available on SQLContext and HiveContext are also available on SparkSession.
  - SparkSession wraps a SparkContext internally, so the computation is actually carried out by the SparkContext. When we use spark-shell, the Spark framework automatically creates a SparkSession named spark, just as we used to automatically get an sc representing the SparkContext.
- Add the SparkSQL dependency
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
```
- Write the code
```scala
package com.atguigu.spark.day07

import org.apache.spark.sql.SparkSession

object $01_SparkSession {

  /**
   * Creating a SparkSession
   * 1. Create the inner Builder object yourself:
   *    val spark = new sql.SparkSession.Builder().master("local[4]").appName("test").getOrCreate()
   * 2. Create it through the static builder() method:
   *    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
   */
  def main(args: Array[String]): Unit = {
    // val spark = new sql.SparkSession.Builder().master("local[4]").appName("test").getOrCreate()
    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
  }
}
```
2. Creating a DataFrame
```scala
package com.atguigu.spark.day07

import org.junit.Test
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * Ways to create a DataFrame
 * 1. with the toDF method
 * 2. by reading a file
 * 3. with the createDataFrame API
 */
case class Person(id: Int, name: String, age: Int, address: String)

class $02_DataFrameCreate {

  val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

  /**
   * 1. Create with the toDF method [toDF is added by an implicit conversion]
   *    collection.toDF
   *    RDD.toDF
   * The no-arg toDF uses default column names [if the element type of the collection/RDD is a case class,
   * the property names become the default column names; if the element type is a tuple, _1, _2, ... are used].
   * In practice, if the element type is a case class we usually use the no-arg toDF; if it is a tuple we
   * usually use the toDF overload that redefines the default column names.
   */
  @Test
  def createDataFrameByToDF(): Unit = {
    val list = List(
      Person(1, "lilei1", 21, "shenzhen"),
      Person(2, "lilei2", 22, "beijing"),
      Person(3, "lilei3", 23, "tianjin"),
      Person(4, "lilei4", 24, "shanghai")
    )
    import spark.implicits._
    val df = list.toDF()
    // val df1 = list.toDF("x", "y", "z", "a")
    df.show()
    df.printSchema()

    val rdd = spark.sparkContext.parallelize(List(
      (1, "lilei1", 21, "宝安区"),
      (2, "lilei2", 22, "龙华区"),
      (3, "lilei3", 23, "光明区"),
      (4, "lilei4", 24, "福田区")
    ))
    val df2 = rdd.toDF("id", "name", "age", "address")
    df2.show()
  }

  /**
   * Create by reading a file
   */
  @Test
  def createDataFrameReadFile(): Unit = {
    spark.read.csv("datas/presidential_polls.csv").show()
  }

  /**
   * Create with the createDataFrame method
   */
  @Test
  def createDataFrameByApi(): Unit = {
    val list = List(
      Person(1, "lilei1", 21, "shenzhen"),
      Person(2, "lilei2", 22, "beijing"),
      Person(3, "lilei3", 23, "tianjin"),
      Person(4, "lilei4", 24, "shanghai")
    )
    spark.createDataFrame(list).show()

    val rdd = spark.sparkContext.parallelize(List(
      Row(1, "zhangsan1", 21),
      Row(2, "zhangsan2", 22),
      Row(3, "zhangsan3", 23)
    ))
    val fields = Array[StructField](
      StructField("id1", IntegerType),
      StructField("name1", StringType),
      StructField("age1", IntegerType)
    )
    val schema = StructType(fields)
    val df = spark.createDataFrame(rdd, schema)
    df.show()
  }
}
```
3. Creating a DataSet
```scala
package com.atguigu.spark.day07

import org.junit.Test
import org.apache.spark.sql.SparkSession

/**
 * Ways to create a DataSet
 * 1. with the toDS method
 * 2. by reading a file
 * 3. with the createDataset API
 */
class $03_DataSetCreate {

  val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

  /**
   * Create a DataSet with the toDS method
   * 1. collection.toDS
   * 2. RDD.toDS
   * toDS only has a no-arg version, so column names cannot be redefined.
   */
  @Test
  def createDataSetByToDS(): Unit = {
    import spark.implicits._
    val list = List(
      Person(1, "lisi1", 21, "shenzhen"),
      Person(2, "lisi2", 22, "beijing"),
      Person(3, "lisi3", 23, "tianj"),
      Person(4, "lisi4", 24, "shanghai")
    )
    val ds = list.toDS()
    ds.printSchema()
    ds.show()

    val list2 = List(
      (1, "lisi1", 21, "shenzhen"),
      (2, "lisi2", 22, "beijing"),
      (3, "lisi3", 23, "tianj"),
      (4, "lisi4", 24, "shanghai")
    )
    val rdd1 = spark.sparkContext.parallelize(list2)
    val ds2 = rdd1.toDS()
    ds2.show()
  }

  /**
   * Create by reading a file
   */
  @Test
  def createDataSetByFile(): Unit = {
    import spark.implicits._
    val ds = spark.read.textFile("datas/wc.txt")
    ds.show()
    val ds2 = ds.flatMap(_.split(" "))
    ds2.show()
  }

  /**
   * Create with the createDataset method
   */
  @Test
  def createDataSetByApi(): Unit = {
    import spark.implicits._
    val list2 = List(
      (1, "lisi1", 21, "shenzhen"),
      (2, "lisi2", 22, "beijing"),
      (3, "lisi3", 23, "tianj"),
      (4, "lisi4", 24, "shanghai")
    )
    val ds = spark.createDataset(list2)
    ds.show()
  }
}
```
4. DSL Programming Style (Imperative)
```scala
package com.atguigu.spark.day07

import org.junit.Test
import org.apache.spark.sql.SparkSession

class $04_SparkSql {

  val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
  import spark.implicits._

  /**
   * Spark offers two programming styles:
   * 1. Imperative (DSL): operate on the data with operators such as select and where
   * 2. Declarative: operate on the data with SQL statements
   */

  /**
   * 1. Imperative (DSL): operators such as select and where
   */
  @Test
  def dsl(): Unit = {
    val list = List(
      Person(1, "lisi1", 21, "shenzhen"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 30, "beijing"),
      Person(3, "lisi3", 23, "tianj"),
      Person(4, "lisi4", 24, "shanghai"),
      Person(6, "lisi4", 35, "shenzhen"),
      Person(7, "lisi4", 29, "hangzhou"),
      Person(8, "lisi4", 30, "guangzhou")
    )
    val df = list.toDF

    // people older than 25
    df.where("age>25").select("name").show()

    // Commonly used imperative operators
    // Column projection
    /**
     * select vs selectExpr
     * 1. By default select only takes column names; it cannot use functions or aliases directly,
     *    but both are possible through the implicit conversions (e.g. sum($"age")).
     * 2. selectExpr can use SQL functions and give columns aliases.
     */
    import org.apache.spark.sql.functions._
    df.select(sum($"age")).show()
    df.selectExpr("sum(age) sum_age").show()

    // De-duplication
    /**
     * distinct: two rows are de-duplicated only when all columns are identical
     * dropDuplicates: two rows are de-duplicated when the specified columns are identical
     */
    df.distinct().show()
    df.dropDuplicates("id").show()

    // Filtering
    df.where("age>25").show()
    df.filter("age>25").show()
  }
}
```
5. SQL Programming Style (Declarative)
```scala
  // (this method continues inside the $04_SparkSql class above)

  /**
   * Declarative style: SQL statements
   * createOrReplaceTempView: creates a temporary view, replacing it if it already exists
   *   [a temporary view can only be used within the current SparkSession]
   * createOrReplaceGlobalTempView: creates a global view, replacing it if it already exists
   *   [a global view can be used across multiple SparkSessions]
   * A view created with createOrReplaceGlobalTempView must be referenced with the global_temp prefix.
   */
  @Test
  def sqlMode(): Unit = {
    val list = List(
      Person(1, "lisi1", 21, "shenzhen"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 30, "beijing"),
      Person(3, "lisi3", 23, "tianj"),
      Person(4, "lisi4", 24, "shanghai"),
      Person(6, "lisi4", 35, "shenzhen"),
      Person(7, "lisi4", 29, "hangzhou"),
      Person(8, "lisi4", 30, "guangzhou")
    )
    val df = list.toDF()

    // Register the dataset as a view
    df.createOrReplaceTempView("person")
    spark.sql(
      """
        |select * from person where age>20
        |""".stripMargin).show()

    // A temporary view can only be used in the current SparkSession
    val spark2 = spark.newSession()
    /*
    spark2.sql(
      """
        |select * from person where age>20
        |""".stripMargin).show()
    */

    df.createOrReplaceGlobalTempView("student")
    spark.sql(
      """
        |select * from global_temp.student where age>20
        |""".stripMargin).show()
    spark2.sql(
      """
        |select * from global_temp.student where age>20
        |""".stripMargin).show()
  }
```
6. WordCount Example
```scala
package com.atguigu.spark.day07

import org.apache.spark.sql.SparkSession

// Count word occurrences using SparkSQL
object $05_WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

    val ds = spark.read.textFile("datas/wc.txt")

    // Split each line into words
    import spark.implicits._
    val ds2 = ds.flatMap(_.split(" "))

    ds2.createOrReplaceTempView("wc")
    spark.sql(
      """
        |select value, count(1) num
        |from wc
        |group by value
        |""".stripMargin).show()
  }
}
```
