一、spark工作原理

spark两个最重要的概念:RDD(弹性分布式数据集)和算子
RDD:
将一个大的数据集合以分布式的方式存储在集群服务器的内存中。
从物理上来讲,RDD是将一个大的数据集分块为一系列数组,这些数组以分布式的方式存储在不同的节点上,每个数据块有一个标识(blockID),这样就可以通过每个分块的元数据(mate date)对数据块进行管理,每个数据块可以存储在节点上,也可以持久化在磁盘上。
然后可以使用算子对RDD做运算,得到新的RDD
算子:
算子是spark中定义的函数,用于对RDD中的数据进行操作和转换
算子一般分为四种:
创建算子:用于将内存中的数据集或外部文件转换为RDD对象
转换算子:用于将一种RDD转换为另一种RDD
缓存算子:用于将RDD缓存于内存或者磁盘,以便后续计算重复使用
行动算子:会触发spark作业执行,将计算结果RDD转换为scala集合或标量,或保存到外部文件或数据库中
spark两个重要概念:
(1)在spark程序中,只支持对RDD粗粒度的操作,即只能对整个RDD进行整体性的操作,不能对RDD中的某个数据元素进行特殊操作,不提供只操作RDD中某个数据元素的函数接口。
(2)在spark中,RDD是只读的,读取上一次生成的RDD,多次通过某种转换算子,生成新的RDD,最终达到计算的目的。

二、spark架构及运行机制

2.1、Spark系统架构与节点角色

总的分为两种节点:
clusterMaster和slave,还有个client节点是为提交spark程序的节点
clusterMaster:
clusterMaster是逻辑上的概念,在不同的运行模式下,名称不同,比如:在yarn模式下,clusterMaster就是ResourceManage,在
clusterMaster是不执行程序的,它是管理资源、分配资源的,管理除了clusterMaster之外,其它程序占用的cpu等资源。
每个节点都要clusterMaster上面进行注册,clusterMaster通过注册的信息实时监控其他节点的运行状态和所占用的资源,以便对节点进行合理的资源分配。
slave:
slave才是spark程序中执行业务逻辑的节点
slave分为两种节点:driver(驱动节点)和worker(执行节点)
driver:
如果节点上运行了spark程序的main函数所在的进程,那么该节点就是dirver节点,driver节点作为整个应用程序的起点,负责创建sparkContext、定义一个或者多个RDD,driver可以看作是整个应用程序的大脑。
主要作用:
1、分割任务:将整个应用程序分割成多个task任务,task是物理上可执行的最小单元,一个应用程序可以有成百上千个独立的task。
2、分配任务:将task分配到可执行的worker节点上,并协调这些task在worker上完成运行。
worker:
运行exceutor进程的节点为worker节点。spark为每个应用程序在worker节点上创建一个exceutor进程,exceutor进程是实际物理任务的执行者,
exceutor主要任务:
1、执行任务:执行task任务,并将执行结果反馈给driver
2、分配内存:为RDD分配内存空间

spark程序可以运行的多种模式:
spark可以在单机上运行,也可以在集群分布式上运行,具体哪种模式可以通过设置和程序传递到sparkContext的MASTER环境变量值确定
(1)本地模式(local):本地模式下,spark作业是在单机使用非并行模式执行
(2)单机模式(standalone):单机模式运行的spark集群是对不同的应用程序采用FIFO的顺序进行调度。默认情况下,每个应用程序会单独占用所有可用节点的资源
(3)伪分布式(local-cluster)模式:伪分布式就是在单机上模拟standalone的运行模式,其调度方式与standalone完全相同,只是调度的不是实际物理节点,而是运行在单机上的为分布spark集群。
(4)yarn模式:spark集群运行在yarn资源管理平台上,在yarn模式下,可以对指定的应用程序分配一定数量的Executor,也可以设定Executor所占用的内存大小和Executor内核数量
(5)mesos模式:spark集群运行在mesos资源管理平台上,在mesos模式下,不仅可以设置spark集群使用静态资源的分配策略,也可以配置集群动态共享cpu内核的分配策略。

