一、Spark sql概述
1、sparksql是什么
- spark sql是spark处理数据的一个模块
- 专门用来处理结构化数据的模块,
-
2、sparksql操作方式说明
SparkSql shell(类似于Hive sql)
- DartaFrames API(与RDD相似,增加了数据结构scheme部分)
-
3、Hadoop生态圈SQL相关重要名词解释
SQL :面向数据编程的最高级抽象
- HQL = Hive Sql :SQL on Hadoop的实现
- Hive on mr:SQL on mr引擎
- Shark早期发展的sql on spark,已经被废除
- SparkSql:sql on spark 的实现,目前主流
Hive on spark: Hive团队的主推,但spark并没有主推
二、SparkSql shell操作SparkSql
环境进入方式 spark-sql
- 操作方式基本和Hive完全一样
需要注意的一点是spark无法读取hive的内表,因为它无法读取事务表
三、DataFrames API操作SparkSql
创建maven项目 引入依赖
<!-- sparkcore依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
<!-- sparkSQL依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
1、创建DataFrames-1.6.x
```scala package com.tledu
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext /**
- @author */ object SparkSqlDF_16 { def main(args: Array[String]): Unit = { var conf = new SparkConf() conf.setMaster(“local”) conf.setAppName(“mhk_test_df_1.6”) var sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) var df = sqlContext.read.json(“C:\Users\16070\Desktop\data.json”) df.printSchema() df.show() df.select(“content”,”commentCount”).show() df.filter(“commentCount>1000”) .select(“commentCount”) .orderBy(“commentCount”) .show() sc.stop() } }
<a name="LE2pN"></a>
## 2、创建DataFrames-2.3.x
```scala
package com.tledu
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
/**
* @author
*/
object SparkSqlDF_16 {
def main(args: Array[String]): Unit = {
var conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("mhk_test_df_1.6")
var sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var df = sqlContext.read.json("C:\\Users\\16070\\Desktop\\data.json")
df.printSchema()
df.show()
df.select("content","commentCount").show()
df.filter("commentCount>1000")
.select("commentCount")
.orderBy("commentCount")
.show()
sc.stop()
}
}
3、RDD转化为DF
package com.tledu
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import java.sql.Struct
/**
* @author
*/
object RDDToDF {
def main(args: Array[String]): Unit = {
var sparkSession = SparkSession.builder()
.appName("test_rdd to df")
.master("local[*]")
.getOrCreate()
var scheme = StructType(
"stdno name classId className".split(" ").map(t => StructField(t,StringType,true))
)
var lineRDD = sparkSession.sparkContext.textFile("C:\\Users\\16070\\Desktop\\student_mysql.txt")
var rowRDD = lineRDD.map(_.split("\t")).map(row => Row(row(0),row(1),row(2),row(3)))
var df = sparkSession.createDataFrame(rowRDD,scheme)
df.show()
df.printSchema()
sparkSession.stop()
}
}
4、虚拟表使用
package com.tledu
import org.apache.spark.sql.SparkSession
/**
* @author
*/
object SparkSqlDF_23 {
def main(args: Array[String]): Unit = {
var sparkSession = SparkSession.builder()
.appName("mhk_test_df_2.3")
.master("local[*]").getOrCreate()
var df = sparkSession.read.json("C:\\Users\\16070\\Desktop\\data.json")
//虚拟表使用
var testtable = df.createTempView("teble1")
sparkSession.sql("select * from teble1")
// df.select("content","commentCount").show()
sparkSession.stop()
}
}
5、DF数据持久化
- parquet是默认的输入输出格式
- spark自带的
parquet的优点
- 压缩数据,内部自带gzip压缩
- 不失真
- 减少IO吞吐量
- 高效查询
- 多数据处理平台均支持 ```scala package com.tledu
import org.apache.spark.sql.SparkSession
/**
@author / object SparkSqlDF_23 { def main(args: Array[String]): Unit = { var sparkSession = SparkSession.builder() .appName(“mhk_test_df_2.3”) .master(“local[]”).getOrCreate()
var df = sparkSession.read.json(“C:\Users\16070\Desktop\data.json”)
// //虚拟表使用 var testtable = df.createTempView(“teble1”)
sparkSession.sql("select * from teble1")
df.select("content","commentCount").show()
// df.repartition(2).write.format(“parquet”).save(“./data”) //数据持久化 df.write.save(“./data11”) sparkSession.stop()
}
}
<a name="rknLa"></a>
## 6.DataSets API 操作Spark SQL
<a name="E0x3b"></a>
### 1、开发环境
与DataFrames完全相同
<a name="wy3Fe"></a>
### 2、DataSets相关操作
- DataSets集合了RDD和Dataframes的优点,被称为强类型的Dataframes
- DataSets和DataFrames拥有完全相同的成员函数
- 两者中每个行的数据类型不同。DataFrames也可以称为DataSets[Row],可以被看作是DataSets的一种。而DataSets每一行是不固定的,需要通过模式匹配来确定
实例:
```scala
package com.tledu.sparkSQL
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
/**
* @author
*/
case class Student(name:String,age:Long,address:String)
object SparkSqlDS_23 {
def main(args: Array[String]): Unit = {
//DataSets测试
//构建SparkSession
val sparkSession = SparkSession.builder().master("local[*]").appName("mhk_test_ds").getOrCreate()
import sparkSession.implicits._
//从基础数据对象类型创建
var DS1 = Seq(1,2,3,4,5,6).toDS()
DS1.map(item => item+1)
DS1.show()
//用样例类创建
var DS2 = Seq(Student("张三",21,"石家庄")).toDS()
DS2.show()
//使用样例类的文件类型导入创建ds
val path = "C:\\Users\\16070\\Desktop\\student_data.txt"
val DS3 =sparkSession.read.json(path).as[Student]
DS3.show()
sparkSession.stop()
}
}
7、数据集抽象类型对比分析
- RDD
- DataFrames
- DataSet
相同点:
- 都是Spark平台下的分布式弹性数据集,为处理大型数据提供便利
- 都有惰性机制,在Transform操作时不会立即执行,而是遇到Action算子才会正式提交作业执行
- 均采用Spark的内存运算和优化策略,内存使用和执行效率上均可以保证。
- 均有partition概念,可以进行分布式并行处理,达到分而治之。
- 均有许多相同的函数,比如sort,map,filter
- 个别操作均需引入一个包:import spark.implicits
- DF和DS均可以通过模式匹配获取内部的变量和值
- DF和DS产生于sparkSql,天然支持sparksql
不同点:
RDD:
- 不支持sparksql操作,需要转换为DF或者DS才行
- 类型时安全的,编译时即可检查出类型错误(强类型)
- 机器间通信,IO均需序列化和反序列化,开销很大
DataFrames:
- 有scheme的RDD:比RDD增加了数据的描述信息
- 比RDD的API更丰富,增加了结构化的API
- 是一个有固定类型的DataSet,可以看作是DataSet[Row]
- 序列化和反序列化时增加了结构化,提升了效率
DataSet:
- 强类型的DataFrames,与DF有完全相同的成员函数
- 每行的数据格式不固定,需要通过case class匹配后获取实际的信息
- 访问对象数据时,更加简单
- 在序列化和反序列化时,引入了Encoder机制,达到按需进行序列化反序列化