今日内容:

  • 1- spark的入门案例: (操作)
  • 2- Spark on yarn环境构建 (参考部署文档, 集成成功, 同时要理解并会使用两种部署方式)
  • 3- Spark程序和PySpark交互流程 (重要原理)
  • 4- Spark-submit相关参数说明 (记录笔记)

1. 基于Pycharm实施入门案例

1.1 从HDFS上读取文件并实现排序

从HDFS中读取数据, 并对数据进行排序, 最后写入到HDFS上

  1. # 演示: pySpark入门案例: WordCount
  2. # 需求: 从HDFS中读取数据, 对数据进行统计分析(WordCount). 最后将结果根据单词数量进行倒序排序, 并将结果写出HDFS上
  3. from pyspark import SparkContext, SparkConf
  4. import os
  5. # 锁定远端python版本:
  6. os.environ['SPARK_HOME'] = '/export/server/spark'
  7. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  8. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  9. if __name__ == '__main__':
  10. print("WordCount案例: 从HDFS读取数据")
  11. # 1- 创建SparkContext对象:
  12. conf = SparkConf().setMaster('local[*]').setAppName('wd')
  13. sc = SparkContext(conf=conf)
  14. # 2- 读取HDFS上文件数据
  15. rdd_init = sc.textFile('hdfs://node1:8020/pyspark_data/words.txt')
  16. # 3- 对数据执行切割: 每一行都有可能产生多个单词, 所以这里切割, 是一个 1对多操作 采用flatMap()
  17. rdd_flatMap = rdd_init.flatMap(lambda line: line.split())
  18. # 4- 将每一个单词转换为 单词,1 1对 1转换
  19. rdd_map = rdd_flatMap.map(lambda word: (word,1))
  20. # 5- 根据 key 对value数据进行聚合统计
  21. rdd_res = rdd_map.reduceByKey(lambda agg,curr: agg + curr)
  22. # 6 - 对结果数据进行排序
  23. #rdd_sort = rdd_res.sortBy(lambda wd_tup: wd_tup[1],ascending=False)
  24. # 此种没有任何意义, 仅仅为了演示这几个API
  25. #rdd_res = rdd_res.map(lambda res_tup: (res_tup[1],res_tup[0]))
  26. #rdd_sort = rdd_res.sortByKey(ascending=False)
  27. #rdd_sort = rdd_sort.map(lambda res_tup: (res_tup[1], res_tup[0]))
  28. # 7- 写出到HDFS中
  29. print(rdd_res.top(10,lambda wd_tup: wd_tup[1])) # top只能进行降序排序,直接触发执行
  30. #rdd_sort.saveAsTextFile('hdfs://node1:8020/pyspark_data/output')
  31. # 8- 释放资源
  32. sc.stop()

1.2 基于Spark-Submit方式运行

  1. 基础格式:
  2. ./spark-submit --master ... 文件
  3. 说明:
  4. ./spark-submit: 用于将spark程序提交到指定的平台来运行, 同时可以设置各种相关配置参数
  5. 示例:
  6. ./spark-submit --master local[*] /export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py

2. Spark On Yarn 环境搭建

2.1 Spark On Yarn的本质

Spark On Yarn的本质: 指的将Spark程序提交到Yarn集群中, 通过yarn进行统一的调度运行操作

这种操作, 将会是以后主要的提交上线部署的方式

2.2 配置 Spark On Yarn

整个配置操作, 大家可参考<>即可

在配置的时候, 一定要细心, 多校验

2.3 提交应用测试

  • 测试1: 测试一下spark自带的一些测试脚本: pi
  1. cd /export/server/spark/bin
  2. ./spark-submit \
  3. --master yarn \
  4. --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
  5. --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
  6. /export/server/spark/examples/src/main/python/pi.py \
  7. 10
  • 测试2: 测试一下我们自己编写的WordCount案例
  1. cd /export/server/spark/bin
  2. ./spark-submit \
  3. --master yarn \
  4. --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
  5. --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
  6. /export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
  7. 注意:
  8. python脚本中, 需要将setMaster参数设置为yarn 或者 直接删除此参数配置

可能报出一下错误:

Day02_Spark on yarn|submit|core - 图1

  1. 思考为什么会报找不到python3的错误呢?
  2. 原因: anaconda我只在node1安装了, node2node3没有安装
  3. 解决方案: 在各个节点将anaconda安装成功

