第一章.SparkSQL概述

1.什么是SparkSQL

SparkSQL是Spark用于结构化数据(Structured Data) 处理的Spark模块

$07[SparkSQL(概述_编程)] - 图1

2.为什么要有SparkSQL

$07[SparkSQL(概述_编程)] - 图2

3.SparkSQL原理

$07[SparkSQL(概述_编程)] - 图3

一.什么是DataFrame

  1. 1. DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格
  2. 2. DataFrameRDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集每一列都带有名称和类型
  3. 3. SparkSQL性能上比RDD要高,因为SparkSQL了解数据内部结构,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标,反观RDD,由于无从得知所存数据元素的具体内部结构,SparkCore只能在Stage层面进行简单,通用的流水线优化

$07[SparkSQL(概述_编程)] - 图4

执行时间对比:

$07[SparkSQL(概述_编程)] - 图5

二.什么是DataSet

  • DataSet是分布式数据集合
  • DataSet是强类型的,比如可以有DataSet[Car],DataSet[User],具有类型安全检查
  • DataFrame是DataSet的特例, type DataFrame = DataSet[ROW],ROW是一个类型.跟Car,User这些的类型一样,所有的表结构信息都用ROW来表示

三.RDD,DataFrame和DataSet之间的关系

  1. 发展历史
  1. - RDD(Spark1.0) => DataFrame(Spark1.3) => DataSet(Spark1.6)
  2. - 如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果,不同的是他们的执行效率和执行方式,在后期的Spark版本中,DataSet有可能会逐步取代RDDDataFrame成为唯一的API接口
  1. 三者的共性
  • RDD,DataFrame,DataSet全都是Spark平台下的分布式数据集,为处理超大数据提供便利
  • 三者都有惰性机制,在进行创建,转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会遍历计算
  • 三者有许多共同的函数,如filter,排序等
  • 三者都会根据Spark的内存情况自动缓存计算
  • 三者都有分区的概念

四.SparkSQL的优点

  1. 易整合: 无缝的整合了SQL查询和Spark编程

$07[SparkSQL(概述_编程)] - 图6

  1. 统一的数据访问方式:使用相同的方式连接不同的数据源

$07[SparkSQL(概述_编程)] - 图7

  1. 兼容Hive:在已有的仓库上直接运行SQL或者HQL

$07[SparkSQL(概述_编程)] - 图8

  1. 标准的数据连接:通过JDBC或者ODBC来连接

$07[SparkSQL(概述_编程)] - 图9

第二章.SparkSQL编程

1.SparkSession创建

  1. SparkSession介绍
  1. - 在老的版本中,SparkSQL提供两种SQL查询起始点
  2. - 1.一个叫SparkContext,用于Spark自己提供的SQL查询
  3. - 2.一个叫HiveContext,用于连接Hive的查询
  4. - SparkSessionSpark最新的SQL查询起始点,实质上是SparkContextHiveContext的组合,所以SparkContextHiveContext上可用的APISparkSession上同样是可以使用的
  5. - SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的,当我们使用spark-shell的时候,spark框架会自动的创建一个名称叫做SparkSparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext
  1. 添加SparkSQL依赖
  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-sql_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>
  1. 编写代码
  1. package com.atguigu.spark.day07
  2. object $01_SparkSession {
  3. /**
  4. * SparkSession的创建
  5. * 1.自己创建内部类对象创建SparkSession
  6. * val spark = new sql.SparkSession.Builder().master("local[4]").appName("test").getOrCreate()
  7. * 2.通过静态方法创建内部类对象创建SparkSession
  8. * val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
  9. */
  10. def main(args: Array[String]): Unit = {
  11. //val spark = new sql.SparkSession.Builder().master("local[4]").appName("test").getOrCreate()
  12. import org.apache.spark.sql.SparkSession
  13. val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
  14. }
  15. }