2.2、spark作业执行过程

简略的过程:
(1)用户编写spark程序,向集群提交spark程序
(2)启动driver节点(有两种启动方式:driver客户端启动;master节点指定一个worker节点来启动driver,充当driver节点),进入用户自定义的函数,里面包含创建RDD、转换RDD等工作,来达到自己的计算目的
(3)drver节点向master请求申请资源
(4)每个worker节点都会在master节点上进行注册节点信息,以便master节点合理的的管理分配资源,master会通过心跳检测worker节点是否存活。
(5)master节点在收到driver节点的请求后,会启动存活worker节点的Executor进程
(6)master将这些启动的资源通知到dirver节点,使driver节点用这些资源来完成spark程序
(7)driver节点收到通知后,会根据RDD在程序中的转换和执行情况对程序进行分割,将分割后的任务分配到已经申请到的多个Executor资源中
(8)每个Executor进程负责执行分配给他们的任务,并将执行结果通知给driver节点
(9)worker节点上运行的Executor进程是作业的真正执行者,在每个worker中可以启动多个Executor,每个Executor单独运行在JVM进程中,每个计算任务是运行在Executor中的一个线程
(10)Executor将计算结果保存到磁盘
(11)dirver通知client应用程序执行完成
最重要的三个步骤:
(1)生成RDD的过程:
client向集群提交spark作业后,driver会根据用户自定义的主函数创建RDD,并通过用户指定的算子来转换RDD,spark本身对RDD的操作模式是惰性计算,即只有出现行动算子时才会触发之前所有的转换算子,会将之前的转换算子形成一个整体的操作序列,然后对操作序列进程优化,选择最优的计算过程,形成DAG(有向无环图),这样可以顺序执行每个算子对RDD做出顺序的转换,也可以节省空间(不用为每次生成的中间结果分配分散的内存存储空间)
(2)生成stage的过程:
在执行DAG时,driver节点中的DAGScheduler实例会对DAG中节点见得依赖关系进行遍历,并将所有操作切分成多个调度阶段(stage)
(3)生成task的过程:
每个stage需要转换成task在集群中的worker节点执行,所以需要driver节点中的TaskScheduler实例来将stage转换成task,并提交到worker节点的Executor进程中执行。

2.3、应用初始化

将每个应用程序提交到spark集群时,都需先完成一个应用初始化的过程,就是加载配置和作业初始化,然后生成sparkContext实例,以用来连接spark集群、创建RDD、转化RDD等后续的一系列操作,初始化spark应用程序有两种情况:
(1)用spark-shell,在启动spark-shell交互式环境的时候,会自动为用户完成spark配置工作,并自动创建sparkContext实例来连接spark集群。
(2)使用spark-submit,在提交应用程序的时候需要我们来配置一些相关的参数,首先将应用程序打成jar包,然后通过配置好的spark-submit脚本将应用程序提交给spark集群处理。
spark-submit的脚本示例:
spark-submit —cluster zjyprc-hadoop-spark2.1 —master yarn-client —queue root.production.cloud_group.bigdata.caifeng —driver-memory 4g —num-executors 10 —executor-cores 1 —executor-memory 4g —conf spark.yarn.job.owners=xiaoming —keytab /etc/s_caifeng.keytab —principal s_caifeng@XIAOMI.HADOOP —conf spark.sql.hive.caseSensitiveInferenceMode=INFER_ONLY —conf spark.files.localize=hdfs://zjyprc-hadoop/spark/zjyprc-hadoop-spark2.1/cache/hive-site.xml —class com.xiaomi.npsnlp.questionnaire.PainDataToMysql /home/work/chenmengyue/test/eagle-dataflow-1.0-SNAPSHOT-shaded.jar 20220321
参数含义
spark-submit:代表的是spark-submit提交方式
—cluster:猜测是指定spark版本
—master:前面说过spark程序可以有几种不同的运行模式,通过在master中配置,这个就是指定spark程序的运行模式
—queue:这个spark程序运行所需要使用的队列
—driver-memory:driver的运行内存
—num-executors:分配的Executor数量(个人理解:其实就是并行数,因为任务都是在Executor上运行的,Executor才是执行的真正物理节点,所以可以理解为并行的进程数)
—executor-cores:每个Executor运行时所占用的核数(也就是分配的cpu数量)
—executor-memory:指明需要配置的Executor内存大小,spark对RDD的操作都是基于内存的
—conf:在conf中就可以配置一些其他信息
spark.sql.hive.caseSensitiveInferenceMode:
spark.files.localize:
—class:spark程序所在的主类
还有deploy-mode:指明driver的运行地点(driver的启动方式),前面说driver有两种启动方式,默认情况下是client方式启动
jar包存放的路径 和 主程序需要的参数分别在这个脚本的倒数第二和倒数第一

