今日内容

  • 1- Spark ON hive(参数配置完成)
  • 2- Spark SQL 分布式执行引擎(会使用)
  • 3- Spark SQL 的运行机制(理解)
  • 4- 综合案例(作业)

    0 作业答案:

    数据说明:
    1. _c0,对手,胜负,主客场,命中,投篮数,投篮命中率,3分命中率,篮板,助攻,得分
    2. 0,勇士,胜,客,10,23,0.435,0.444,6,11,27
    3. 1,国王,胜,客,8,21,0.381,0.286,3,9,28
    4. 2,小牛,胜,主,10,19,0.526,0.462,3,7,29
    5. 3,火箭,负,客,8,19,0.526,0.462,7,9,20
    6. 4,快船,胜,主,8,21,0.526,0.462,7,9,28
    7. 5,热火,负,客,8,19,0.435,0.444,6,11,18
    8. 6,骑士,负,客,8,21,0.435,0.444,6,11,28
    9. 7,灰熊,负,主,10,20,0.435,0.444,6,11,27
    10. 8,活塞,胜,主,8,19,0.526,0.462,7,9,16
    11. 9,76人,胜,主,10,21,0.526,0.462,7,9,28
    需求说明: 要求每一个都要使用 自定义函数方式 ```properties 1- 助攻这列 +10 操作: 自定义 UDF

2- 篮板 + 助攻 的次数: 自定义 UDF

3- 统计 胜负的平均分: 自定义 UDAF

  1. 代码实现:
  2. ```properties
  3. 第一步: 将资料中 data.csv 数据放置到 spark sql的项目的data目录下
  4. 第二步: 编码实现操作:
  5. import pandas as pd
  6. from pyspark import SparkContext, SparkConf
  7. from pyspark.sql import SparkSession
  8. from pyspark.sql.types import *
  9. import pyspark.sql.functions as F
  10. import os
  11. # 目的: 锁定远端操作环境, 避免存在多个版本环境的问题
  12. os.environ["SPARK_HOME"] = "/export/server/spark"
  13. os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
  14. os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"
  15. if __name__ == '__main__':
  16. print("pd 函数的案例")
  17. # 1) 创建 sparkSession对象
  18. spark = SparkSession.builder.master("local[*]").appName("_02_pd_udf").getOrCreate()
  19. # 开启 arrow方案:
  20. spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
  21. # 2) 读取数据:
  22. df = spark.read.format("csv").option("header",True).option("inferSchema",True).load("file:///export/data/workspace/_03_pyspark_sql/data/data.csv")
  23. # 3) 为每一个需求 定义一个函数, 并通过装饰者方案注册即可:
  24. # 需求1 - 助攻这列 + 10 UDF
  25. @F.pandas_udf(returnType=LongType())
  26. def add_zg(n:pd.Series) -> pd.Series:
  27. return n + 10
  28. # 需求2 - 篮板 + 助攻的次数: udf
  29. @F.pandas_udf(returnType=LongType())
  30. def add_lb_zg(lb:pd.Series,zg:pd.Series) -> pd.Series:
  31. return lb + zg
  32. # 需求3 - 统计胜负的平均分: UDAF
  33. @F.pandas_udf(returnType=DoubleType())
  34. def mean_sf(score:pd.Series) -> float:
  35. return score.mean()
  36. # 4) 使用函数完成对应需求
  37. df.withColumn('助攻+10',add_zg('助攻')).show()
  38. df.withColumn('助攻+篮板',add_lb_zg('篮板','助攻')).show()
  39. df.groupby(df['胜负']).agg(mean_sf('得分')).show()

1- Spark On Hive

image.png

1.1 集成原理说明