说明:

  1. Spark程序运行主要由两部分组成:
  2. Driver程序(JVM程序): Spark驱动程序(任务的管家) 或者 类似于MRapplicationMaster
  3. 主要作用: 资源的申请, 任务的分配, 任务的监控管理
  4. Executor程序(JVM程序):执行器, 可以理解为是一个线程池, 内部运行多个线程(Task)
  5. 主要作用: 通过内部多个线程来执行具体的任务

2.4 两种部署方式说明

  1. 指的在将Spark任务提交到集群(YARN, Spark集群为主)的时候,提供两种提交部署方案: client模式 , cluster模式
  1. 本质区别: Spark程序中Driver程序运行在什么位置上
  2. client模式: 客户端, Driver程序运行在执行spark-submit所在节点上 默认就是client模式
  3. 好处: 由于Driver是运行在客户端, 当执行完成后, 需要查看结果, 此时executor会将结果返回给Driver, Driver在客户端, 直接答应, 我们直接在客户端看到执行结果 (方便测试)
  4. 在客户端模式下, 不存在Driver的日志, 因为日志是直接输出客户端
  5. 弊端: 由于Driverexecutor有可能不在同一个环境中,会导致中间网络传输效率比较低, 从而影响整体的效率
  6. 此种方式一般在生产环境中不使用, 主要使用在测试环境
  7. cluster模式: 集群模式, Driver程序运行在提交集群所在的某一个节点上
  8. 好处: Driver程序和executor都在同一个集群环境中, 在进行传输数据的时候, 可以更大利用内部网络带宽优势, 提升效率
  9. 弊端: 不方便测试, Driver运行在集群环境中,所有的内容全部都会记录到日志文件中, 无法会给提交的客户端, 所以客户端想要查看结果, 需要看日志
  10. 此种方式一般用于生产环境

如何使用这两种方式呢?

  1. cd /export/server/spark/bin
  2. ./spark-submit \
  3. --master yarn \
  4. --deploy-mode client | cluster \
  5. --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
  6. --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
  7. /export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
  8. 演示cluster:
  9. cd /export/server/spark/bin
  10. ./spark-submit \
  11. --master yarn \
  12. --deploy-mode cluster \
  13. --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
  14. --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
  15. /export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py

如何查看Spark日志:

  1. 查看Spark日志一般有二种方式: 一种为通过yarn查看 一种通过SPark18080端口查看
  • Yarn环境中:

Day02_Spark on yarn|submit|core - 图2

Day02_Spark on yarn|submit|core - 图3

  • Spark日志服务器: 18080

Day02_Spark on yarn|submit|core - 图4

Day02_Spark on yarn|submit|core - 图5

查看executor和Driver的日志:

Day02_Spark on yarn|submit|core - 图6

查看对应Task的日志:

Day02_Spark on yarn|submit|core - 图7

Day02_Spark on yarn|submit|core - 图8

Day02_Spark on yarn|submit|core - 图9

日志服务器: 日志告警系统

3. Spark 程序与PySpark交互流程

把我认为的变成你认为的点 就理解

Day02_Spark on yarn|submit|core - 图10

Spark和PySpark交互流程: 提交到Spark集群, 部署方式为Client

  1. 1- 启动Driver程序
  2. 2- Master申请资源
  3. 3- Master根据要申请的资源, 返回对应资源列表
  4. executor1 : node1 1 1GB
  5. executor2: node3 1 1GB
  6. 4- 连接对应worker节点, 通知他们启动Executor,当我worker启动完成后, 需要反向注册回Driver(通知)
  7. 5- Driver开始执行Main函数:
  8. 5.1 初始化sc对象: 构建SparkContext, 基于py4j. python中定义的如何初始化sc对象的代码转换为java代码进行执行处理
  9. 5.2 Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor (任务分配)
  10. 5.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理
  11. 5.4 executor接收到任务后, 根据任务信息, 开始运行处理, Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了
  12. 5.5 Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭, 通知Master执行完成了, Master回收资源

Spark和PySpark交互流程: 提交到Spark集群, 部署方式为Cluster

  1. 1- 首先会先将任务信息提交到Master主节点上
  2. 2- Master收到任务信息后, 首先会根据Driver的资源信息, 随机找一台worker节点用于启动Driver程序,并将任务信息交给Driver
  3. 3- 当对应worker节点收到请求后, 开启启动Driver, 启动后会和Master保持心跳机制,告知Master启动成功了, 启动后立即开始申请资源(executor)
  4. 4- Master根据要申请的资源, 返回对应资源列表
  5. executor1 : node1 1 1GB
  6. executor2: node3 1 1GB
  7. 5- 连接对应worker节点, 通知他们启动Executor,当我worker启动完成后, 需要反向注册回Driver(通知)
  8. 6- Driver开始执行Main函数:
  9. 6.1 初始化sc对象: 构建SparkContext, 基于py4j. python中定义的如何初始化sc对象的代码转换为java代码进行执行处理
  10. 6.2 Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor (任务分配)
  11. 6.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理
  12. 6.4 executor接收到任务后, 根据任务信息, 开始运行处理, Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了
  13. 6.5 Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭, 通知Master执行完成了, Master回收资源