还有更多没有使用到的参数可以对应着百度查看

通过spark-submit方式,用户将spark应用程序提交到spark集群之后,spark集群的处理逻辑如下:
先通过deploy-mode参数得到用户给的启动driver节点的方式,然后driver找到class参数来获取程序的主函数入口,每个应用程序的主函数中都有一个sparkContext实例,sparkContext是整个应用程序和集群连接的接口,他会告诉应用程序如何访问一个集群,sparkContext的主要工作如下:
(1)接受sparkConf参数:spark运行离不开参数的配置,在实例化sparkContext后,就会接受营养程序中的sparkConf参数,包括Master的运行模式、应用程序的名称等参数,用来运行spark应用程序。在spark-submit的脚本中也可以配置这些参数,但是sparkConf的优先级高于spark-submit脚本,如果同时在sparkConf和spark-submit脚本配置参数,则sparkConf的参数会覆盖掉spark-submit脚本中的参数
(2)创建sparkEvn运行环境,spark的运行离不开一些重要的管理模块,sparkEvn会根据配置的参数创建不同的管理模块
(3)申请资源:sparkContext将应用程序和集群连接,并向集群申请运行Executor资源,每个应用程序会获得分布在不同节点上的Executor资源,然后sparkContext将应用程序的代码分配到Executor上,由Executor实际完成应用程序的计算任务
(4)创建sparkUI:sparkContext会为每个应用程序单独创建一个Web UI管理界面
(5)创建TaskScheduler:TaskScheduler的初始化会根据spark运行模式的不同而不同,TaskScheduler是负责每个任务的实际物理调度
(6)创建DAGScheduler:DAGScheduler是在创建TaskScheduler的基础上进行创建的,DAGScheduler负责接受提交的计算任务,并负责任务的逻辑调度
(7)提供方法:sparkContext提供了多种重要的处理数据的函数,比如fileText函数用于从HDFS路径读取数据,将读取到的数据转成RDD
一旦sparkContext创建成功,就完成了spark的初始化,就可以通过访问driver节点的4040端口来查看spark应用程序的运行状态。

2.4、创建RDD有向无环图(DAG)