说明:

  1. HIVEHiveServer2 本质上作用: 接收SQL语句 SQL 翻译为 MR程序,需要使用到相关元数据的时候,连接metastore来获取即可
  2. Spark on HIVE的本质: HIVEMR执行引擎替换为 Spark RDD
  3. 思考: 对于HIVE来说,本质上将SQL翻译为MR,这个操作是有hiveServer2来负责的,所以说Spark On HIVE主要目的,将HiveServer2替换掉,将其更换为Spark所提供的SparkServer2
  4. 认为Spark On Hive本质: 替换掉HIVEHiveServer2 Spark提供一个sparkhiveServer2,对接metastore 从而完成将SQL翻译为Spark RDD
  5. 好处:
  6. 1- 对于Spark来说,也不需要自己维护元数据,可以利用hive的么他store来维护
  7. 2- 集成HIVE后,整个HIVE的执行效率也提高了
  8. Spark集成目的: 抢占HIVE的时长
  9. 所以Spark后续会提供一个分布式执行引擎,次引擎就是为了模拟HiveServer2,一旦模拟后,会让用户感觉跟操作原来HIVE基本上雷同的,比如说,端口号,连接的方式,配置的方式全部都一致
  10. 思考: HIVE不启动hiveServer2是否可以执行呢? 可以的 通过 ./hive
  11. 启动hiveServer2的目的: 为了可能让更多的外部客户端连接,比如说 datagrip

1.2 配置操作

大前提: 要保证之前hive的配置没有问题

  1. 建议:
  2. on hive配置前, 尝试先单独启动hive 看看能不能启动成功, 并连接
  3. 启动hive的命令:
  4. cd /export/server/hive/bin
  5. 启动metastore:
  6. nohup ./hive --service metastore &
  7. 启动hiveserver2:
  8. nohup ./hive --service hiveserver2 &
  9. 基于beeline连接:
  10. ./beeline 进入客户端
  11. 输入: !connect jdbc:hive2://node1:10000
  12. 输入用户名: root
  13. 输入密码: 密码可以不用输入
  14. 注意:
  15. 启动hive的时候, 要保证 hadoop肯定是启动良好了
  16. 启动完成后,如果能正常连接,那么就可以退出客户端,然后将HIVE直接通过 kill -9 杀掉了

配置操作:

  1. 1) 确保 hiveconf目录下的hive-site.xml中配置了metastore服务地址
  2. <property>
  3. <name>hive.metastore.uris</name>
  4. <value>thrift://node1:9083</value>
  5. </property>
  6. 2) 需要将hiveconf目录下的hive-site.xml 拷贝到 spark conf 目录下
  7. 如果spark没有配置集群版本, 只需要拷贝node1即可
  8. 如果配置spark集群, 需要将文件拷贝每一个spark节点上
  9. 3) 启动 hivemetastore服务:
  10. cd /export/server/hive/bin
  11. nohup ./hive --service metastore &
  12. 启动后, 一定要看到有runjar的出现
  13. 4) 启动 hadoop集群, 以及spark集群(如果采用local模式, 此集群不需要启动)
  14. 5) 使用sparkbin目录下: spark-sql 脚本 或者 pyspark 脚本打开一个客户端, 进行测试即可
  15. 测试小技巧:
  16. 同时也可以将hivehiveserver2服务启动后, 然后通过hivebeeline连接hive, 然后通过hive创建一个库, spark-sql 脚本 或者 pyspark 脚本 通过 show databases 查看, 如果能看到, 说明集成成功了...
  17. 测试完成后, 可以将hivehiveserver2 直接杀掉即可, 因为后续不需要这个服务:
  18. 首先查看hiveserver2服务的进程id是多少:
  19. ps -ef | grep hiveserver2 或者 jps -m
  20. 查看后,直接通过 kill -9 杀死对应服务即可

1.3 如何在代码中集成HIVE

2- Spark SQL 分布式执行引擎

  1. 目前, 我们已经完成了spark集成hive的操作, 但是目前集成后, 如果需要连接hive, 此时需要启动一个spark的客户端(pyspark,spark-sql, 或者代码形式)才可以, 这个客户端底层, 相当于启动服务项, 用于连接hive服务, 进行处理操作, 一旦退出了这个客户端, 相当于这个服务也不存在了, 同样也就无法使用了
  2. 此操作非常类似于在hive部署的时候, 有一个本地模式部署(在启动hive客户端的时候, 内部自动启动了一个hivehiveserver2服务项)
  1. 大白话:
  2. 目前后台没有一个长期挂载的spark的服务(spark hiveserver2 服务), 导致每次启动spark客户端,都行在内部构建一个服务项, 这种方式 ,仅仅适合于测试, 不适合后续开发

