一、Spark sql概述

1、sparksql是什么

  • spark sql是spark处理数据的一个模块
  • 专门用来处理结构化数据的模块,
  • 与基础的RDD不同,spark sql会提供更多的处理方案

    2、sparksql操作方式说明

  • SparkSql shell(类似于Hive sql)

  • DartaFrames API(与RDD相似,增加了数据结构scheme部分)
  • DaraSets API (集成了RDD和DF的优点)

    3、Hadoop生态圈SQL相关重要名词解释

  • SQL :面向数据编程的最高级抽象

  • HQL = Hive Sql :SQL on Hadoop的实现
  • Hive on mr:SQL on mr引擎
  • Shark早期发展的sql on spark,已经被废除
  • SparkSql:sql on spark 的实现,目前主流
  • Hive on spark: Hive团队的主推,但spark并没有主推

    二、SparkSql shell操作SparkSql

  • 环境进入方式 spark-sql

  • 操作方式基本和Hive完全一样
  • 需要注意的一点是spark无法读取hive的内表,因为它无法读取事务表

    三、DataFrames API操作SparkSql

    创建maven项目 引入依赖

    1. <!-- sparkcore依赖 -->
    2. <dependency>
    3. <groupId>org.apache.spark</groupId>
    4. <artifactId>spark-core_${scala.compat.version}</artifactId>
    5. <version>2.3.2</version>
    6. <scope>provided</scope>
    7. </dependency>
    8. <!-- sparkSQL依赖 -->
    9. <dependency>
    10. <groupId>org.apache.spark</groupId>
    11. <artifactId>spark-sql_${scala.compat.version}</artifactId>
    12. <version>2.3.2</version>
    13. <scope>provided</scope>
    14. </dependency>

    1、创建DataFrames-1.6.x

    ```scala package com.tledu

import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext /**

  • @author */ object SparkSqlDF_16 { def main(args: Array[String]): Unit = { var conf = new SparkConf() conf.setMaster(“local”) conf.setAppName(“mhk_test_df_1.6”) var sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) var df = sqlContext.read.json(“C:\Users\16070\Desktop\data.json”) df.printSchema() df.show() df.select(“content”,”commentCount”).show() df.filter(“commentCount>1000”) .select(“commentCount”) .orderBy(“commentCount”) .show() sc.stop() } }
  1. <a name="LE2pN"></a>
  2. ## 2、创建DataFrames-2.3.x
  3. ```scala
  4. package com.tledu
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. import org.apache.spark.sql.SQLContext
  7. /**
  8. * @author
  9. */
  10. object SparkSqlDF_16 {
  11. def main(args: Array[String]): Unit = {
  12. var conf = new SparkConf()
  13. conf.setMaster("local")
  14. conf.setAppName("mhk_test_df_1.6")
  15. var sc = new SparkContext(conf)
  16. val sqlContext = new SQLContext(sc)
  17. var df = sqlContext.read.json("C:\\Users\\16070\\Desktop\\data.json")
  18. df.printSchema()
  19. df.show()
  20. df.select("content","commentCount").show()
  21. df.filter("commentCount>1000")
  22. .select("commentCount")
  23. .orderBy("commentCount")
  24. .show()
  25. sc.stop()
  26. }
  27. }

3、RDD转化为DF

  1. package com.tledu
  2. import org.apache.spark.sql.{Row, SparkSession}
  3. import org.apache.spark.sql.types.{StringType, StructField, StructType}
  4. import java.sql.Struct
  5. /**
  6. * @author
  7. */
  8. object RDDToDF {
  9. def main(args: Array[String]): Unit = {
  10. var sparkSession = SparkSession.builder()
  11. .appName("test_rdd to df")
  12. .master("local[*]")
  13. .getOrCreate()
  14. var scheme = StructType(
  15. "stdno name classId className".split(" ").map(t => StructField(t,StringType,true))
  16. )
  17. var lineRDD = sparkSession.sparkContext.textFile("C:\\Users\\16070\\Desktop\\student_mysql.txt")
  18. var rowRDD = lineRDD.map(_.split("\t")).map(row => Row(row(0),row(1),row(2),row(3)))
  19. var df = sparkSession.createDataFrame(rowRDD,scheme)
  20. df.show()
  21. df.printSchema()
  22. sparkSession.stop()
  23. }
  24. }