2.DateFrame创建

  1. package com.atguigu.spark.day07
  2. import org.junit.Test
  3. import org.apache.spark.sql.Row
  4. import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  5. /**
  6. * DateFrame的创建方式
  7. * 1:通过toDF方法创建
  8. * 2.通过读取文件创建
  9. * 3.通过createDateFrame API创建
  10. */
  11. case class Person(id:Int,name:String,age:Int,address:String)
  12. class $02_DateFrameCreate {
  13. import org.apache.spark.sql.SparkSession
  14. val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
  15. /**
  16. * 1.通过toDF方法创建[toDF是隐式转换方法]
  17. * 集合.toDF
  18. * RDD.toDF
  19. * 无参toDF使用默认的列名[如果集合/RDD中元素类型是样例类,会使用属性名作为默认列名,如果元素类型是元组,会使用_1这种作为默认列名]
  20. * 工作中如果集合/RDD中元素类型是样例类,一般使用无参toDF方法,如果元素类型是元组,一般使用有参toDF方法重定义默认列名
  21. */
  22. @Test
  23. def createDateFrameByToDF():Unit={
  24. val list = List(
  25. Person(1,"lilei1",21,"shenzhen"),
  26. Person(2,"lilei2",22,"beijing"),
  27. Person(3,"lilei3",23,"tianjin"),
  28. Person(4,"lilei4",24,"shanghai"),
  29. )
  30. import spark.implicits._
  31. val df = list.toDF()
  32. //val df1 = list.toDF("x","y","z","a")
  33. df.show()
  34. df.printSchema()
  35. val rdd = spark.sparkContext.parallelize(List(
  36. (1,"lilei1",21,"宝安区"),
  37. (2,"lilei2",22,"龙华区"),
  38. (3,"lilei3",23,"光明区"),
  39. (4,"lilei4",24,"福田区")
  40. ))
  41. val df2 = rdd.toDF("id", "name", "age", "address")
  42. df2.show()
  43. }
  44. /**
  45. * 读取文件创建
  46. */
  47. @Test
  48. def createDateFrameReadFile():Unit={
  49. spark.read.csv("datas/presidential_polls.csv").show()
  50. }
  51. /**
  52. * createDateFrame方法创建
  53. */
  54. @Test
  55. def createDateFrameByApi():Unit={
  56. val list = List(
  57. Person(1,"lilei1",21,"shenzhen"),
  58. Person(2,"lilei2",22,"beijing"),
  59. Person(3,"lilei3",23,"tianjin"),
  60. Person(4,"lilei4",24,"shanghai"),
  61. )
  62. spark.createDataFrame(list).show()
  63. val rdd = spark.sparkContext.parallelize(List(
  64. Row(1,"zhangsan1",21),
  65. Row(2,"zhangsan2",22),
  66. Row(3,"zhangsan3",23)
  67. ))
  68. val fields = Array[StructField](
  69. StructField("id1",IntegerType),
  70. StructField("name1",StringType),
  71. StructField("age1",IntegerType)
  72. )
  73. val schema = StructType(fields)
  74. val df = spark.createDataFrame(rdd,schema)
  75. df.show()
  76. }
  77. }

3.DateSet创建

  1. package com.atguigu.spark.day07
  2. import org.junit.Test
  3. /**
  4. * DataSet的创建
  5. * 1.toDS方法创建
  6. * 2.读取文件创建
  7. * 3.通过createDateSet API创建
  8. *
  9. */
  10. class $03_DataSetCreate {
  11. import org.apache.spark.sql.SparkSession
  12. val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
  13. /**
  14. * toDS方法创建DataSet
  15. * 1.集合.toDS
  16. * 2.RDD.toDS
  17. * toDS只有无参的方法,不能重定义列名
  18. */
  19. @Test
  20. def createDateSetByToDS():Unit={
  21. import spark.implicits._
  22. val list = List(
  23. Person(1,"lisi1",21,"shenzhen"),
  24. Person(2,"lisi2",22,"beijing"),
  25. Person(3,"lisi3",23,"tianj"),
  26. Person(4,"lisi4",24,"shanghai")
  27. )
  28. val ds = list.toDS()
  29. ds.printSchema()
  30. ds.show()
  31. val list2 = List(
  32. (1,"lisi1",21,"shenzhen"),
  33. (2,"lisi2",22,"beijing"),
  34. (3,"lisi3",23,"tianj"),
  35. (4,"lisi4",24,"shanghai")
  36. )
  37. val rdd1 = spark.sparkContext.parallelize(list2)
  38. val ds2 = rdd1.toDS()
  39. ds2.show()
  40. }
  41. /**
  42. * 读取文件创建
  43. */
  44. @Test
  45. def createDateSetByFile():Unit={
  46. import spark.implicits._
  47. val ds = spark.read.textFile("datas/wc.txt")
  48. ds.show()
  49. val ds2 = ds.flatMap(_.split(" "))
  50. ds2.show
  51. }
  52. /**
  53. * createDateSet 方法创建
  54. */
  55. @Test
  56. def createDateSetByApi()={
  57. import spark.implicits._
  58. val list2 = List(
  59. (1,"lisi1",21,"shenzhen"),
  60. (2,"lisi2",22,"beijing"),
  61. (3,"lisi3",23,"tianj"),
  62. (4,"lisi4",24,"shanghai")
  63. )
  64. val ds = spark.createDataset(list2)
  65. ds.show()
  66. }
  67. }

