今日内容:

  • 0- Spark SQL中数据清洗的相关API(知道即可)
  • 1- Spark SQL中shuffle分区设置(会设置)
  • 2- Spark SQL数据写出操作(掌握)
  • 3- Pandas 的相关内容(整体了解)
  • 4- Spark SQL的函数定义(掌握)

    0. Spark SQL的相关的清洗API

  • 1- 去重API:df.dropDuplicates()

    • 说明:当不加参数的时候,默认对数据整体进行去重,同样支持针对指定列进行去重操作

image-20220601093936710.png

  • 2- 删除null值数据:df.dropna()
    • 说明:默认支持对所有列进行判断,如果有一列的对应值为null,就会将null这一行数据全部都删除,也支持针对某些列处理

image-20220601094108963.png

  • 3- 替换null值:df.fillna()
    • 说明:将表中为null的数据替换为指定的值,同样也可以针对某些列处理

image-20220601094235734.png
代码演示

  1. from pyspark import SparkContext, SparkConf
  2. from pyspark.sql import SparkSession
  3. import os
  4. # 锁定远端python版本:
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. if __name__ == '__main__':
  9. print("演示清洗相关的API")
  10. # 1- 创建SparkSession对象:
  11. spark = SparkSession.builder.master('local[*]').appName('movie').config("spark.sql.shuffle.partitions", "4").getOrCreate()
  12. # 2- 读取HDFS上movie数据集
  13. df_init = spark.read.csv(
  14. path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/stu.csv',
  15. sep=',',
  16. header=True,
  17. inferSchema=True
  18. )
  19. # 演示一: 去重API: df.dropDuplicates()
  20. df = df_init.dropDuplicates()
  21. df = df_init.dropDuplicates(['address','name'])
  22. # 演示二: 2- 删除null值数据: df.dropna()
  23. df = df_init.dropna()
  24. df = df_init.dropna(thresh=3)
  25. df = df_init.dropna(thresh=2,subset=['name','address','age'])
  26. # 演示三: 替换null值: df.fillna()
  27. df = df_init.fillna('aaaa')
  28. df = df_init.fillna('aaa',subset=['name','address'])
  29. df = df.fillna(1,subset=['id','age'])
  30. df = df_init.fillna({'id':0,'name':'未知','age':0,'address':'未知'})
  31. df_init.show()
  32. df.printSchema()
  33. df.show()

1. Spark SQL的shuffle分区设置

Spark SQL在执行的过程中,会将SQL翻译为Spark的RDD程序来运行,对于Spark SQL来说,执行的时候,同样也会触发shuffle操作,默认情况下, Spark SQL的shuffle的分区数量默认为 200个
在实际生产中使用的时候,默认为200个,有时候并不合适,当任务比较大的时候,一般可能需要调整分区数量更多,当任务较小的时候,可能需要调小分区数量

  1. 如何来调整Spark SQL的分区数量呢? 参数 : spark.sql.shuffle.partitios
  2. 方案一 : 在配置文件中调整 : spark-defaults.conf (全局的方式)
  3. 添加配置 : spark.sql.shuffle.partitions 100
  4. 方案二 : 在通过 spark-sumbit 提交任务的时候,通过 --conf "spark.sql.shuffle.partitions=100" 主要在生产中
  5. 方案三 : 在代码中,在构建SparkSession对象的时候,来设置shuffle的分区数量 主要在测试中
  6. sparkSession.builder.config("spark.sql.shuffle.partitions",'100')
  7. 在测试环境中,一般都是右键运行,此时需要设置分区数量,可以通过方案三来处理,但是在后续上线部署的时候,需要通过spark-submit提供,为了能够让参数动态传递,会将代码中参数迁移到spark-submit命令上设置

2. Spark SQL的数据写出操作

统一写出的API:
image.png

  1. 上述的完整的API 同样也有简单写法:
  2. df.write.mode().输出类型()
  3. 比如说:
  4. df.write.mode().csv()
  5. df.write.mode().json()
  6. ....

