typora-copy-images-to: asset
Spark SQL
Spark SQL概述
Spark SQL进化之路
1.0以前: Shark
1.1.x开始:SparkSQL(只是测试性的) SQL
1.3.x: SparkSQL(正式版本)+Dataframe
1.5.x: SparkSQL 钨丝计划(Tungsten) 聚焦于CPU和Memory使用,以达到对分布式硬件潜能的终极压榨!
1.6.x: SparkSQL+DataFrame+DataSet(测试版本)
2.x:
- SparkSQL+DataFrame+DataSet(正式版本)
- SparkSQL:还有其他的优化
- StructuredStreaming(DataSet)
认识Spark SQL
- Spark SQL是Spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。
- 作用:提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎
- 运行原理:将 Spark SQL 转化为 RDD, 然后提交到集群执行
- 特性:
- 集成。即:无缝地将SQL查询与Spark程序混合。
- 统一数据访问。即:加载和查询来自各种来源的数据。包括Hive、Parquet、JSON和JDBC等。
- Hive兼容性。即:在现有仓库上运行未修改的Hive查询。
- 标准连接。即:通过JDBC或ODBC连接。
SparkSession
SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习Spark的各项功能。
- 在Spark的早期版本中,SparkContext是Spark的主要切入点,由于RDD是主要的API,我们通过SparkContext来创建和操作RDD。
- 随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。
- SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。
特点:
- 为用户提供一个统一的切入点使用Spark 各项功能
- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序
- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互
- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中
DataFrame
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。
DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。
创建DataFrame
# 通过列表生成
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [('zhangsan',18,'男'),('lisi',19,'男'),('wangwu',20,'女'),]
df = spark.createDataFrame(data,['name','age','sex'])
df.collect()
"""
[Row(name='zhangsan', age=18, sex='男'),
Row(name='lisi', age=19, sex='男'),
Row(name='wangwu', age=20, sex='女')]
"""
# 通过字典生成
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = [{"name":"zhangsan","age":18},{"name":"lisi","age":19}]
df = spark.createDataFrame(data)
df.collect()
"""[Row(age=18, name='zhangsan'), Row(age=19, name='lisi')]"""
# 通过文件生成
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('person.csv',header=True)
df.collect()
"""
person.csv:
name,age
zhangsan,18
lisi,19
wangwu,20
[Row(name='zhangsan', age='18'),
Row(name='lisi', age='19'),
Row(name='wangwu', age='20')]
"""
# 通过pandas DataFrame生成
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = pd.DataFrame({"name":"zhangsan","age":18},{"name":"lisi","age":19})
df = spark.createDataFrame(data)
df.collect()
"""
[Row(name='zhangsan', age=18), Row(name='zhangsan', age=18)]
"""
# 转为pandas中的DataFrame
df.toPandas()
"""
name age
--------------------
0 zhangsan 18
1 zhangsan 18
"""
# 通过RDD生成
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# 生成RDD
data = ["zhangsan,18","lisi,19","wangwu,20"]
rdd = spark.sparkContext.parallelize(data).map(lambda x:x.split(","))
rdd.collect()
"""
[['zhangsan', '18'], ['lisi', '19'], ['wangwu', '20']]
"""
# 用反射来推断包含特定类型对象的RDD的schema
from pyspark.sql.types import Row
def func(x):
rel = {}
rel['name'] = x[0]
rel['age'] = int(x[1])
return rel
df = rdd.map(lambda x:Row(**func(x))).toDF()
df.collect()
"""
[Row(age=18, name='zhangsan'),
Row(age=19, name='lisi'),
Row(age=20, name='wangwu')]
"""
df.schema
"""
StructType(List(StructField(age,LongType,true),StructField(name,StringType,true)))
"""
# 使用编程接口
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import LongType
rdd = rdd.map(lambda x:(x[0],int(x[1])))
schema = StructType([
StructField('name',StringType(),True),
StructField('age',LongType(),True),
])
df = spark.createDataFrame(rdd,schema=schema)
df.collect()
"""
[Row(name='zhangsan', age=18),
Row(name='lisi', age=19),
Row(name='wangwu', age=20)]
"""
# 简化版本
df = spark.createDataFrame(rdd,['name','age'])
df.collect()
"""
[Row(name='zhangsan', age=18),
Row(name='lisi', age=19),
Row(name='wangwu', age=20)]
"""
df.schema
"""
StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))
"""
# 最简版本
df = rdd.toDF(['name','age'])
df.collect()
# DataFrame转为RDD
df.rdd
创建Spark SQL
- Spark SQL 是由DataFrame派生出来的,首先必须先出创建DataFrame,然后通过登录Spark SQL temp table就可以使用Spark SQL语句了。
- 使用Spark SQL最简单,就是直接使用SQL语句,即使是非程序设计人员,只需要懂得SQL语句就可以使用
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = ["zhangsan,18","lisi,19","wangwu,20"]
rdd = spark.sparkContext.parallelize(data).map(lambda x:x.split(","))
rdd = rdd.map(lambda x:(x[0],int(x[1])))
df = rdd.toDF(['name','age'])
# 方式一 SQL 通过createOrReplaceTempView注册为
df.createOrReplaceTempView('person1')
res = spark.sql("select * from person1 where age > 18")
res.collect()
"""
[Row(name='lisi', age=19), Row(name='wangwu', age=20)]
"""
# 方式二 SQL 通过registerTempTable注册
df.registerTempTable('person2')
res = spark.sql("select * from person2 where age > 18")
res.collect()
# 方式三 DSL
df.filter("age > 18").collect()
Spark SQL实验二
实验数据
emp.csv
emp_num,emp_name,role,leader_num,birth_date,salary,bonus,dept_num
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10
实验要求
使用DataFrame相关方法完成实验
实验内容
# 初始化SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
- 读取emp.csv文件,获取DataFrame
emp = spark.read.csv("emp.csv",header=True)
emp.show() # show默认展示前20条数据
- 将salary、bonus列改为整型
import pyspark.sql.functions as F
from pyspark.sql.types import *
方式二 createOrReplaceTempView
emp.createOrReplaceTempView(“emp”)
<br />spark.sql("select * from emp where bonus is not null").show() <br />+-------+--------+--------+----------+---------+------+-----+--------+<br />|emp_num|emp_name| role|leader_num| hiredate|salary|bonus|dept_num|<br />+-------+--------+--------+----------+---------+------+-----+--------+<br />| 7499| ALLEN|SALESMAN| 7698|1981/2/20| 1600| 300| 30|<br />| 7521| WARD|SALESMAN| 7698|1981/2/22| 1250| 500| 30|<br />| 7654| MARTIN|SALESMAN| 7698|1981/9/28| 1250| 1400| 30|<br />| 7844| TURNER|SALESMAN| 7698| 1981/9/8| 1500| 0| 30|<br />+-------+--------+--------+----------+---------+------+-----+--------+
<a name="SQL"></a>
# SQL
4. 新增一列总薪资(total),其值为薪资(salary)和奖金(bonus)的总和
```python
# DataFrame
emp.selectExpr("*","if(bonus is null,salary,salary+bonus) as total").show()
#SQL
spark.sql("select *,if(bonus is null,salary,salary+bonus) as total from emp").show()
#F.udf
@F.udf(returnType=IntegerType())
def get_total(x,y):
if y is None:
return x
else:
return x+y
emp.select("*",get_total(F.col("salary"),F.col("bonus")).alias("total")).show()
emp.select("*",get_total("salary","bonus").alias("total")).show()
- 新增age,其值为该员工年龄(2022-birth_date)
```python
DataFrame
emp.selectExpr(“*”,”2022 - cast(substring(birth_date,0,4) as int) as age”).show()
SQL
spark.sql(“select *,2022 - cast(substring(birth_date,0,4) as int) as age from emp”).show()
F.udf
@F.udf(returnType=IntegerType()) def get_age(x): year = int(x[:4]) return 2022 - year emp.select(“*”,get_age(“birth_date”).alias(“age”)).show()
6. 统计各岗位(role)人数
```python
# DataFrame
emp.groupby("role").count().show()
# SQL
spark.sql("select role,count(1) num from emp group by role").show()
- 计算各部门平均薪资(total),保留两位小数,按平均薪资降序排序
```python
DataFrame
emp.groupby(“dept_num”).agg(F.round(F.mean(“total”),2).alias(“mean_total”)).sort(F.col(“mean_total”).desc()).show()
SQL
spark.sql(“select dept_num,round(avg(total),2) as mean_total from emp group by dept_num order by mean_total desc”)
F.udf
8. 找出各部门总薪资(total)最高的员工
```python
# DataFrame
emp.select("*",F.row_number().over(Window.partitionBy("dept_num").orderBy(F.col("total").desc())).alias("rn")).filter("rn = 1").show()
# SQL
spark.sql("select * from (select *,row_number() over(partition by dept_num order by total desc) as rn from emp) a where a.rn = 1").show()
SparkSQL实验三
实验内容
- 读取book文件,生成RDD
('5173', '動力取向精神醫學--臨床應用與實務', 10.0, '1200元', '心灵工坊', 'https://book.douban.com/subject/6053667/')
('9929', '水彩绘森活', 10.0, '29.8', '人民邮电出版社', 'https://book.douban.com/subject/26115807/')
('10124', '殷周金文集成(修订增补本共8册)(精)', 10.0, '2400.00元', '中华书局', 'https://book.douban.com/subject/2235855/')
('16628', '纸雕游戏大书', 10.0, '99.00元', '重庆出版集团', 'https://book.douban.com/subject/26673804/')
('19103', 'Michelangelo', 10.0, '$200.00 ', 'Taschen', 'https://book.douban.com/subject/2342660/')
('20063', '一支笔的快乐涂鸦2', 10.0, '29.8', '人民邮电出版社', 'https://book.douban.com/subject/26280062/')
('32781', '亲亲宝贝装', 10.0, '28.00元', '江西科学技术出版社', 'https://book.douban.com/subject/20429352/')
('32879', 'Photoshop7解像', 10.0, '68.00元', '海洋出版社', 'https://book.douban.com/subject/1244906/')
('45687', '戚蓼生序本石头记', 10.0, '350.00元', '人民文学出版社', 'https://book.douban.com/subject/6751176/')
('52504', '宇宙兄弟(7)', 10.0, 'JPY580', '講談社', 'https://book.douban.com/subject/5407120/')
- 将RDD转为DataFrame(score列转为浮点型)
+-----+-------------------+-----+--------+---------+--------------------+
| id| name|score| price| pub| url|
+-----+-------------------+-----+--------+---------+--------------------+
| 5173| 動力取向精神醫學--臨床應用與實務| 10.0| 1200元| 心灵工坊|https://book.doub...|
| 9929| 水彩绘森活| 10.0| 29.8| 人民邮电出版社|https://book.doub...|
|10124|殷周金文集成(修订增补本共8册)(精)| 10.0|2400.00元| 中华书局|https://book.doub...|
|16628| 纸雕游戏大书| 10.0| 99.00元| 重庆出版集团|https://book.doub...|
|19103| Michelangelo| 10.0|$200.00 | Taschen|https://book.doub...|
|20063| 一支笔的快乐涂鸦2| 10.0| 29.8| 人民邮电出版社|https://book.doub...|
|32781| 亲亲宝贝装| 10.0| 28.00元|江西科学技术出版社|https://book.doub...|
|32879| Photoshop7解像| 10.0| 68.00元| 海洋出版社|https://book.doub...|
|45687| 戚蓼生序本石头记| 10.0| 350.00元| 人民文学出版社|https://book.doub...|
|52504| 宇宙兄弟(7)| 10.0| JPY580| 講談社|https://book.doub...|
+-----+-------------------+-----+--------+---------+--------------------+
- 将DataFrame注册为表,表名为:tb_book
- 使用SQL查询前10条数据
+-----+-------------------+-----+--------+---------+--------------------+
| id| name|score| price| pub| url|
+-----+-------------------+-----+--------+---------+--------------------+
| 5173| 動力取向精神醫學--臨床應用與實務| 10.0| 1200元| 心灵工坊|https://book.doub...|
| 9929| 水彩绘森活| 10.0| 29.8| 人民邮电出版社|https://book.doub...|
|10124|殷周金文集成(修订增补本共8册)(精)| 10.0|2400.00元| 中华书局|https://book.doub...|
|16628| 纸雕游戏大书| 10.0| 99.00元| 重庆出版集团|https://book.doub...|
|19103| Michelangelo| 10.0|$200.00 | Taschen|https://book.doub...|
|20063| 一支笔的快乐涂鸦2| 10.0| 29.8| 人民邮电出版社|https://book.doub...|
|32781| 亲亲宝贝装| 10.0| 28.00元|江西科学技术出版社|https://book.doub...|
|32879| Photoshop7解像| 10.0| 68.00元| 海洋出版社|https://book.doub...|
|45687| 戚蓼生序本石头记| 10.0| 350.00元| 人民文学出版社|https://book.doub...|
|52504| 宇宙兄弟(7)| 10.0| JPY580| 講談社|https://book.doub...|
+-----+-------------------+-----+--------+---------+--------------------+
- 使用SQL查询书名包含“微积分”的书
+-----+---------------+-----+------+--------------------+--------------------+
| id| name|score| price| pub| url|
+-----+---------------+-----+------+--------------------+--------------------+
| 5522| 微积分和数学分析引论-第1卷| 9.9|79.00元| 世界图书出版公司|https://book.doub...|
| 2219| 微积分学教程(第3卷)| 9.6|53.00元| 高等教育出版社|https://book.doub...|
| 2220| 微积分学教程(第2卷)| 9.4|65.00元| 高等教育出版社|https://book.doub...|
| 2363| 微积分学教程(第一卷)| 9.4|45.00元| 高等教育出版社|https://book.doub...|
| 1153| 微积分五讲| 9.3|14.00元| 科学出版社|https://book.doub...|
| 1427|微积分和数学分析引论(第一卷)| 9.3|48.00元| 科学出版社|https://book.doub...|
| 5526| 实用微积分| 9.3|89.00元| 人民邮电出版社|https://book.doub...|
| 1447| 简明微积分| 9.2|37.90元| 高等教育|https://book.doub...|
| 3587| 托马斯微积分| 9.2| 88| 高等教育出版社|https://book.doub...|
| 1360| 重温微积分| 9.1|39.60元| 高等教育出版社|https://book.doub...|
| 4767| 普林斯顿微积分读本| 9.1|95.00元| 人民邮电出版社|https://book.doub...|
| 1925| 高等微积分| 9.0|68.00元|北京蓝色畅想图书发行有限公司(原高...|https://book.doub...|
| 8293| 微积分(下册)| 9.0|43.80元| 高等教育出版社|https://book.doub...|
| 5523| 微积分(上册)| 8.7|60.00元| 高等教育出版社|https://book.doub...|
| 7305| 微积分之屠龙宝刀| 8.7|28.00元| 湖南科学技术出版社|https://book.doub...|
|15469| 微积分之屠龙宝刀| 8.7|24.80元| 湖南科学技术出版社|https://book.doub...|
| 5525| 微积分之倚天宝剑| 8.6|32.00元| 湖南科学技术出版社|https://book.doub...|
|10812| 微积分之倚天宝剑| 8.6|25.00元| 湖南科学技术出版社|https://book.doub...|
| 1097| 微积分的历程| 8.5|29.00元| 人民邮电出版社|https://book.doub...|
| 3588| 微积分及其应用| 8.5|69.00元| 机械工业出版社|https://book.doub...|
| 1126| 微积分和数学分析引论| 8.0|69.00元| 世界图书出版公司|https://book.doub...|
| 1870| 微积分入门I| 7.9|39.00元| 人民邮电|https://book.doub...|
| 2205| 微积分概念发展史| 7.9|28.00元| 复旦大学出版社|https://book.doub...|
+-----+---------------+-----+------+--------------------+--------------------+
- 使用SQL查询图书的前10行的name和price字段信息
+-------------------+--------+
| name| price|
+-------------------+--------+
| 動力取向精神醫學--臨床應用與實務| 1200元|
| 水彩绘森活| 29.8|
|殷周金文集成(修订增补本共8册)(精)|2400.00元|
| 纸雕游戏大书| 99.00元|
| Michelangelo|$200.00 |
| 一支笔的快乐涂鸦2| 29.8|
| 亲亲宝贝装| 28.00元|
| Photoshop7解像| 68.00元|
| 戚蓼生序本石头记| 350.00元|
| 宇宙兄弟(7)| JPY580|
+-------------------+--------+
- 使用SQL统计书名包含“微积分”的书的数量
+---+
|num|
+---+
| 23|
+---+
- 使用SQL查询评分大于9的图书,只展示前10条
+-----+-------------------+-----+--------+---------+--------------------+
| id| name|score| price| pub| url|
+-----+-------------------+-----+--------+---------+--------------------+
| 5173| 動力取向精神醫學--臨床應用與實務| 10.0| 1200元| 心灵工坊|https://book.doub...|
| 9929| 水彩绘森活| 10.0| 29.8| 人民邮电出版社|https://book.doub...|
|10124|殷周金文集成(修订增补本共8册)(精)| 10.0|2400.00元| 中华书局|https://book.doub...|
|16628| 纸雕游戏大书| 10.0| 99.00元| 重庆出版集团|https://book.doub...|
|19103| Michelangelo| 10.0|$200.00 | Taschen|https://book.doub...|
|20063| 一支笔的快乐涂鸦2| 10.0| 29.8| 人民邮电出版社|https://book.doub...|
|32781| 亲亲宝贝装| 10.0| 28.00元|江西科学技术出版社|https://book.doub...|
|32879| Photoshop7解像| 10.0| 68.00元| 海洋出版社|https://book.doub...|
|45687| 戚蓼生序本石头记| 10.0| 350.00元| 人民文学出版社|https://book.doub...|
|52504| 宇宙兄弟(7)| 10.0| JPY580| 講談社|https://book.doub...|
+-----+-------------------+-----+--------+---------+--------------------+
- 使用SQL计算所有书名包含“微积分”的评分平均值,保留两位小数
+----------+
|mean_score|
+----------+
| 8.91|
+----------+
- 使用SQL把书目按照评分从高到低进行排列,且只展示前20条
+-----+--------------------+-----+--------+------------------+--------------------+
| id| name|score| price| pub| url|
+-----+--------------------+-----+--------+------------------+--------------------+
| 5173| 動力取向精神醫學--臨床應用與實務| 10.0| 1200元| 心灵工坊|https://book.doub...|
| 9929| 水彩绘森活| 10.0| 29.8| 人民邮电出版社|https://book.doub...|
|10124| 殷周金文集成(修订增补本共8册)(精)| 10.0|2400.00元| 中华书局|https://book.doub...|
|16628| 纸雕游戏大书| 10.0| 99.00元| 重庆出版集团|https://book.doub...|
|19103| Michelangelo| 10.0|$200.00 | Taschen|https://book.doub...|
|20063| 一支笔的快乐涂鸦2| 10.0| 29.8| 人民邮电出版社|https://book.doub...|
|32781| 亲亲宝贝装| 10.0| 28.00元| 江西科学技术出版社|https://book.doub...|
|32879| Photoshop7解像| 10.0| 68.00元| 海洋出版社|https://book.doub...|
|45687| 戚蓼生序本石头记| 10.0| 350.00元| 人民文学出版社|https://book.doub...|
|52504| 宇宙兄弟(7)| 10.0| JPY580| 講談社|https://book.doub...|
|52505| 宇宙兄弟(8)| 10.0| JPY580| 講談社|https://book.doub...|
| 573| TCP\IP详解(卷1英文版)| 9.9| 45| 机械工业出版社|https://book.doub...|
| 589|计算机程序设计艺术卷1:基本算法(...| 9.9| 119.00元| 人民邮电出版社|https://book.doub...|
| 5522| 微积分和数学分析引论-第1卷| 9.9| 79.00元| 世界图书出版公司|https://book.doub...|
| 5547|PrinciplesofNeura...| 9.9|$103.41 |McGraw-HillMedical|https://book.doub...|
| 7443| 奈特人体神经解剖彩色图谱| 9.9| 138.00元| 人民卫生出版社|https://book.doub...|
| 8703| 数学、科学和认识论| 9.9| 32.00元| 商务印书馆|https://book.doub...|
| 9924| 零基础学素描| 9.9| 20元| 人民邮电出版社|https://book.doub...|
| 9926| 黑白花意3:300例超写实的花之绘| 9.9| 29.80元| 人民邮电出版社|https://book.doub...|
| 9927| 黑白画意:经典植物手绘教程| 9.9| 29.80元| 人民邮电出版社|https://book.doub...|
+-----+--------------------+-----+--------+------------------+--------------------+
- 使用SQL把图书按照出版社进行分组,统计图书总数大于500的出版社,按图书总数降序排列
group by having order by desc+------------+----+
| pub| num|
+------------+----+
| 人民文学出版社|1437|
| 上海译文出版社|1426|
| 中华书局|1278|
| 东立出版社|1223|
|生活·读书·新知三联书店|1105|
| 北京大学出版社| 948|
| 译林出版社| 934|
| 商务印书馆| 917|
| 上海人民出版社| 829|
| 广西师范大学出版社| 726|
| 中国人民大学出版社| 641|
| 人民邮电出版社| 599|
| 上海古籍出版社| 590|
| 南海出版公司| 575|
| 尖端出版社| 557|
| 中信出版社| 537|
| 机械工业出版社| 519|
| 新星出版社| 511|
+------------+----+