4.DSL编程方式(命令式)

  1. package com.atguigu.spark.day07
  2. import org.junit.Test
  3. class $04_SparkSql {
  4. import org.apache.spark.sql.SparkSession
  5. val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
  6. import spark.implicits._
  7. /**
  8. * spark的编程有两种方式:
  9. * 1.命令式:select,where等算子操作数据
  10. * 2.声明式:使用SQL语句操作数据
  11. */
  12. /**
  13. * 1.命令式:select,where等算子操作数据
  14. */
  15. @Test
  16. def dsl():Unit={
  17. val list = List(
  18. Person(1,"lisi1",21,"shenzhen"),
  19. Person(2,"lisi2",22,"beijing"),
  20. Person(2,"lisi2",22,"beijing"),
  21. Person(2,"lisi2",30,"beijing"),
  22. Person(3,"lisi3",23,"tianj"),
  23. Person(4,"lisi4",24,"shanghai"),
  24. Person(6,"lisi4",35,"shenzhen"),
  25. Person(7,"lisi4",29,"hangzhou"),
  26. Person(8,"lisi4",30,"guangzhou")
  27. )
  28. val df = list.toDF
  29. //年龄大于25岁
  30. df.where("age>25").select("name").show()
  31. //常用的命令式
  32. //列裁剪
  33. /**
  34. * select 与 selectExpr的区别
  35. * 1.select默认只能写列名,不能使用函数,不能起别名,但是可以通过隐式转换来使用函数,和起别名
  36. * 2.selectExpr可以使用sql函数,可以给列起别名
  37. */
  38. import org.apache.spark.sql.functions._
  39. df.select(sum($"age")).show()
  40. df.selectExpr("sum(age) sum_age").show()
  41. //去重
  42. /**
  43. * distinct:两行数据所有列相同才能去重
  44. * dropDuplicates:两行数据当指定列相同的时候就会去重
  45. */
  46. df.distinct().show()
  47. df.dropDuplicates("id").show()
  48. //过滤
  49. df.where("age>25").show()
  50. df.filter("age>25").show()
  51. }
  52. }

5.SQL编程方式(声明式)

  1. /**
  2. * 声明式:sql语句
  3. * createOrReplaceTempView:创建临时表,如果表存在则替换[临时表只能在当前sparksession中使用]
  4. * createOrReplaceGlobalTempView:创建全局表,如果表存在则替换[全局表可以在多个sparksession中使用]
  5. * createOrReplaceGlobalTempView创建的表,在使用的时候必须加上global_temp前缀
  6. */
  7. @Test
  8. def sqlMode():Unit={
  9. val list = List(
  10. Person(1,"lisi1",21,"shenzhen"),
  11. Person(2,"lisi2",22,"beijing"),
  12. Person(2,"lisi2",22,"beijing"),
  13. Person(2,"lisi2",30,"beijing"),
  14. Person(3,"lisi3",23,"tianj"),
  15. Person(4,"lisi4",24,"shanghai"),
  16. Person(6,"lisi4",35,"shenzhen"),
  17. Person(7,"lisi4",29,"hangzhou"),
  18. Person(8,"lisi4",30,"guangzhou")
  19. )
  20. val df = list.toDF()
  21. //将数据集注册成表
  22. df.createOrReplaceTempView("person")
  23. spark.sql(
  24. """
  25. |select * from person where age>20
  26. |""".stripMargin
  27. ).show()
  28. //临时表只能在当前sparksession中使用
  29. val spark2 = spark.newSession()
  30. /*spark2.sql(
  31. """
  32. |select * from person where age>20
  33. |""".stripMargin
  34. ).show()*/
  35. df.createOrReplaceGlobalTempView("student")
  36. spark.sql(
  37. """
  38. |select * from global_temp.student where age>20
  39. |""".stripMargin
  40. ).show()
  41. spark2.sql(
  42. """
  43. |select * from global_temp.student where age>20
  44. |""".stripMargin
  45. ).show()
  46. }

6.wordcount案例

  1. package com.atguigu.spark.day07
  2. //使用spark编程的方式统计单词个数
  3. object $05_WordCount {
  4. def main(args: Array[String]): Unit = {
  5. import org.apache.spark.sql.SparkSession
  6. val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
  7. val ds = spark.read.textFile("datas/wc.txt")
  8. //切割句子得到单词
  9. import spark.implicits._
  10. val ds2 = ds.flatMap(_.split(" "))
  11. ds2.createOrReplaceTempView("wc")
  12. spark.sql(
  13. """
  14. |select value,count(1) num
  15. |from wc
  16. |group by value
  17. |""".stripMargin
  18. ).show()
  19. }
  20. }