一 DataFrame

Spark SQL的DataFrame API允许我们使用DataFrame而不用必须去注册临时表或者生成SQL表达式 DataFrame API 既有transformation 操作也有action 操作

1.创建 DataFrame

在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame

创建DataFrame有三种方式

  • 通过 Spark 的数据源进行创建
  • 从一个存在的RDD 进行转换
  • 还可以从Hive Table 进行查询返回

1.1 从Spark 数据源进行创建

查看 Spark 支持创建文件的数据源格式

  1. scala> spark.read.
  2. csv format jdbc json load option options orc parquet schema table text textFile

在spark的bin/input目录中创建one.json文件

  1. { "name" : "zs" , "age" : 18 }

读取Json文件创建DataFrame

  1. scala> spark.read.json("/Users/yangyangyang/app/spark-3.0.0-bin-hadoop3.2/bin/input/one.json")
  2. res5: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

注意:如果从内存中获取数据,spark可以知道数据类型具体是什么。如果是数字,默认作为Int处理;但是从文件中读取的数字,不能确定是什么类型,所以用bigint接收,可以和Long类型转换,但是和Int不能进行转换

展示结果

  1. scala> val df =spark.read.json("/Users/yangyangyang/app/spark-3.0.0-bin-hadoop3.2/bin/input/one.json")
  2. df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  3. scala> df.show
  4. +---+----+
  5. |age|name|
  6. +---+----+
  7. | 18| zs|
  8. | 18| ls|
  9. +---+----+

1.2 从RDD进行转换

1.3 从Hive Table进行查询返回

二 SQL

SQL 语法风格是指我们查询数据的时候使用SQL 语句来查询 这种风格的查询必须要有临时视图或者全局视图来辅助

2.1.读取JSON文件创建DataFrame

  1. scala> val df =spark.read.json("/Users/yangyangyang/app/spark-3.0.0-bin-hadoop3.2/bin/input/one.json")
  2. df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2.2.对DataFrame创建

临时表:一次会话中有效

  1. scala> df.createTempView("user")

全局表:应用范围内有效

  1. scala> df.createOrReplaceGlobalTempView("user")

image.png

注意:普通临时表是 Session 范围内的, 如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people

2.3.通过SQL语句实现查询全表

  1. scala> spark.sql("select * from user").show
  2. +---+----+
  3. |age|name|
  4. +---+----+
  5. | 18| zs|
  6. | 18| ls|
  7. +---+----+
  8. scala> spark.sql("select avg(age) from user").show
  9. +--------+
  10. |avg(age)|
  11. +--------+
  12. | 18.0|
  13. +--------+

三 DSL

可以在Scala, Java 中使用DSL,使用DSL 语法风格不必去创建临时视图了

3.1 创建一个 DataFrame

  1. scala> val df = spark.read.json ("data/user.json")
  2. df: org.apache.spark.sql.DataFrame = [age: bigint name: string]

3.2 查看DataFrame 的Schema 信息

  1. scala> df.printSchema
  2. root
  3. |-- age: Long (nullable = true)
  4. |-- username: string (nullable = true)

3.3 只查看”username”列数据

  1. scala> df.select ("username").show()
  2. +--------+
  3. |username|
  4. +--------+
  5. |zhangsan|
  6. | lisi|
  7. | wangwu|
  8. +--------+

3.4 查询

3.4.1 查看”username”列数据以及”age+1”数据

注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式: 单引号+字段名

  1. scala> df.select ($"username",$"age" + 1).show
  2. scala> df.select ('username, 'age + 1).show()
  1. scala> df.select ('username, 'age + 1 as "newage").show()
  2. +--------+---------+
  3. |username|(age + 1)|
  4. +--------+---------+
  5. |zhangsan| 21|
  6. | lisi| 31|
  7. | wangwu| 41|
  8. +--------+---------+

3.4.2 查看”age”大于”30”的数据

  1. scala> df.filter($"age">30).show
  2. +---+---------+
  3. |age| username|
  4. +---+---------+
  5. | 40| wangwu|
  6. +---+---------+

3.4.3 按照”age”分组,查看数据条数

  1. scala> df.groupBy("age").count.show
  2. +---+-----+
  3. |age|count|
  4. +---+-----+
  5. | 20| 1|
  6. | 30| 1|
  7. | 40| 1|
  8. +---+-----+