如何启动spark的分布式执行引擎呢? 这个引擎可以理解为 spark的hiveserver2服务

  1. cd /export/server/spark
  2. ./sbin/start-thriftserver.sh \
  3. --hiveconf hive.server2.thrift.port=10000 \
  4. --hiveconf hive.server2.thrift.bind.host=node1 \
  5. --hiveconf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse \
  6. --master local[*]

Day07_Spark SQL_On hive、运行机制、案例 - 图2

启动后: 可以通过 beeline的方式, 连接这个服务, 直接编写SQL即可:

  1. cd /export/server/spark/bin
  2. ./beeline
  3. 输入:
  4. !connect jdbc:hive2://node1:10000

Day07_Spark SQL_On hive、运行机制、案例 - 图3

相当于模拟了一个HIVE的客户端, 但是底层运行是spark SQL 将其转换为RDD来运行的

方式二: 如何通过 datagrip 或者 pycharm 连接 spark进行操作:

Day07_Spark SQL_On hive、运行机制、案例 - 图4

Day07_Spark SQL_On hive、运行机制、案例 - 图5

Day07_Spark SQL_On hive、运行机制、案例 - 图6

Day07_Spark SQL_On hive、运行机制、案例 - 图7

注意事项: 在使用download下载驱动的时候, 可能下载比较慢, 此时可以通过手动方式, 设置一个驱动:

Day07_Spark SQL_On hive、运行机制、案例 - 图8

Day07_Spark SQL_On hive、运行机制、案例 - 图9

Day07_Spark SQL_On hive、运行机制、案例 - 图10

Day07_Spark SQL_On hive、运行机制、案例 - 图11

Day07_Spark SQL_On hive、运行机制、案例 - 图12

3- Spark SQL的运行机制

回顾:Spark RDD中Driver的运行机制

  1. 1- Driver启动后,首先会创建SparkContext对象,此对象创建完成后,在其底层同事创建: DAGSchedule TaskScheduler
  2. 2- 当遇到一个action算子后,就会触发启动一个JOB任务,首先DAGSchedule将所有action依赖的RDD算子全部合并在一个stage中,然后从后往前进行回溯,遇到RDD之间有宽依赖(shuffle),就会拆分为一个新的stage阶段,通过这种方式形成最终的一个DAG执行流程图,并且划分好stage,根据每个阶段内部的分区的数量,形成每个阶段需要运行多少个线程(Task),将每个阶段的Task线程放置到一个TaskSet列表中,最后将各个阶段的TaskSet列表发送给TaskSchedule
  3. 3- TaskSchedule接收到DAGSchedule传递过来TaskSet,一次的运行每一个stage,将其各个线程分配给各个executor来进行运行执行
  4. 4- 后续不断监控各个线程执行状态,直到整个任务执行完成

