今日内容:
- 1- spark的入门案例: (操作)
- 2- Spark on yarn环境构建 (参考部署文档, 集成成功, 同时要理解并会使用两种部署方式)
- 3- Spark程序和PySpark交互流程 (重要原理)
- 4- Spark-submit相关参数说明 (记录笔记)
1. 基于Pycharm实施入门案例
1.1 从HDFS上读取文件并实现排序
从HDFS中读取数据, 并对数据进行排序, 最后写入到HDFS上
# 演示: pySpark入门案例: WordCount
# 需求: 从HDFS中读取数据, 对数据进行统计分析(WordCount). 最后将结果根据单词数量进行倒序排序, 并将结果写出HDFS上
from pyspark import SparkContext, SparkConf
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("WordCount案例: 从HDFS读取数据")
# 1- 创建SparkContext对象:
conf = SparkConf().setMaster('local[*]').setAppName('wd')
sc = SparkContext(conf=conf)
# 2- 读取HDFS上文件数据
rdd_init = sc.textFile('hdfs://node1:8020/pyspark_data/words.txt')
# 3- 对数据执行切割: 每一行都有可能产生多个单词, 所以这里切割, 是一个 1对多操作 采用flatMap()
rdd_flatMap = rdd_init.flatMap(lambda line: line.split())
# 4- 将每一个单词转换为 单词,1 1对 1转换
rdd_map = rdd_flatMap.map(lambda word: (word,1))
# 5- 根据 key 对value数据进行聚合统计
rdd_res = rdd_map.reduceByKey(lambda agg,curr: agg + curr)
# 6 - 对结果数据进行排序
#rdd_sort = rdd_res.sortBy(lambda wd_tup: wd_tup[1],ascending=False)
# 此种没有任何意义, 仅仅为了演示这几个API
#rdd_res = rdd_res.map(lambda res_tup: (res_tup[1],res_tup[0]))
#rdd_sort = rdd_res.sortByKey(ascending=False)
#rdd_sort = rdd_sort.map(lambda res_tup: (res_tup[1], res_tup[0]))
# 7- 写出到HDFS中
print(rdd_res.top(10,lambda wd_tup: wd_tup[1])) # top只能进行降序排序,直接触发执行
#rdd_sort.saveAsTextFile('hdfs://node1:8020/pyspark_data/output')
# 8- 释放资源
sc.stop()
1.2 基于Spark-Submit方式运行
基础格式:
./spark-submit --master ... 文件
说明:
./spark-submit: 用于将spark程序提交到指定的平台来运行, 同时可以设置各种相关配置参数
示例:
./spark-submit --master local[*] /export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
2. Spark On Yarn 环境搭建
2.1 Spark On Yarn的本质
Spark On Yarn的本质: 指的将Spark程序提交到Yarn集群中, 通过yarn进行统一的调度运行操作
这种操作, 将会是以后主要的提交上线部署的方式
2.2 配置 Spark On Yarn
整个配置操作, 大家可参考<
在配置的时候, 一定要细心, 多校验
2.3 提交应用测试
- 测试1: 测试一下spark自带的一些测试脚本: pi
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/server/spark/examples/src/main/python/pi.py \
10
- 测试2: 测试一下我们自己编写的WordCount案例
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
注意:
在python脚本中, 需要将setMaster参数设置为yarn 或者 直接删除此参数配置
可能报出一下错误:
思考为什么会报找不到python3的错误呢?
原因: anaconda我只在node1安装了, node2和node3没有安装
解决方案: 在各个节点将anaconda安装成功
说明:
Spark程序运行主要由两部分组成:
Driver程序(JVM程序): Spark驱动程序(任务的管家) 或者 类似于MR中applicationMaster
主要作用: 资源的申请, 任务的分配, 任务的监控管理
Executor程序(JVM程序):执行器, 可以理解为是一个线程池, 内部运行多个线程(Task)
主要作用: 通过内部多个线程来执行具体的任务
2.4 两种部署方式说明
指的在将Spark任务提交到集群(YARN, Spark集群为主)的时候,提供两种提交部署方案: client模式 , cluster模式
本质区别: Spark程序中Driver程序运行在什么位置上
client模式: 客户端, Driver程序运行在执行spark-submit所在节点上 默认就是client模式
好处: 由于Driver是运行在客户端, 当执行完成后, 需要查看结果, 此时executor会将结果返回给Driver, Driver在客户端, 直接答应, 我们直接在客户端看到执行结果 (方便测试)
在客户端模式下, 不存在Driver的日志, 因为日志是直接输出客户端
弊端: 由于Driver和executor有可能不在同一个环境中,会导致中间网络传输效率比较低, 从而影响整体的效率
此种方式一般在生产环境中不使用, 主要使用在测试环境
cluster模式: 集群模式, Driver程序运行在提交集群所在的某一个节点上
好处: Driver程序和executor都在同一个集群环境中, 在进行传输数据的时候, 可以更大利用内部网络带宽优势, 提升效率
弊端: 不方便测试, Driver运行在集群环境中,所有的内容全部都会记录到日志文件中, 无法会给提交的客户端, 所以客户端想要查看结果, 需要看日志
此种方式一般用于生产环境
如何使用这两种方式呢?
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--deploy-mode client | cluster \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
演示cluster:
cd /export/server/spark/bin
./spark-submit \
--master yarn \
--deploy-mode cluster \
--conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \
--conf "spark.pyspark.python=/root/anaconda3/bin/python3" \
/export/data/workspace/sz30_pyspark_parent/_01_pyspark_base/src/_02_pySpark_wd.py
如何查看Spark日志:
查看Spark日志一般有二种方式: 一种为通过yarn查看 一种通过SPark的18080端口查看
- Yarn环境中:
- Spark日志服务器: 18080
查看executor和Driver的日志:
查看对应Task的日志:
日志服务器: 日志告警系统
3. Spark 程序与PySpark交互流程
把我认为的变成你认为的点 就理解
Spark和PySpark交互流程: 提交到Spark集群, 部署方式为Client
1- 启动Driver程序
2- 向Master申请资源
3- Master根据要申请的资源, 返回对应资源列表
executor1 : node1 1核 1GB
executor2: node3 1核 1GB
4- 连接对应worker节点, 通知他们启动Executor,当我worker启动完成后, 需要反向注册回Driver(通知)
5- Driver开始执行Main函数:
5.1 初始化sc对象: 构建SparkContext, 基于py4j. 将python中定义的如何初始化sc对象的代码转换为java代码进行执行处理
5.2 当Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor上 (任务分配)
5.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理
5.4 当executor接收到任务后, 根据任务信息, 开始运行处理, 当Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了
5.5 当Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭, 通知Master执行完成了, Master回收资源
Spark和PySpark交互流程: 提交到Spark集群, 部署方式为Cluster
1- 首先会先将任务信息提交到Master主节点上
2- 当Master收到任务信息后, 首先会根据Driver的资源信息, 随机找一台worker节点用于启动Driver程序,并将任务信息交给Driver
3- 当对应worker节点收到请求后, 开启启动Driver, 启动后会和Master保持心跳机制,告知Master启动成功了, 启动后立即开始申请资源(executor)
4- Master根据要申请的资源, 返回对应资源列表
executor1 : node1 1核 1GB
executor2: node3 1核 1GB
5- 连接对应worker节点, 通知他们启动Executor,当我worker启动完成后, 需要反向注册回Driver(通知)
6- Driver开始执行Main函数:
6.1 初始化sc对象: 构建SparkContext, 基于py4j. 将python中定义的如何初始化sc对象的代码转换为java代码进行执行处理
6.2 当Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor上 (任务分配)
6.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理
6.4 当executor接收到任务后, 根据任务信息, 开始运行处理, 当Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了
6.5 当Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭, 通知Master执行完成了, Master回收资源
Spark和PySpark交互流程: 提交到Yarn集群, 部署方式为Client
1- 启动Driver程序
2- 连接Yarn的主节点(resourceManager),向主节点提交一个资源的任务(目标: 启动executor)
3- Yarn的resourceManager接收到任务后, 开始随机在某一个nodemanager节点上启动appMaster, AappMaster启动后会和resourceManager建立心跳机制, 报告已经启动完成了
4- AppMaster根据资源信息要求,向resourceManager申请资源, 通过心跳的方式将申请资源信息发送到RM,然后不断的询问RM是否已经准备好了
5- 当AppMaster一旦检测到RM中已经将资源准备好了,那么就会立即将资源分配结果信息获取到, 根据分配资源信息在对应的nodemanager节点上启动进程(executor)
6- 当executor启动完成后, 通知appMaster 同时反向注册到Driver端(通知)
5- Driver收到各个executor的注册信息后, 开始执行Main函数:
5.1 初始化sc对象: 构建SparkContext, 基于py4j. 将python中定义的如何初始化sc对象的代码转换为java代码进行执行处理
5.2 当Driver程序构建好SC对象后, 开始将后续的所有的RDD相关的算子全部合并在一起, 根据rdd之间的依赖的关系, 形成一个DAG执行流程图 , 划分出一共有多少个阶段 以及每个阶段需要运行多少个线程, 最后每个线程需要运行在那个executor上 (任务分配)
5.3 当确定每个节点中有多少个线程 以及每个线程应该运行在那个executor后, 接下来将这些任务信息发送给对应executor, 让其进行执行处理
5.4 当executor接收到任务后, 根据任务信息, 开始运行处理, 当Task运行完成后, 如果需要将结果返回给Driver(比如: 执行collect()),直接通过网络返回执行, 如果不需要返回(执行:saveasTextFile()),线程直接处理, 当内部Task执行完成后, executor会通知Driver已经执行完成了, 同时会执行完成的状态通知appMaster, AppMaster收到全部的节点都结果完成后, 通知RM 任务已经执行完成, RM通知appMaster可以退出(自杀过程),并且回收yarn的资源
5.5 当Driver程序接收到所有的节点都运行完成后, 执行 后续的非RDD的代码, 并最终将sc对象关闭
Spark和PySpark交互流程: 提交到Yarn集群, 部署方式为Client
Yarn的client模式和集群模式的区别:
client模式下:
Driver程序和 appMaster程序都是独立的两个程序
Driver程序负责任务的分配, 任务的监控的工作,与任务相关的管理操作
appMaster程序负责申请资源, 启动executor
cluster模式下:
Driver程序和appMaster程序合二为一: 共同负责 资源申请, 任务的分配 任务的监控等各项工作
4. Spark-Submit相关参数说明
spark-submit 这个命令 是我们spark提供的一个专门用于提交spark程序的客户端, 可以将spark程序提交到各种资源调度平台上: 比如说 **local(本地)**, spark集群,**yarn集群**, 云上调度平台(k8s ...)
spark-submit在提交的过程中, 设置非常多参数, 调整任务相关信息
- 基本参数设置
- Driver的资源配置参数
- executor的资源配置参数
5. Spark Core
5.1 RDD的基本介绍
RDD: 弹性分布式数据集
目的: 为了支持高效的迭代计算
早期计算模型: 单机模型
比如说: python计算, MySQL
适用于: 小规模数据的处理操作,中间结果也可以保存的内存或者磁盘
随着发展, 单机模型并不能够应对未来数据不断的增长, 因为单机总有资源的上限, 无法无限制的扩展,思考如何解决庞大数据的处理操作, 想到方案: 分布式计算操作
产生了MapReduce计算框架, 通过分而治之思想, 将任务拆解多个部分, 进行分布式运算, 从而解决大规模数据计算的操作
MR的计算流程:
首先读取数据,将数据读取到内存中, 对数据进行转换处理, 将处理后的数据写入到环形缓冲区(内存),当环形缓冲区写满后, 将数据溢写到磁盘上, 接着将磁盘上多个溢写的文件进行合并操作(从磁盘到内存, 从内存到磁盘) 最后reduce来拉取处理又是从磁盘到内存, 内存到磁盘操作
这种处理模型最大好处, 可以在有限的资源下, 处理大规模的数据
同样也存在着一些弊端, 比如说 IO比较大, 运行效率比较低的
如果在计算过程中, 需要进行迭代计算操作, 对于MR来说, 需要使用多个MR串行处理, 第一个MR执行完成后, 将结果保存下来, 然后第二个MR读取第一个MR执行结果, 进行处理操作, 依次类推
这种方式同样效率比较低, 因为每一次运行MR都要重新申请资源,而且整个IO同样也在增大
思考: 那么能不能有一种技术可以支持高效计算, 同时也可以更好的迭代计算操作, 并且也可以处理大规模的数据 ?
那么这个技术就是RDD(弹性分布式数据集),而Spark将RDD进行实现, 所有说学习Spark 主要学习RDD
MR的迭代流程图:
Spark的迭代流程图:
算子: 在spark中将一些具有某种功能的RDD函数称为算子
5.2 RDD的五大特性(知道)
RDD的五大特性:
1) (必须)可分区的: 每一个RDD必须能够进行分区操作, 每一个分区代表就是一个Task线程,可分区也是分布式计算要求
2) (必须)存在计算函数: 每一个RDD都是由一个计算函数而得出的 计算函数是作用在每一个分区的
3) (必须)RDD之间存在依赖关系
4) (可选)对于kv类型的RDD都是存在分区函数的 :
5) (可选)移动存储不如移动计算(让计算离数据越近越好)
5.3 RDD的五大特点(知道)
1) 分区: RDD可以分区的
2) 只读: 不允许进行修改, 如果要修改, 处理后会得到一个新的RDD, 原有RDD保持现状
3) 依赖: RDD之间有依赖关系(血缘关系) : 两种依赖关系(宽依赖| 窄依赖)
4) 缓存(cache): 在进行RDD计算操作的时候, 当一个RDD的结果被重复使用多次的时候, 可以将RDD的结果进行缓存,后续使用的时候, 可以直接读取缓存的数据 不需要重新计算, 提升效率
5) 检查点(checkpoint): 当RDD之间的依赖链条越长后, 为了减少中间处理失败整个回溯效率低的问题, 可以进行checkpoint操作, 在RDD依赖链条中保存几个快照(checkpoint), 将阶段的结果保存到磁盘(HDFS)上, 当后续RDD出现失败了 可以直接从磁盘上读取阶段结果, 不需要进行回溯整个流程, 从而提升效率
5.4 如何构建一个RDD
构建RDD主要有二种方式:
一种是通过本地集合模拟数据方式 (Test 测试使用)
一种通过读取外部文件的方式(正常使用方式)
方式一: 通过本地集合模拟数据方式
# 演示如何创建RDD对象" 方式一
from pyspark import SparkContext, SparkConf
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示:如何创建RDD")
# 1- 构建SparkContext对象
conf = SparkConf().setMaster("local[3]").setAppName('create_rdd')
sc = SparkContext(conf=conf)
# 2- 构建RDD对象
rdd_init = sc.parallelize(['张三','李四','王五','赵六','田七','周八','李九'],5)
# 需求:给每一个元素添加一个 10序号
rdd_map = rdd_init.map(lambda name: name + '10')
# 如何查看RDD中每个分区的数据,以及分区的数量
print(rdd_init.getNumPartitions())
print(rdd_init.glom().collect())
print(rdd_map.glom().collect())
说明:
验证RDD是存在分区的, 而且每一个计算函数作用在每个分区上进行处理
如何查看RDD有多少个分区: rdd.getNumPartitions()
如何查看RDD中每一个分区的数据: rdd.glom()
当采用本地模拟数据(并行方式)构建RDD的时候, RDD分区数量取决于:
1- 默认: setMaster("local[N]") 取决于 N的值, 当N为*的时候, 表示为当前节点的CPU核心数
2- 同时也可以通过parallelize算子来设置分区数量, 设置多少就为多少个分区