一 DataFrame
Spark SQL的DataFrame API允许我们使用DataFrame而不用必须去注册临时表或者生成SQL表达式 DataFrame API 既有transformation 操作也有action 操作
1.创建 DataFrame
在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame
创建DataFrame有三种方式
- 通过 Spark 的数据源进行创建
- 从一个存在的RDD 进行转换
- 还可以从Hive Table 进行查询返回
1.1 从Spark 数据源进行创建
查看 Spark 支持创建文件的数据源格式
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
在spark的bin/input目录中创建one.json文件
{ "name" : "zs" , "age" : 18 }
读取Json文件创建DataFrame
scala> spark.read.json("/Users/yangyangyang/app/spark-3.0.0-bin-hadoop3.2/bin/input/one.json")
res5: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
注意:如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换
展示结果
scala> val df =spark.read.json("/Users/yangyangyang/app/spark-3.0.0-bin-hadoop3.2/bin/input/one.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show
+---+----+
|age|name|
+---+----+
| 18| zs|
| 18| ls|
+---+----+
1.2 从RDD进行转换
1.3 从Hive Table进行查询返回
二 SQL
SQL 语法风格是指我们查询数据的时候使用SQL 语句来查询 这种风格的查询必须要有临时视图或者全局视图来辅助
2.1.读取JSON文件创建DataFrame
scala> val df =spark.read.json("/Users/yangyangyang/app/spark-3.0.0-bin-hadoop3.2/bin/input/one.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2.2.对DataFrame创建
临时表:一次会话中有效
scala> df.createTempView("user")
全局表:应用范围内有效
scala> df.createOrReplaceGlobalTempView("user")
注意:普通临时表是 Session 范围内的, 如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people
2.3.通过SQL语句实现查询全表
scala> spark.sql("select * from user").show
+---+----+
|age|name|
+---+----+
| 18| zs|
| 18| ls|
+---+----+
scala> spark.sql("select avg(age) from user").show
+--------+
|avg(age)|
+--------+
| 18.0|
+--------+
三 DSL
可以在Scala, Java 中使用DSL,使用DSL 语法风格不必去创建临时视图了
3.1 创建一个 DataFrame
scala> val df = spark.read.json ("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
3.2 查看DataFrame 的Schema 信息
scala> df.printSchema
root
|-- age: Long (nullable = true)
|-- username: string (nullable = true)
3.3 只查看”username”列数据
scala> df.select ("username").show()
+--------+
|username|
+--------+
|zhangsan|
| lisi|
| wangwu|
+--------+
3.4 查询
3.4.1 查看”username”列数据以及”age+1”数据
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式: 单引号+字段名
scala> df.select ($"username",$"age" + 1).show
scala> df.select ('username, 'age + 1).show()
scala> df.select ('username, 'age + 1 as "newage").show()
+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan| 21|
| lisi| 31|
| wangwu| 41|
+--------+---------+
3.4.2 查看”age”大于”30”的数据
scala> df.filter($"age">30).show
+---+---------+
|age| username|
+---+---------+
| 40| wangwu|
+---+---------+
3.4.3 按照”age”分组,查看数据条数
scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 20| 1|
| 30| 1|
| 40| 1|
+---+-----+