Spark和PySpark交互流程: 提交到Yarn集群, 部署方式为Client

Day02_Spark on yarn|submit|core - 图11

  1. 1- 启动Driver程序
  2. 2- 连接Yarn的主节点(resourceManager),向主节点提交一个资源的任务(目标: 启动executor)
  3. 3- YarnresourceManager接收到任务后, 开始随机在某一个nodemanager节点上启动appMaster, AappMaster启动后会和resourceManager建立心跳机制, 报告已经启动完成了
  4. 4- AppMaster根据资源信息要求,向resourceManager申请资源, 通过心跳的方式将申请资源信息发送到RM,然后不断的询问RM是否已经准备好了
  5. 5- AppMaster一旦检测到RM中已经将资源准备好了,那么就会立即将资源分配结果信息获取到, 根据分配资源信息在对应的nodemanager节点上启动进程(executor)
  6. 6- executor启动完成后, 通知appMaster 同时反向注册到Driver端(通知)
  7. 5- Driver收到各个executor的注册信息后, 开始执行Main函数:
  8. 5.1 初始化sc对象: 构建SparkContext, 基于py4j. python中定义的如何初始化sc对象的代码转换为java代码进行执行处理
  9. 5.2 Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor (任务分配)
  10. 5.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理
  11. 5.4 executor接收到任务后, 根据任务信息, 开始运行处理, Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了, 同时会执行完成的状态通知appMaster, AppMaster收到全部的节点都结果完成后, 通知RM 任务已经执行完成, RM通知appMaster可以退出(自杀过程),并且回收yarn的资源
  12. 5.5 Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭

Spark和PySpark交互流程: 提交到Yarn集群, 部署方式为Client

Day02_Spark on yarn|submit|core - 图12

  1. Yarnclient模式和集群模式的区别:
  2. client模式下:
  3. Driver程序和 appMaster程序都是独立的两个程序
  4. Driver程序负责任务的分配, 任务的监控的工作,与任务相关的管理操作
  5. appMaster程序负责申请资源, 启动executor
  6. cluster模式下:
  7. Driver程序和appMaster程序合二为一: 共同负责 资源申请, 任务的分配 任务的监控等各项工作

4. Spark-Submit相关参数说明

  1. spark-submit 这个命令 是我们spark提供的一个专门用于提交spark程序的客户端, 可以将spark程序提交到各种资源调度平台上: 比如说 **local(本地)**, spark集群,**yarn集群**, 云上调度平台(k8s ...)
  2. spark-submit在提交的过程中, 设置非常多参数, 调整任务相关信息
  • 基本参数设置

Day02_Spark on yarn|submit|core - 图13

  • Driver的资源配置参数

Day02_Spark on yarn|submit|core - 图14

  • executor的资源配置参数

Day02_Spark on yarn|submit|core - 图15

5. Spark Core

5.1 RDD的基本介绍

RDD: 弹性分布式数据集

目的: 为了支持高效的迭代计算


  1. 早期计算模型: 单机模型
  2. 比如说: python计算, MySQL
  3. 适用于: 小规模数据的处理操作,中间结果也可以保存的内存或者磁盘
  4. 随着发展, 单机模型并不能够应对未来数据不断的增长, 因为单机总有资源的上限, 无法无限制的扩展,思考如何解决庞大数据的处理操作, 想到方案: 分布式计算操作
  5. 产生了MapReduce计算框架, 通过分而治之思想, 将任务拆解多个部分, 进行分布式运算, 从而解决大规模数据计算的操作
  6. MR的计算流程:
  7. 首先读取数据,将数据读取到内存中, 对数据进行转换处理, 将处理后的数据写入到环形缓冲区(内存),当环形缓冲区写满后, 将数据溢写到磁盘上, 接着将磁盘上多个溢写的文件进行合并操作(从磁盘到内存, 从内存到磁盘) 最后reduce来拉取处理又是从磁盘到内存, 内存到磁盘操作
  8. 这种处理模型最大好处, 可以在有限的资源下, 处理大规模的数据
  9. 同样也存在着一些弊端, 比如说 IO比较大, 运行效率比较低的
  10. 如果在计算过程中, 需要进行迭代计算操作, 对于MR来说, 需要使用多个MR串行处理, 第一个MR执行完成后, 将结果保存下来, 然后第二个MR读取第一个MR执行结果, 进行处理操作, 依次类推
  11. 这种方式同样效率比较低, 因为每一次运行MR都要重新申请资源,而且整个IO同样也在增大
  12. 思考: 那么能不能有一种技术可以支持高效计算, 同时也可以更好的迭代计算操作, 并且也可以处理大规模的数据 ?
  13. 那么这个技术就是RDD(弹性分布式数据集),而SparkRDD进行实现, 所有说学习Spark 主要学习RDD