spark应用程序初始化后,sparkContext会根据程序创建一个RDD,后面的都是由程序中的算子来对RDD进行一系列的转换,达到最终的计算目的,所以一般分为三个步骤:创建RDD - > 转换RDD - > 存储RDD
有向无环图就是对一个RDD做算子操作,生成一个新的RDD(原始RDD不会改变,只会通过算子计算后生成一个新的RDD),对一个RDD的操作可能有很多个算子,将这些算子连接起来,形成一个有向无环图。
从一个子RDD出发可以找到与它相关的所有父RDD,这成为世系关系。
世系关系的作用:
为了容错机制而保留的,如果后面某一个RDD数据丢失时,此时可以通过世系关系找到它的父RDD及更上级的RDD(原始数据),然后进行重新计算
因此创建RDD的有向无环图的过程就是将spark代码中一系列RDD转化操作以世系关系的形式记录下来
在这个过程中,spark并不会真正的执行这些转换,只有当行动算子触发时才会执行这些转换算子(就是spark的惰性计算
这样做的目的是:
(1)节省存储中间:节省了为每个有向无环图中节点分配独立的内存空间
(2)可以节省内存的占用时间:一些大型的、复杂的作业在需要时才执行,可以减少他们占用内存的时间
在spark中有向无环图:
顶点代表RDD及转换成RDD的算子,有向边代表了算子之间的转换

2.5、RDD有向无环图拆分

什么是有向无环图拆分:
应用程序会被拆分成多个作业提交给spark集群,spark集群不会直接对作业进行执行,spark集群会进行两次拆分和规划:第一步是:按照作业中RDD之间的转换将作业分成多个任务(task);第二步是:对任务进行规划,生成包含多个任务的阶段(stage),这一过程是由DAGScheduler完成的,输入的是一个DAG,输出的是一系列任务,所以称之为有向无环图的拆分

可以对一个数据单元RDD进行更小的拆分,就是分区(partition),在执行RDD转换的过程中就是对分区进行操作的,由于RDD之间的转换存在不同的依赖关系,所以RDD之间的依赖分为两种类型:
(1)窄依赖:一个RDD转换成另一个RDD中的分区是一对一或多对一
(2)宽依赖:一个RDD转换成另一个RDD中的分区是一对多
spark并不是直接将一个个转换操作对应的任务直接进行分配,而是进行规划,将合适的任务放在一个stage执行:首先将多个算子操作一起处理,最后再进行统一的同步操作,将窄依赖放在同一个stage,遇到宽依赖的转换就在生成一个stage,放入下一个stage中操作。
DAGScheduler负责stage的提交,维护了3个集合来存储stage的执行状态:
(1)waitingStage:存放正在执行的stage的子stage
(2)runningStage:为了防止重复提交执行stage,所以runningStage集合存放正在执行的stage
(3)failedStage:存放执行失败的stage,需要重复提交的stage
stage的提交顺序:
会生成一个stageID,stageId越小的优先级越高,越先被提交,但是由于DAGScheduler对DAG是从后往前遍历的,即使最后一个stage的id最小,提交之后,它所依赖的父stage还没执行成功,它依然不能被提交,如果它所依赖的父stage也依赖的有stage也没执行成功,它所依赖的父stage也要等到它所依赖的父stage的父stage执行成功才可以提交,就这样一次迭代,到最后所有stage都执行成功。

2.6、Task调度

2.7、Task执行

三、RDD算子

3.1、创建算子

创建算子有两种方式:
(1)基于scala集合的方式(比如list和set集合)
有两个方法:
①makeRDD
有两个参数:数据集,分区数,如果不给分区数,就采用默认的
val rdd = sparkContext.makeRdd(1 to 6) //1到6的一个集合
②parallelize
有两个参数:数据集,分区数,如果不给分区数,就采用默认的
val rdd = sparkContext.parallelize(1 to 6) //1到6的一个集合

(2)基于读取外部文件数据或者HDFS文件数据或其它Hadoop支持的文件系统的方式
(1)基于文本文件创建RDD
有两种方法:
①textFile:读取该文本文件的数据创建RDD
有两个参数:path(文本文件路径),分区数,如果不给分区数,就采用默认的2
②wholeTextFIle:读取该路径下的所有文本文件数据创建RDD
有两个参数:path(文本文件路径),分区数,如果不给分区数,就采用默认的2
(2)基于新的Hadoop API 从Hadoop 文件数据创建RDD
有两个方法(方法自行查看)

3.2、变换算子


3.3、行动算子


3.4、缓存算子