演示:输出到文件

  1. from pyspark import SparkContext, SparkConf
  2. from pyspark.sql import SparkSession
  3. import os
  4. # 锁定远端python版本:
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. if __name__ == '__main__':
  9. print("演示数据写出到文件中")
  10. # 1- 创建SparkSession对象:
  11. spark = SparkSession.builder.master('local[*]').appName('movie').config("spark.sql.shuffle.partitions", "1").getOrCreate()
  12. # 2- 读取HDFS上movie数据集
  13. df_init = spark.read.csv(
  14. path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/stu.csv',
  15. sep=',',
  16. header=True,
  17. inferSchema=True
  18. )
  19. # 3- 对数据进行清洗操作:
  20. # 演示一: 去重API: df.dropDuplicates()
  21. df = df_init.dropDuplicates()
  22. # 演示二: 2- 删除null值数据: df.dropna()
  23. df = df.dropna()
  24. # 4- 将清洗后的数据写出到文件中
  25. # 演示写出为CSV文件
  26. #df.write.mode('overwrite').format('csv').option('header',True).option('sep','|').save('hdfs://node1:8020/sparkwrite/output1')
  27. # 演示写出为 JSON
  28. #df.write.mode('overwrite').format('json').save('hdfs://node1:8020/sparkwrite/output2')
  29. # 演示输出为text
  30. #df.select('name').write.mode('overwrite').format('text').save('hdfs://node1:8020/sparkwrite/output3')
  31. # 演示输出为orc
  32. df.write.mode('overwrite').format('orc').save('hdfs://node1:8020/sparkwrite/output4')

将数据写出到HIVE中

  1. df.write.mode('append|overwrite|ignore|error').saveAsTable('表名','存储类型')
  2. # 说明 :目前无法演示输出到HIVE, 因为 HIVE 和 spark 没有整合

将数据输出到MySQL:

  1. df.write.mode('append|overwrite|ignore|error').format('jdbc')\
  2. .option("url","jdbc:mysql://xxx:3306/库名?useSSL=false&useUnicode=true&characterEncoding=utf-8")\
  3. .option("dbtable","表名")\
  4. .option("user","用户名")\
  5. .option("password","密码")\
  6. .save()
  7. 说明:
  8. 当表不存在的时候, 会自动建表, 对于overwrite来说, 每次都是将表删除, 重建
  9. 如果想要自定义字段的类型, 请先创建表, 然后使用append的方式来添加数据即可
  • 1- 在MySQL中创建一个库

    1. CREATE DATABASE day06_pyspark CHARSET utf8;
  • 2- 编写代码 ```python from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession import os

锁定远端python版本:

os.environ[‘SPARK_HOME’] = ‘/export/server/spark’ os.environ[‘PYSPARK_PYTHON’] = ‘/root/anaconda3/bin/python3’ os.environ[‘PYSPARK_DRIVER_PYTHON’] = ‘/root/anaconda3/bin/python3’

if name == ‘main‘: print(“演示数据写出到文件中”)

  1. # 1- 创建SparkSession对象:
  2. spark = SparkSession.builder.master('local[*]').appName('movie').config("spark.sql.shuffle.partitions", "1").getOrCreate()
  3. # 2- 读取HDFS上movie数据集
  4. df_init = spark.read.csv(
  5. path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/stu.csv',
  6. sep=',',
  7. header=True,
  8. inferSchema=True
  9. )
  10. # 3- 对数据进行清洗操作:
  11. # 演示一: 去重API: df.dropDuplicates()
  12. df = df_init.dropDuplicates()
  13. # 演示二: 2- 删除null值数据: df.dropna()
  14. df = df.dropna()
  15. # 4- 将清洗后的数据写出到文件中
  16. # 演示输出MySQL
  17. df.write.mode('overwrite').format('jdbc')\
  18. .option("url", "jdbc:mysql://node1:3306/day06_pyspark?useSSL=false&useUnicode=true&characterEncoding=utf-8") \
  19. .option("dbtable", "stu") \
  20. .option("user", "root") \
  21. .option("password", "123456") \
  22. .save()
  1. 可能报出的错误:<br />**图片。。。**
  2. ```properties
  3. 原因:
  4. 当前Spark无法找到一个适合的驱动连接MySQL
  5. 解决方案: 添加MySQL的驱动包, 需要在以下几个位置中添加驱动包
  6. 1- 在python的环境中添加mysql的驱动包 (在pycharm本地右键运行的时候, 需要加载)
  7. Base的pyspark库的相关的jar包路径: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
  8. 虚拟环境: /root/anaconda3/envs/虚拟环境名称/lib/python3.8/site-packages/pyspark/jars/
  9. 2- 需要在 Spark的家目录下jars目录下添加mysql的驱动包 (spark-submit提交到spark集群或者local模式需要使用)
  10. /export/server/spark/jars/
  11. 3- 需要在HDFS的/spark/jars目录下添加mysql的驱动包 (spark-submit提交到yarn环境的时候)
  12. 建议: 如果是常用的jar包, 建议以上三个位置都添加