4、虚拟表使用

  1. package com.tledu
  2. import org.apache.spark.sql.SparkSession
  3. /**
  4. * @author
  5. */
  6. object SparkSqlDF_23 {
  7. def main(args: Array[String]): Unit = {
  8. var sparkSession = SparkSession.builder()
  9. .appName("mhk_test_df_2.3")
  10. .master("local[*]").getOrCreate()
  11. var df = sparkSession.read.json("C:\\Users\\16070\\Desktop\\data.json")
  12. //虚拟表使用
  13. var testtable = df.createTempView("teble1")
  14. sparkSession.sql("select * from teble1")
  15. // df.select("content","commentCount").show()
  16. sparkSession.stop()
  17. }
  18. }

5、DF数据持久化

  • parquet是默认的输入输出格式
  • spark自带的

parquet的优点

  • 压缩数据,内部自带gzip压缩
  • 不失真
  • 减少IO吞吐量
  • 高效查询
  • 多数据处理平台均支持 ```scala package com.tledu

import org.apache.spark.sql.SparkSession

/**

  • @author / object SparkSqlDF_23 { def main(args: Array[String]): Unit = { var sparkSession = SparkSession.builder() .appName(“mhk_test_df_2.3”) .master(“local[]”).getOrCreate()

    var df = sparkSession.read.json(“C:\Users\16070\Desktop\data.json”)

// //虚拟表使用 var testtable = df.createTempView(“teble1”)

  1. sparkSession.sql("select * from teble1")
  2. df.select("content","commentCount").show()

// df.repartition(2).write.format(“parquet”).save(“./data”) //数据持久化 df.write.save(“./data11”) sparkSession.stop()

}

}

  1. <a name="rknLa"></a>
  2. ## 6.DataSets API 操作Spark SQL
  3. <a name="E0x3b"></a>
  4. ### 1、开发环境
  5. 与DataFrames完全相同
  6. <a name="wy3Fe"></a>
  7. ### 2、DataSets相关操作
  8. - DataSets集合了RDD和Dataframes的优点,被称为强类型的Dataframes
  9. - DataSets和DataFrames拥有完全相同的成员函数
  10. - 两者中每个行的数据类型不同。DataFrames也可以称为DataSets[Row],可以被看作是DataSets的一种。而DataSets每一行是不固定的,需要通过模式匹配来确定
  11. 实例:
  12. ```scala
  13. package com.tledu.sparkSQL
  14. import org.apache.spark.sql
  15. import org.apache.spark.sql.SparkSession
  16. /**
  17. * @author
  18. */
  19. case class Student(name:String,age:Long,address:String)
  20. object SparkSqlDS_23 {
  21. def main(args: Array[String]): Unit = {
  22. //DataSets测试
  23. //构建SparkSession
  24. val sparkSession = SparkSession.builder().master("local[*]").appName("mhk_test_ds").getOrCreate()
  25. import sparkSession.implicits._
  26. //从基础数据对象类型创建
  27. var DS1 = Seq(1,2,3,4,5,6).toDS()
  28. DS1.map(item => item+1)
  29. DS1.show()
  30. //用样例类创建
  31. var DS2 = Seq(Student("张三",21,"石家庄")).toDS()
  32. DS2.show()
  33. //使用样例类的文件类型导入创建ds
  34. val path = "C:\\Users\\16070\\Desktop\\student_data.txt"
  35. val DS3 =sparkSession.read.json(path).as[Student]
  36. DS3.show()
  37. sparkSession.stop()
  38. }
  39. }

7、数据集抽象类型对比分析

  • RDD
  • DataFrames
  • DataSet

相同点:

  • 都是Spark平台下的分布式弹性数据集,为处理大型数据提供便利
  • 都有惰性机制,在Transform操作时不会立即执行,而是遇到Action算子才会正式提交作业执行
  • 均采用Spark的内存运算和优化策略,内存使用和执行效率上均可以保证。
  • 均有partition概念,可以进行分布式并行处理,达到分而治之。
  • 均有许多相同的函数,比如sort,map,filter
  • 个别操作均需引入一个包:import spark.implicits
  • DF和DS均可以通过模式匹配获取内部的变量和值
  • DF和DS产生于sparkSql,天然支持sparksql

不同点:
RDD:

  • 不支持sparksql操作,需要转换为DF或者DS才行
  • 类型时安全的,编译时即可检查出类型错误(强类型)
  • 机器间通信,IO均需序列化和反序列化,开销很大

DataFrames:

  • 有scheme的RDD:比RDD增加了数据的描述信息
  • 比RDD的API更丰富,增加了结构化的API
  • 是一个有固定类型的DataSet,可以看作是DataSet[Row]
  • 序列化和反序列化时增加了结构化,提升了效率

DataSet:

  • 强类型的DataFrames,与DF有完全相同的成员函数
  • 每行的数据格式不固定,需要通过case class匹配后获取实际的信息
  • 访问对象数据时,更加简单
  • 在序列化和反序列化时,引入了Encoder机制,达到按需进行序列化反序列化