Spark SQL底层依然需要将SQL语句翻译为Spark RDD操作 所以 Spark SQL也是存在上述的流程,只不过在上述流程中加入了从Spark SQL翻译为Spark RDD的过程
image-20220602113353620.png
spark运行底层原理.png
内部详细流程:大白话梳理

  1. 1- 当客户端讲SQL提交到Spark中,首先会将SQL提交给catalyst优化器
  2. 2- 优化器首先会解析SQL,根据SQL的执行顺序形成一个逻辑语法树(AST
  3. 3- 接着对这个逻辑语法树加入相关的元数据(连接metastore服务),标识出需要从哪里读取数据,数据的存储格式,用到哪些表,表的字段类型 等等...,形成一个未优化的逻辑优化
  4. 4- catalyst开始执行RBO(规则优化)工作 : 将未优化的逻辑计划根据spark提供的优化方案对计划进行优化操作,比如说可以执行谓词下推,列值裁剪,提前聚合... 优化后得到一个优化后的逻辑执行计划
  5. 5- 开始执行 CBO(成本优化)工作: 在进行优化的时候,由于优化的规则比较多,可能匹配出多种优化方案,最终可能会产生多个优化后的逻辑执行计划,此时通过代价函数(选最优),选择出一个效率最高的物理计划
  6. 6- 将物理计划,通过代码生成器工具 将其转换为RDD程序
  7. 7- RDD提交到集群中运行,后续的运行流程 跟之前RDD运行流程就完全一致了

专业术语:

  1. 1- sparkSQL底层解析是有RBO CBO优化完成的
  2. 2- RBO是基于规则优化, 对于SQLDSL的语句通过执行引擎得到未执行逻辑计划, 在根据元数据得到逻辑计划, 之后加入列值裁剪或谓词下推等优化手段形成优化的逻辑计划
  3. 3- CBO是基于优化的逻辑计划得到多个物理执行计划, 根据代价函数选择出最优的物理执行计划
  4. 4- 通过codegenaration代码生成器完成RDD的代码构建
  5. 5- 底层依赖于DAGScheduler TaskScheduler 完成任务计算执行

如何查看物理执行计划呢?

  • 方式一 : 通过spark thrift server的服务界面: 大概率是 4040界面

image-20220602115644571.png
image-20220602115803986.png

  • 方式二: 通过SQL的命令
    1. explain SQL语句
    image-20220602115919042.png

    4. 综合案例-今日作业

4.1 新零售综合案例

数据结构介绍:

  1. InvoiceNo string 订单编号(退货订单以C 开头)
  2. StockCode string 产品代码
  3. Description string 产品描述
  4. Quantity integer 购买数量(负数表示退货)
  5. InvoiceDate string 订单日期和时间 12/1/2010 8:26
  6. UnitPrice double 商品单价
  7. CustomerID integer 客户编号
  8. Country string 国家名字
  9. 字段与字段之间的分隔符号为 逗号

E_Commerce_Data.csv

Day07_Spark SQL_On hive、运行机制、案例 - 图18

拿到数据之后, 首先需要对数据进行过滤清洗操作: 清洗目的是为了得到一个更加规整的数据

  1. 清洗需求:
  2. 需求一: 将客户id(CustomerID) 0的数据过滤掉
  3. 需求二: 将商品描述(Description) 为空的数据过滤掉
  4. 需求三: 将日期格式进行转换处理:
  5. 原有数据信息: 12/1/2010 8:26
  6. 转换为: 2010-01-12 08:26

相关的需求(DSL和SQL):

  1. (1) 客户数最多的10个国家
  2. (2) 销量最高的10个国家
  3. (3) 各个国家的总销售额分布情况
  4. (4) 销量最高的10个商品
  5. (5) 商品描述的热门关键词Top300
  6. (6) 退货订单数最多的10个国家
  7. (7) 月销售额随时间的变化趋势
  8. (8) 日销量随时间的变化趋势
  9. (9) 各国的购买订单量和退货订单量的关系
  10. (10) 商品的平均单价与销量的关系

2.1.1 完成数据清洗过滤的操作

2.1.2 需求统计分析操作

4.2 在线教育案例

数据结构基本介绍:

  1. student_id string 学生id
  2. recommendations string 推荐题目(题目与题目之间用逗号分隔)
  3. textbook_id string 教材id
  4. grade_id string 年级id
  5. subject_id string 学科id
  6. chapter_id strig 章节id
  7. question_id string 题目id
  8. score integer 点击次数
  9. answer_time string 注册时间
  10. ts timestamp 时间戳
  11. 字段与字段之间的分隔符号为 \t

Day07_Spark SQL_On hive、运行机制、案例 - 图19

需求:

  1. 需求一: 找到TOP50热点题对应科目. 然后统计这些科目中, 分别包含几道热点题目
  2. 需求二: 找到Top20热点题对应的饿推荐题目. 然后找到推荐题目对应的科目, 并统计每个科目分别包含推荐题目的条数

数据存储在 资料中: eduxxx.csv