一、Spark sql概述
1、sparksql是什么
- spark sql是spark处理数据的一个模块
- 专门用来处理结构化数据的模块,
-
2、sparksql操作方式说明
SparkSql shell(类似于Hive sql)
- DartaFrames API(与RDD相似,增加了数据结构scheme部分)
-
3、Hadoop生态圈SQL相关重要名词解释
SQL :面向数据编程的最高级抽象
- HQL = Hive Sql :SQL on Hadoop的实现
- Hive on mr:SQL on mr引擎
- Shark早期发展的sql on spark,已经被废除
- SparkSql:sql on spark 的实现,目前主流
Hive on spark: Hive团队的主推,但spark并没有主推
二、SparkSql shell操作SparkSql
环境进入方式 spark-sql
- 操作方式基本和Hive完全一样
需要注意的一点是spark无法读取hive的内表,因为它无法读取事务表
三、DataFrames API操作SparkSql
创建maven项目 引入依赖
<!-- sparkcore依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.compat.version}</artifactId><version>2.3.2</version><scope>provided</scope></dependency><!-- sparkSQL依赖 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.compat.version}</artifactId><version>2.3.2</version><scope>provided</scope></dependency>
1、创建DataFrames-1.6.x
```scala package com.tledu
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext /**
- @author */ object SparkSqlDF_16 { def main(args: Array[String]): Unit = { var conf = new SparkConf() conf.setMaster(“local”) conf.setAppName(“mhk_test_df_1.6”) var sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) var df = sqlContext.read.json(“C:\Users\16070\Desktop\data.json”) df.printSchema() df.show() df.select(“content”,”commentCount”).show() df.filter(“commentCount>1000”) .select(“commentCount”) .orderBy(“commentCount”) .show() sc.stop() } }
<a name="LE2pN"></a>## 2、创建DataFrames-2.3.x```scalapackage com.tleduimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.SQLContext/*** @author*/object SparkSqlDF_16 {def main(args: Array[String]): Unit = {var conf = new SparkConf()conf.setMaster("local")conf.setAppName("mhk_test_df_1.6")var sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)var df = sqlContext.read.json("C:\\Users\\16070\\Desktop\\data.json")df.printSchema()df.show()df.select("content","commentCount").show()df.filter("commentCount>1000").select("commentCount").orderBy("commentCount").show()sc.stop()}}
3、RDD转化为DF
package com.tleduimport org.apache.spark.sql.{Row, SparkSession}import org.apache.spark.sql.types.{StringType, StructField, StructType}import java.sql.Struct/*** @author*/object RDDToDF {def main(args: Array[String]): Unit = {var sparkSession = SparkSession.builder().appName("test_rdd to df").master("local[*]").getOrCreate()var scheme = StructType("stdno name classId className".split(" ").map(t => StructField(t,StringType,true)))var lineRDD = sparkSession.sparkContext.textFile("C:\\Users\\16070\\Desktop\\student_mysql.txt")var rowRDD = lineRDD.map(_.split("\t")).map(row => Row(row(0),row(1),row(2),row(3)))var df = sparkSession.createDataFrame(rowRDD,scheme)df.show()df.printSchema()sparkSession.stop()}}
4、虚拟表使用
package com.tleduimport org.apache.spark.sql.SparkSession/*** @author*/object SparkSqlDF_23 {def main(args: Array[String]): Unit = {var sparkSession = SparkSession.builder().appName("mhk_test_df_2.3").master("local[*]").getOrCreate()var df = sparkSession.read.json("C:\\Users\\16070\\Desktop\\data.json")//虚拟表使用var testtable = df.createTempView("teble1")sparkSession.sql("select * from teble1")// df.select("content","commentCount").show()sparkSession.stop()}}
5、DF数据持久化
- parquet是默认的输入输出格式
- spark自带的
parquet的优点
- 压缩数据,内部自带gzip压缩
- 不失真
- 减少IO吞吐量
- 高效查询
- 多数据处理平台均支持 ```scala package com.tledu
import org.apache.spark.sql.SparkSession
/**
@author / object SparkSqlDF_23 { def main(args: Array[String]): Unit = { var sparkSession = SparkSession.builder() .appName(“mhk_test_df_2.3”) .master(“local[]”).getOrCreate()
var df = sparkSession.read.json(“C:\Users\16070\Desktop\data.json”)
// //虚拟表使用 var testtable = df.createTempView(“teble1”)
sparkSession.sql("select * from teble1")df.select("content","commentCount").show()
// df.repartition(2).write.format(“parquet”).save(“./data”) //数据持久化 df.write.save(“./data11”) sparkSession.stop()
}
}
<a name="rknLa"></a>## 6.DataSets API 操作Spark SQL<a name="E0x3b"></a>### 1、开发环境与DataFrames完全相同<a name="wy3Fe"></a>### 2、DataSets相关操作- DataSets集合了RDD和Dataframes的优点,被称为强类型的Dataframes- DataSets和DataFrames拥有完全相同的成员函数- 两者中每个行的数据类型不同。DataFrames也可以称为DataSets[Row],可以被看作是DataSets的一种。而DataSets每一行是不固定的,需要通过模式匹配来确定实例:```scalapackage com.tledu.sparkSQLimport org.apache.spark.sqlimport org.apache.spark.sql.SparkSession/*** @author*/case class Student(name:String,age:Long,address:String)object SparkSqlDS_23 {def main(args: Array[String]): Unit = {//DataSets测试//构建SparkSessionval sparkSession = SparkSession.builder().master("local[*]").appName("mhk_test_ds").getOrCreate()import sparkSession.implicits._//从基础数据对象类型创建var DS1 = Seq(1,2,3,4,5,6).toDS()DS1.map(item => item+1)DS1.show()//用样例类创建var DS2 = Seq(Student("张三",21,"石家庄")).toDS()DS2.show()//使用样例类的文件类型导入创建dsval path = "C:\\Users\\16070\\Desktop\\student_data.txt"val DS3 =sparkSession.read.json(path).as[Student]DS3.show()sparkSession.stop()}}
7、数据集抽象类型对比分析
- RDD
- DataFrames
- DataSet
相同点:
- 都是Spark平台下的分布式弹性数据集,为处理大型数据提供便利
- 都有惰性机制,在Transform操作时不会立即执行,而是遇到Action算子才会正式提交作业执行
- 均采用Spark的内存运算和优化策略,内存使用和执行效率上均可以保证。
- 均有partition概念,可以进行分布式并行处理,达到分而治之。
- 均有许多相同的函数,比如sort,map,filter
- 个别操作均需引入一个包:import spark.implicits
- DF和DS均可以通过模式匹配获取内部的变量和值
- DF和DS产生于sparkSql,天然支持sparksql
不同点:
RDD:
- 不支持sparksql操作,需要转换为DF或者DS才行
- 类型时安全的,编译时即可检查出类型错误(强类型)
- 机器间通信,IO均需序列化和反序列化,开销很大
DataFrames:
- 有scheme的RDD:比RDD增加了数据的描述信息
- 比RDD的API更丰富,增加了结构化的API
- 是一个有固定类型的DataSet,可以看作是DataSet[Row]
- 序列化和反序列化时增加了结构化,提升了效率
DataSet:
- 强类型的DataFrames,与DF有完全相同的成员函数
- 每行的数据格式不固定,需要通过case class匹配后获取实际的信息
- 访问对象数据时,更加简单
- 在序列化和反序列化时,引入了Encoder机制,达到按需进行序列化反序列化
