今日内容:
- 0- Spark SQL中数据清洗的相关API(知道即可)
- 1- Spark SQL中shuffle分区设置(会设置)
- 2- Spark SQL数据写出操作(掌握)
- 3- Pandas 的相关内容(整体了解)
-
0. Spark SQL的相关的清洗API
1- 去重API:df.dropDuplicates()
- 说明:当不加参数的时候,默认对数据整体进行去重,同样支持针对指定列进行去重操作
- 2- 删除null值数据:df.dropna()
- 说明:默认支持对所有列进行判断,如果有一列的对应值为null,就会将null这一行数据全部都删除,也支持针对某些列处理
- 3- 替换null值:df.fillna()
- 说明:将表中为null的数据替换为指定的值,同样也可以针对某些列处理
代码演示
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示清洗相关的API")
# 1- 创建SparkSession对象:
spark = SparkSession.builder.master('local[*]').appName('movie').config("spark.sql.shuffle.partitions", "4").getOrCreate()
# 2- 读取HDFS上movie数据集
df_init = spark.read.csv(
path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/stu.csv',
sep=',',
header=True,
inferSchema=True
)
# 演示一: 去重API: df.dropDuplicates()
df = df_init.dropDuplicates()
df = df_init.dropDuplicates(['address','name'])
# 演示二: 2- 删除null值数据: df.dropna()
df = df_init.dropna()
df = df_init.dropna(thresh=3)
df = df_init.dropna(thresh=2,subset=['name','address','age'])
# 演示三: 替换null值: df.fillna()
df = df_init.fillna('aaaa')
df = df_init.fillna('aaa',subset=['name','address'])
df = df.fillna(1,subset=['id','age'])
df = df_init.fillna({'id':0,'name':'未知','age':0,'address':'未知'})
df_init.show()
df.printSchema()
df.show()
1. Spark SQL的shuffle分区设置
Spark SQL在执行的过程中,会将SQL翻译为Spark的RDD程序来运行,对于Spark SQL来说,执行的时候,同样也会触发shuffle操作,默认情况下, Spark SQL的shuffle的分区数量默认为 200个
在实际生产中使用的时候,默认为200个,有时候并不合适,当任务比较大的时候,一般可能需要调整分区数量更多,当任务较小的时候,可能需要调小分区数量
如何来调整Spark SQL的分区数量呢? 参数 : spark.sql.shuffle.partitios
方案一 : 在配置文件中调整 : spark-defaults.conf (全局的方式)
添加配置 : spark.sql.shuffle.partitions 100
方案二 : 在通过 spark-sumbit 提交任务的时候,通过 --conf "spark.sql.shuffle.partitions=100" 主要在生产中
方案三 : 在代码中,在构建SparkSession对象的时候,来设置shuffle的分区数量 主要在测试中
sparkSession.builder.config("spark.sql.shuffle.partitions",'100')
在测试环境中,一般都是右键运行,此时需要设置分区数量,可以通过方案三来处理,但是在后续上线部署的时候,需要通过spark-submit提供,为了能够让参数动态传递,会将代码中参数迁移到spark-submit命令上设置
2. Spark SQL的数据写出操作
统一写出的API:
上述的完整的API 同样也有简单写法:
df.write.mode().输出类型()
比如说:
df.write.mode().csv()
df.write.mode().json()
....
演示:输出到文件
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示数据写出到文件中")
# 1- 创建SparkSession对象:
spark = SparkSession.builder.master('local[*]').appName('movie').config("spark.sql.shuffle.partitions", "1").getOrCreate()
# 2- 读取HDFS上movie数据集
df_init = spark.read.csv(
path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/stu.csv',
sep=',',
header=True,
inferSchema=True
)
# 3- 对数据进行清洗操作:
# 演示一: 去重API: df.dropDuplicates()
df = df_init.dropDuplicates()
# 演示二: 2- 删除null值数据: df.dropna()
df = df.dropna()
# 4- 将清洗后的数据写出到文件中
# 演示写出为CSV文件
#df.write.mode('overwrite').format('csv').option('header',True).option('sep','|').save('hdfs://node1:8020/sparkwrite/output1')
# 演示写出为 JSON
#df.write.mode('overwrite').format('json').save('hdfs://node1:8020/sparkwrite/output2')
# 演示输出为text
#df.select('name').write.mode('overwrite').format('text').save('hdfs://node1:8020/sparkwrite/output3')
# 演示输出为orc
df.write.mode('overwrite').format('orc').save('hdfs://node1:8020/sparkwrite/output4')
将数据写出到HIVE中
df.write.mode('append|overwrite|ignore|error').saveAsTable('表名','存储类型')
# 说明 :目前无法演示输出到HIVE, 因为 HIVE 和 spark 没有整合
将数据输出到MySQL:
df.write.mode('append|overwrite|ignore|error').format('jdbc')\
.option("url","jdbc:mysql://xxx:3306/库名?useSSL=false&useUnicode=true&characterEncoding=utf-8")\
.option("dbtable","表名")\
.option("user","用户名")\
.option("password","密码")\
.save()
说明:
当表不存在的时候, 会自动建表, 对于overwrite来说, 每次都是将表删除, 重建
如果想要自定义字段的类型, 请先创建表, 然后使用append的方式来添加数据即可
1- 在MySQL中创建一个库
CREATE DATABASE day06_pyspark CHARSET utf8;
2- 编写代码 ```python from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession import os
锁定远端python版本:
os.environ[‘SPARK_HOME’] = ‘/export/server/spark’ os.environ[‘PYSPARK_PYTHON’] = ‘/root/anaconda3/bin/python3’ os.environ[‘PYSPARK_DRIVER_PYTHON’] = ‘/root/anaconda3/bin/python3’
if name == ‘main‘: print(“演示数据写出到文件中”)
# 1- 创建SparkSession对象:
spark = SparkSession.builder.master('local[*]').appName('movie').config("spark.sql.shuffle.partitions", "1").getOrCreate()
# 2- 读取HDFS上movie数据集
df_init = spark.read.csv(
path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/stu.csv',
sep=',',
header=True,
inferSchema=True
)
# 3- 对数据进行清洗操作:
# 演示一: 去重API: df.dropDuplicates()
df = df_init.dropDuplicates()
# 演示二: 2- 删除null值数据: df.dropna()
df = df.dropna()
# 4- 将清洗后的数据写出到文件中
# 演示输出MySQL
df.write.mode('overwrite').format('jdbc')\
.option("url", "jdbc:mysql://node1:3306/day06_pyspark?useSSL=false&useUnicode=true&characterEncoding=utf-8") \
.option("dbtable", "stu") \
.option("user", "root") \
.option("password", "123456") \
.save()
可能报出的错误:<br />**图片。。。**
```properties
原因:
当前Spark无法找到一个适合的驱动连接MySQL
解决方案: 添加MySQL的驱动包, 需要在以下几个位置中添加驱动包
1- 在python的环境中添加mysql的驱动包 (在pycharm本地右键运行的时候, 需要加载)
Base的pyspark库的相关的jar包路径: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
虚拟环境: /root/anaconda3/envs/虚拟环境名称/lib/python3.8/site-packages/pyspark/jars/
2- 需要在 Spark的家目录下jars目录下添加mysql的驱动包 (spark-submit提交到spark集群或者local模式需要使用)
/export/server/spark/jars/
3- 需要在HDFS的/spark/jars目录下添加mysql的驱动包 (spark-submit提交到yarn环境的时候)
建议: 如果是常用的jar包, 建议以上三个位置都添加
3. Spark SQL函数定义
3.1 如何使用窗口函数
回顾窗口函数:
窗口函数格式:
分析函数 over(partition by xxx order by xxx [asc | desc] [rows between 窗口范围 and 窗口范围 ])
分析函数:
第一类函数: row_number() rank() dense_rank() ntile(N)
第二类函数: 与聚合函数组合使用 sum() max() min() avg() count()
第三类函数: lead() lag() frist_value() last_value()
这些窗口函数如何在spark SQL中使用呢?
- 通过SQL的方式 : 与在hive中使用基本是雷同 ```python from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession import os
锁定远端python版本:
os.environ[‘SPARK_HOME’] = ‘/export/server/spark’ os.environ[‘PYSPARK_PYTHON’] = ‘/root/anaconda3/bin/python3’ os.environ[‘PYSPARK_DRIVER_PYTHON’] = ‘/root/anaconda3/bin/python3’
if name == ‘main‘: print(“演示spark SQL的窗口函数使用”)
# 1- 创建SparkSession对象:
spark = SparkSession.builder.master('local[*]').appName('windows').getOrCreate()
# 2- 读取外部数据源
df_init = spark.read.csv(path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/pv.csv',header=True,inferSchema=True)
# 3- 处理数据操作: 演示窗口函数
df_init.createTempView('pv_t1')
# 3.1 SQL方式
spark.sql("""
select
uid,
datestr,
pv,
row_number() over(partition by uid order by pv desc) as rank1,
rank() over(partition by uid order by pv desc) as rank2,
dense_rank() over(partition by uid order by pv desc) as rank3
from pv_t1
""").show()
- DSL方式
```python
# 3.2 DSL方式:
df_init.select(
df_init['uid'],
df_init['datestr'],
df_init['pv'],
F.row_number().over(win.partitionBy('uid').orderBy(F.desc('pv'))).alias('rank1'),
F.rank().over(win.partitionBy('uid').orderBy(F.desc('pv'))).alias('rank2'),
F.dense_rank().over(win.partitionBy('uid').orderBy(F.desc('pv'))).alias('rank3')
).show()
3.2 SQL函数的分类说明
回顾 SQL 函数的分类:
- UDF: 用户自定义函数
- 特点: 一进一出 大部分的内置函数都是属于UDF函数
- 比如 substr()
- UDAF: 用户自定义聚合函数
- 特点: 多进一出
- 比如: sum() count() avg()….
- UDTF: 用户自定义表生成函数
- 特点: 一进多出 (给一个数据, 返回多行或者多列的数据)
- 比如: explode() 爆炸函数
其实不管是spark SQL中 内置函数, 还是hive中内置函数, 其实也都是属于这三类中其中一类
自定义函数目的:
- 扩充函数, 因为在进行业务分析的时候, 有时候会使用某些功能, 但是内置函数并没有提供这样的功能, 此时需要进行自定义函数来解决
目前支持自定义函数:
对于spark SQL 目前支持定义 UDF 和 UDAF , 但是对于python语言 仅支持定义UDF函数, 如果想要定义UDAF函数, 需要使用pandas UDF实现
注意点:
在使用python 原生 spark SQL的 UDF方案, 整个执行效率不是特别高, 因为整个内部运算是一个来处理., 一个个返回, 这样会导致频繁进行 序列化和反序列化的操作 从而影响效率
后续改进版本: 采用java 或者scala来定义, 然后python调用即可
目前主要使用版本: 是采用pandas的UDF函数解决, 同时采用apache arrow 内存数据结构框架 来实现, 提升整个执行效率
3.3 Spark原生自定义UDF函数
如何自定义UDF函数:
1- 第一步 : 根据业务功能要求,定义一个普通的Python的函数:
2- 第二步 : 将这个Python的函数注册到Spark SQL中:
注册方式有以下俩种方案:
【方式一】: 可适用于 SQL 和 DSL
udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
参数1:udf函数的函数名称,此名称用于SQL当中使用
参数2:需要将那个python函数注册为udf函数
参数3:设置python函数返回的类型(spark)
【方式二】: 只能在DSL使用
udf对象 = F.udf(参数1,参数2)
参数1:需要将那个python函数注册为udf函数
参数2:设置python函数返回的类型
udf对象 主要使用在DSL中
方式二还有一种语法糖写法: @F.udf(参数2) 底层走的是装饰器/与方式二不能共用,因为已经注 册过了
(放置在普通的python函数的上面)
3- 在SQL或者DSL使用即可
自定义UDF函数演示操作:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示如何自定义UDF函数")
# 1- 创建SparkSession对象
spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()
# 2- 读取外部的数据集
df_init = spark.read.csv(
path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/student.csv',
schema='id int,name string,age int'
)
df_init.createTempView('t1')
# 3- 处理数据
# 需求: 自定义函数需求 请在name名称后面 添加一个 _itcast
# 3.1 自定义一个python函数, 完成主题功能
# 方式三: 不能和方式二共用
# @F.udf(returnType=StringType())
def concat_udf(name: str) -> str:
return name + '_itcast'
# 3.2 将函数注册给spark SQL
#方式一:
concat_udf_dsl = spark.udf.register('concat_udf_sql',concat_udf,StringType())
# 方式二:
concat_udf_dsl_2 = F.udf(concat_udf,StringType())
# 3.3 使用自定义函数
# SQL使用
spark.sql("""
select id,concat_udf_sql(name) as name ,age from t1
""").show()
# DSL中使用
df_init.select(
df_init['id'],
concat_udf_dsl(df_init['name']).alias('name'),
df_init['age']
).show()
df_init.select(
df_init['id'],
concat_udf_dsl_2(df_init['name']).alias('name'),
df_init['age']
).show()
df_init.select(
df_init['id'],
concat_udf(df_init['name']).alias('name'),
df_init['age']
).show()
演示返回类型为 字典/列表
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示spark sql的UDF函数: 返回列表/字典")
# 1- 创建SparkSession对象
spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()
# 2- 读取外部的数据集
df_init = spark.read.csv(
path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/user.csv',
header=True,
inferSchema=True
)
df_init.createTempView('t1')
# 3- 处理数据
# 需求: 自定义函数 请将line字段切割开, 将其转换 姓名 地址 年龄
# 3.1 定义一个普通python的函数
def split_3col_1(line):
return line.split('|')
def split_3col_2(line):
arr = line.split('|')
return {'name':arr[0],'address':arr[1],'age':arr[2]}
# 3.2 注册
# 对于返回列表的注册方式
# 方式一
schema = StructType().add('name',StringType()).add('address',StringType()).add('age',StringType())
split_3col_1_dsl = spark.udf.register('split_3col_1_sql',split_3col_1,schema)
# 对于返回为字典的方式
# 方式二:
schema = StructType().add('name', StringType()).add('address', StringType()).add('age', StringType())
split_3col_2_dsl = F.udf(split_3col_2,schema)
# 3.3 使用自定义函数
spark.sql("""
select
userid,
split_3col_1_sql(line) as 3col,
split_3col_1_sql(line)['name'] as name,
split_3col_1_sql(line)['address'] as address,
split_3col_1_sql(line)['age'] as age
from t1
""").show()
# DSL中
df_init.select(
'userid',
split_3col_2_dsl('line').alias('3col'),
split_3col_2_dsl('line')['name'].alias('name'),
split_3col_2_dsl('line')['address'].alias('address'),
split_3col_2_dsl('line')['age'].alias('age')
).show()
3.4 基于pandas的Spark UDF函数
3.4.1 apache arrow基本介绍
apache arrow 是apache旗下的一款顶级的项目, 是一个跨平台的在内存中以列式存储的数据层, 它设计的目的是作为一个跨平台的数据层, 来加快大数据分析项目的运行效率
pandas与pyspark SQL 进行交互的时候, 建立在apache arrow上, 带来低开销 高性能的UDF函数
arrow 并不会自动使用, 需要对配置以及代码做一定小的更改才可以使用并兼容
如何安装?
pip install pyspark[sql]
说明: 三个节点要求要安装, 如果使用除base虚拟环境以外的环境, 需要先切换到对应虚拟环境下
注意:
如果安装比较慢, 可以添加一下 清华镜像源
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]
不管是否使用我的虚拟机, 都建议跑一次, 看一下是否存在
如何使用呢?
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
3.4.2 如何基于arrow完成pandas DF 与 Spark DF的互转操作
如何将pandas的DF对象转换spark的DF,以及如何从spark df 转换为 pandas 的 df 对象
pandas DF --> Spark DF :
spark_df = sparkSession.createDataFrame(pd_df)
Spark DF ---> pandas DF:
pd_df = spark_df.toPandas()
代码演示:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("pySpark模板")
# 1- 创建SparkSession对象
spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()
# 开启arrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# 2- 创建 pandas的DF对象:
pd_df = pd.DataFrame({'name':['张三','李四','王五'],'age':[20,18,15]})
# 3- 可以使用panda的API来对数据进行处理操作
pd_df = pd_df[pd_df['age'] > 16]
print(pd_df)
# 4- 将其转换为Spark DF
spark_df = spark.createDataFrame(pd_df)
# 5- 可以使用spark的相关API处理数据
spark_df = spark_df.select(F.sum('age').alias('sum_age'))
spark_df.printSchema()
spark_df.show()
# 6- 还可以将spark df 转换为pandas df
pd_df = spark_df.toPandas()
print(pd_df)
请注意:开启arrow方案,必须先安装arrow,否则无法使用,一执行就会报出:no module ‘pyarrow’(没有此模块)
pandas UDF代码, 请各位先检查 当前虚拟机中pyspark库是否为3.1.2
命令:
conda list | grep pyspark
3.4.3 pandas UDF
方式一: series TO series
- 描述: 定义一个python的函数, 接收series类型, 返回series类型, 接收一列返回一列
- 目的: 用于定义 pandas的UDF函数
代码演示:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("基于pandas定义UDF函数")
# 1- 创建SparkSession对象
spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()
# 2- 构建数据集
df_init = spark.createDataFrame([(1,3),(2,5),(3,8),(5,4),(6,7)],schema='a int,b int')
df_init.createTempView('t1')
# 3- 处理数据:
# 需求: 基于pandas的UDF 完成 对 a 和 b列乘积计算
# 3.1 自定义一个python的函数: 传入series类型, 返回series类型
@F.pandas_udf(returnType=IntegerType()) # 装饰器 用于将pandas的函数转换为 spark SQL的函数
def pd_cj(a:pd.Series,b:pd.Series) -> pd.Series:
return a * b
#3.2 对函数进行注册操作
# 方式一:
pd_cj_dsl = spark.udf.register('pd_cj_sql',pd_cj)
#3.3 使用自定义函数
# SQL
spark.sql("""
select a,b, pd_cj_sql(a,b) as cj from t1
""").show()
# DSL
df_init.select('a','b',pd_cj('a','b').alias('cj')).show()
从series类型 到 标量(python基本数据类型) :
- 描述: 定义一个python函数, 接收series类型的数据, 输出为标量, 用于定义 UDAF函数 ```properties import pandas as pd from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession from pyspark.sql.types import * import pyspark.sql.functions as F import os
锁定远端python版本:
os.environ[‘SPARK_HOME’] = ‘/export/server/spark’ os.environ[‘PYSPARK_PYTHON’] = ‘/root/anaconda3/bin/python3’ os.environ[‘PYSPARK_DRIVER_PYTHON’] = ‘/root/anaconda3/bin/python3’
if name == ‘main‘: print(“基于pandas定义UDF函数”)
# 1- 创建SparkSession对象
spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()
# 2- 构建数据集
df_init = spark.createDataFrame([(1,3),(1,5),(1,8),(2,4),(2,7)],schema='a int,b int')
df_init.createTempView('t1')
# 3- 处理数据:
# 需求: pandas的UDAF需求 对 B列 求和
# 3.1 创建python的函数: 接收series类型, 输出基本数据类型(标量)
@F.pandas_udf(returnType=IntegerType())
def pd_b_sum(b:pd.Series) -> int:
return b.sum()
# 3.2 注册函数
spark.udf.register('pd_b_sum_sql',pd_b_sum)
# 3.3 使用自定义函数
spark.sql("""
select
pd_b_sum_sql(b) as avg_b
from t1
""").show()
spark.sql("""
select
a,
b,
pd_b_sum_sql(b) over(partition by a order by b) as sum_b
from t1
""").show()
```
3.4.4 pandas UDF函数案例(作业)
需求: 必须自定义panda的UDF函数使用 (无难度 (连一点点都没有))