3. Spark SQL函数定义

3.1 如何使用窗口函数

回顾窗口函数:

  1. 窗口函数格式:
  2. 分析函数 over(partition by xxx order by xxx [asc | desc] [rows between 窗口范围 and 窗口范围 ])
  3. 分析函数:
  4. 第一类函数: row_number() rank() dense_rank() ntile(N)
  5. 第二类函数: 与聚合函数组合使用 sum() max() min() avg() count()
  6. 第三类函数: lead() lag() frist_value() last_value()

这些窗口函数如何在spark SQL中使用呢?

  • 通过SQL的方式 : 与在hive中使用基本是雷同 ```python from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession import os

锁定远端python版本:

os.environ[‘SPARK_HOME’] = ‘/export/server/spark’ os.environ[‘PYSPARK_PYTHON’] = ‘/root/anaconda3/bin/python3’ os.environ[‘PYSPARK_DRIVER_PYTHON’] = ‘/root/anaconda3/bin/python3’

if name == ‘main‘: print(“演示spark SQL的窗口函数使用”)

  1. # 1- 创建SparkSession对象:
  2. spark = SparkSession.builder.master('local[*]').appName('windows').getOrCreate()
  3. # 2- 读取外部数据源
  4. df_init = spark.read.csv(path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/pv.csv',header=True,inferSchema=True)
  5. # 3- 处理数据操作: 演示窗口函数
  6. df_init.createTempView('pv_t1')
  7. # 3.1 SQL方式
  8. spark.sql("""
  9. select
  10. uid,
  11. datestr,
  12. pv,
  13. row_number() over(partition by uid order by pv desc) as rank1,
  14. rank() over(partition by uid order by pv desc) as rank2,
  15. dense_rank() over(partition by uid order by pv desc) as rank3
  16. from pv_t1
  17. """).show()
  1. - DSL方式
  2. ```python
  3. # 3.2 DSL方式:
  4. df_init.select(
  5. df_init['uid'],
  6. df_init['datestr'],
  7. df_init['pv'],
  8. F.row_number().over(win.partitionBy('uid').orderBy(F.desc('pv'))).alias('rank1'),
  9. F.rank().over(win.partitionBy('uid').orderBy(F.desc('pv'))).alias('rank2'),
  10. F.dense_rank().over(win.partitionBy('uid').orderBy(F.desc('pv'))).alias('rank3')
  11. ).show()

3.2 SQL函数的分类说明

回顾 SQL 函数的分类:

  • UDF: 用户自定义函数
    • 特点: 一进一出 大部分的内置函数都是属于UDF函数
    • 比如 substr()
  • UDAF: 用户自定义聚合函数
    • 特点: 多进一出
    • 比如: sum() count() avg()….
  • UDTF: 用户自定义表生成函数
    • 特点: 一进多出 (给一个数据, 返回多行或者多列的数据)
    • 比如: explode() 爆炸函数