MR的迭代流程图:

Day02_Spark on yarn|submit|core - 图16

Spark的迭代流程图:

Day02_Spark on yarn|submit|core - 图17

算子: 在spark中将一些具有某种功能的RDD函数称为算子

5.2 RDD的五大特性(知道)

  1. RDD的五大特性:
  2. 1) (必须)可分区的: 每一个RDD必须能够进行分区操作, 每一个分区代表就是一个Task线程,可分区也是分布式计算要求
  3. 2) (必须)存在计算函数: 每一个RDD都是由一个计算函数而得出的 计算函数是作用在每一个分区的
  4. 3) (必须)RDD之间存在依赖关系
  5. 4) (可选)对于kv类型的RDD都是存在分区函数的 :
  6. 5) (可选)移动存储不如移动计算(让计算离数据越近越好)

5.3 RDD的五大特点(知道)

  1. 1) 分区: RDD可以分区的
  2. 2) 只读: 不允许进行修改, 如果要修改, 处理后会得到一个新的RDD, 原有RDD保持现状
  3. 3) 依赖: RDD之间有依赖关系(血缘关系) : 两种依赖关系(宽依赖| 窄依赖)
  4. 4) 缓存(cache): 在进行RDD计算操作的时候, 当一个RDD的结果被重复使用多次的时候, 可以将RDD的结果进行缓存,后续使用的时候, 可以直接读取缓存的数据 不需要重新计算, 提升效率
  5. 5) 检查点(checkpoint): RDD之间的依赖链条越长后, 为了减少中间处理失败整个回溯效率低的问题, 可以进行checkpoint操作, RDD依赖链条中保存几个快照(checkpoint), 将阶段的结果保存到磁盘(HDFS)上, 当后续RDD出现失败了 可以直接从磁盘上读取阶段结果, 不需要进行回溯整个流程, 从而提升效率

5.4 如何构建一个RDD

Day02_Spark on yarn|submit|core - 图18

  1. 构建RDD主要有二种方式:
  2. 一种是通过本地集合模拟数据方式 (Test 测试使用)
  3. 一种通过读取外部文件的方式(正常使用方式)

方式一: 通过本地集合模拟数据方式

  1. # 演示如何创建RDD对象" 方式一
  2. from pyspark import SparkContext, SparkConf
  3. import os
  4. # 锁定远端python版本:
  5. os.environ['SPARK_HOME'] = '/export/server/spark'
  6. os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
  7. os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
  8. if __name__ == '__main__':
  9. print("演示:如何创建RDD")
  10. # 1- 构建SparkContext对象
  11. conf = SparkConf().setMaster("local[3]").setAppName('create_rdd')
  12. sc = SparkContext(conf=conf)
  13. # 2- 构建RDD对象
  14. rdd_init = sc.parallelize(['张三','李四','王五','赵六','田七','周八','李九'],5)
  15. # 需求:给每一个元素添加一个 10序号
  16. rdd_map = rdd_init.map(lambda name: name + '10')
  17. # 如何查看RDD中每个分区的数据,以及分区的数量
  18. print(rdd_init.getNumPartitions())
  19. print(rdd_init.glom().collect())
  20. print(rdd_map.glom().collect())

说明:

  1. 验证RDD是存在分区的, 而且每一个计算函数作用在每个分区上进行处理
  2. 如何查看RDD有多少个分区: rdd.getNumPartitions()
  3. 如何查看RDD中每一个分区的数据: rdd.glom()
  4. 当采用本地模拟数据(并行方式)构建RDD的时候, RDD分区数量取决于:
  5. 1- 默认: setMaster("local[N]") 取决于 N的值, N为*的时候, 表示为当前节点的CPU核心数
  6. 2- 同时也可以通过parallelize算子来设置分区数量, 设置多少就为多少个分区