其实不管是spark SQL中 内置函数, 还是hive中内置函数, 其实也都是属于这三类中其中一类

自定义函数目的:

  • 扩充函数, 因为在进行业务分析的时候, 有时候会使用某些功能, 但是内置函数并没有提供这样的功能, 此时需要进行自定义函数来解决

目前支持自定义函数:

  1. 对于spark SQL 目前支持定义 UDF UDAF , 但是对于python语言 仅支持定义UDF函数, 如果想要定义UDAF函数, 需要使用pandas UDF实现

image-20211115093809302.png

注意点:

  1. 在使用python 原生 spark SQL UDF方案, 整个执行效率不是特别高, 因为整个内部运算是一个来处理., 一个个返回, 这样会导致频繁进行 序列化和反序列化的操作 从而影响效率
  2. 后续改进版本: 采用java 或者scala来定义, 然后python调用即可
  3. 目前主要使用版本: 是采用pandasUDF函数解决, 同时采用apache arrow 内存数据结构框架 来实现, 提升整个执行效率

3.3 Spark原生自定义UDF函数

如何自定义UDF函数:

  1. 1- 第一步 : 根据业务功能要求,定义一个普通的Python的函数:
  2. 2- 第二步 : 将这个Python的函数注册到Spark SQL中:
  3. 注册方式有以下俩种方案:
  4. 【方式一】: 可适用于 SQL DSL
  5. udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
  6. 参数1udf函数的函数名称,此名称用于SQL当中使用
  7. 参数2:需要将那个python函数注册为udf函数
  8. 参数3:设置python函数返回的类型(spark
  9. 【方式二】: 只能在DSL使用
  10. udf对象 = F.udf(参数1,参数2
  11. 参数1:需要将那个python函数注册为udf函数
  12. 参数2:设置python函数返回的类型
  13. udf对象 主要使用在DSL
  14. 方式二还有一种语法糖写法: @F.udf(参数2 底层走的是装饰器/与方式二不能共用,因为已经注 册过了
  15. (放置在普通的python函数的上面)
  16. 3- SQL或者DSL使用即可

自定义UDF函数演示操作:

  1. from pyspark import SparkContext, SparkConf
  2. from pyspark.sql import SparkSession
  3. from pyspark.sql.types import StringType
  4. import pyspark.sql.functions as F
  5. import os
  6. # 锁定远端python版本:
  7. os.environ['SPARK_HOME'] = '/export/server/spark'
  8. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  9. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  10. if __name__ == '__main__':
  11. print("演示如何自定义UDF函数")
  12. # 1- 创建SparkSession对象
  13. spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()
  14. # 2- 读取外部的数据集
  15. df_init = spark.read.csv(
  16. path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/student.csv',
  17. schema='id int,name string,age int'
  18. )
  19. df_init.createTempView('t1')
  20. # 3- 处理数据
  21. # 需求: 自定义函数需求 请在name名称后面 添加一个 _itcast
  22. # 3.1 自定义一个python函数, 完成主题功能
  23. # 方式三: 不能和方式二共用
  24. # @F.udf(returnType=StringType())
  25. def concat_udf(name: str) -> str:
  26. return name + '_itcast'
  27. # 3.2 将函数注册给spark SQL
  28. #方式一:
  29. concat_udf_dsl = spark.udf.register('concat_udf_sql',concat_udf,StringType())
  30. # 方式二:
  31. concat_udf_dsl_2 = F.udf(concat_udf,StringType())
  32. # 3.3 使用自定义函数
  33. # SQL使用
  34. spark.sql("""
  35. select id,concat_udf_sql(name) as name ,age from t1
  36. """).show()
  37. # DSL中使用
  38. df_init.select(
  39. df_init['id'],
  40. concat_udf_dsl(df_init['name']).alias('name'),
  41. df_init['age']
  42. ).show()
  43. df_init.select(
  44. df_init['id'],
  45. concat_udf_dsl_2(df_init['name']).alias('name'),
  46. df_init['age']
  47. ).show()
  48. df_init.select(
  49. df_init['id'],
  50. concat_udf(df_init['name']).alias('name'),
  51. df_init['age']
  52. ).show()

演示返回类型为 字典/列表

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import  *
import pyspark.sql.functions as F
import os

# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("演示spark sql的UDF函数: 返回列表/字典")
    # 1- 创建SparkSession对象
    spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()

    # 2- 读取外部的数据集
    df_init = spark.read.csv(
        path='file:///export/data/workspace/sz30_pyspark_parent/_03_pyspark_sql/data/user.csv',
        header=True,
        inferSchema=True
    )
    df_init.createTempView('t1')

    # 3- 处理数据
    # 需求: 自定义函数 请将line字段切割开, 将其转换 姓名 地址 年龄
    # 3.1 定义一个普通python的函数
    def split_3col_1(line):
        return line.split('|')

    def split_3col_2(line):
        arr = line.split('|')
        return {'name':arr[0],'address':arr[1],'age':arr[2]}


    # 3.2 注册
    # 对于返回列表的注册方式
    # 方式一
    schema = StructType().add('name',StringType()).add('address',StringType()).add('age',StringType())
    split_3col_1_dsl = spark.udf.register('split_3col_1_sql',split_3col_1,schema)

    # 对于返回为字典的方式
    # 方式二:
    schema = StructType().add('name', StringType()).add('address', StringType()).add('age', StringType())
    split_3col_2_dsl = F.udf(split_3col_2,schema)

    # 3.3 使用自定义函数
    spark.sql("""
        select
            userid,
            split_3col_1_sql(line) as 3col,
            split_3col_1_sql(line)['name'] as name,
            split_3col_1_sql(line)['address'] as address,
            split_3col_1_sql(line)['age'] as age
        from t1
    """).show()

    # DSL中
    df_init.select(
        'userid',
        split_3col_2_dsl('line').alias('3col'),
        split_3col_2_dsl('line')['name'].alias('name'),
        split_3col_2_dsl('line')['address'].alias('address'),
        split_3col_2_dsl('line')['age'].alias('age')
    ).show()

3.4 基于pandas的Spark UDF函数

3.4.1 apache arrow基本介绍

    apache arrow 是apache旗下的一款顶级的项目, 是一个跨平台的在内存中以列式存储的数据层, 它设计的目的是作为一个跨平台的数据层, 来加快大数据分析项目的运行效率

    pandas与pyspark SQL 进行交互的时候, 建立在apache arrow上, 带来低开销 高性能的UDF函数

    arrow 并不会自动使用, 需要对配置以及代码做一定小的更改才可以使用并兼容

如何安装?

    pip install pyspark[sql]

    说明: 三个节点要求要安装, 如果使用除base虚拟环境以外的环境, 需要先切换到对应虚拟环境下

    注意: 
        如果安装比较慢, 可以添加一下 清华镜像源
            pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]

        不管是否使用我的虚拟机, 都建议跑一次, 看一下是否存在

如何使用呢?

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

image-20211115114425971.png

3.4.2 如何基于arrow完成pandas DF 与 Spark DF的互转操作

如何将pandas的DF对象转换spark的DF,以及如何从spark df 转换为 pandas 的 df 对象

pandas DF  --> Spark DF : 
    spark_df = sparkSession.createDataFrame(pd_df)

Spark DF  ---> pandas DF:
    pd_df = spark_df.toPandas()

代码演示:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
import os

# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("pySpark模板")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()

    # 开启arrow
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    # 2- 创建 pandas的DF对象:
    pd_df = pd.DataFrame({'name':['张三','李四','王五'],'age':[20,18,15]})

    # 3- 可以使用panda的API来对数据进行处理操作
    pd_df = pd_df[pd_df['age'] > 16]
    print(pd_df)

    # 4- 将其转换为Spark DF
    spark_df = spark.createDataFrame(pd_df)


    # 5- 可以使用spark的相关API处理数据
    spark_df = spark_df.select(F.sum('age').alias('sum_age'))

    spark_df.printSchema()
    spark_df.show()

    # 6- 还可以将spark df 转换为pandas df
    pd_df = spark_df.toPandas()

    print(pd_df)

请注意:开启arrow方案,必须先安装arrow,否则无法使用,一执行就会报出:no module ‘pyarrow’(没有此模块)

pandas UDF代码, 请各位先检查 当前虚拟机中pyspark库是否为3.1.2

命令:
    conda list | grep pyspark

3.4.3 pandas UDF

方式一: series TO series

  • 描述: 定义一个python的函数, 接收series类型, 返回series类型, 接收一列返回一列
  • 目的: 用于定义 pandas的UDF函数

image-20220601172138718.png
代码演示:

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os

# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("基于pandas定义UDF函数")
    # 1- 创建SparkSession对象
    spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()

    # 2- 构建数据集
    df_init = spark.createDataFrame([(1,3),(2,5),(3,8),(5,4),(6,7)],schema='a int,b int')
    df_init.createTempView('t1')
    # 3- 处理数据:
    # 需求: 基于pandas的UDF 完成 对 a 和 b列乘积计算
    # 3.1 自定义一个python的函数: 传入series类型, 返回series类型
    @F.pandas_udf(returnType=IntegerType())  # 装饰器  用于将pandas的函数转换为 spark SQL的函数
    def pd_cj(a:pd.Series,b:pd.Series) -> pd.Series:
        return a * b

    #3.2 对函数进行注册操作
    # 方式一:
    pd_cj_dsl = spark.udf.register('pd_cj_sql',pd_cj)

    #3.3 使用自定义函数
    # SQL
    spark.sql("""
        select a,b, pd_cj_sql(a,b) as cj from  t1
    """).show()

    # DSL
    df_init.select('a','b',pd_cj('a','b').alias('cj')).show()

从series类型 到 标量(python基本数据类型) :

  • 描述: 定义一个python函数, 接收series类型的数据, 输出为标量, 用于定义 UDAF函数 ```properties import pandas as pd from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession from pyspark.sql.types import * import pyspark.sql.functions as F import os

锁定远端python版本:

os.environ[‘SPARK_HOME’] = ‘/export/server/spark’ os.environ[‘PYSPARK_PYTHON’] = ‘/root/anaconda3/bin/python3’ os.environ[‘PYSPARK_DRIVER_PYTHON’] = ‘/root/anaconda3/bin/python3’

if name == ‘main‘: print(“基于pandas定义UDF函数”)

# 1- 创建SparkSession对象
spark = SparkSession.builder.master('local[*]').appName('udf_01').getOrCreate()

# 2- 构建数据集
df_init = spark.createDataFrame([(1,3),(1,5),(1,8),(2,4),(2,7)],schema='a int,b int')
df_init.createTempView('t1')
# 3- 处理数据:
# 需求: pandas的UDAF需求   对 B列 求和
# 3.1 创建python的函数:  接收series类型, 输出基本数据类型(标量)
@F.pandas_udf(returnType=IntegerType())
def pd_b_sum(b:pd.Series) -> int:
    return b.sum()


# 3.2 注册函数
spark.udf.register('pd_b_sum_sql',pd_b_sum)

# 3.3 使用自定义函数
spark.sql("""
    select
        pd_b_sum_sql(b) as avg_b
    from t1
""").show()

spark.sql("""
        select
            a,
            b,
            pd_b_sum_sql(b) over(partition by a order by b) as sum_b
        from t1
    """).show()

```

3.4.4 pandas UDF函数案例(作业)

image-20220601174635026.png
需求: 必须自定义panda的UDF函数使用 (无难度 (连一点点都没有))
image-20220601174644844.png