- 1. Spark和Hadoop的区别
- 2. Spark的核心组件
- 3. Spark的运行流程
- 4. Spark在YarnClient模式和YarnCluster模式有什么不同
- 5. RDD是什么?五大特性?
- 6. RDD的算子
- 7. RDD持久化原理,CheckPoint机制,区别
- 8. DAG,宽窄依赖,划分Stage
- 9. Spark 广播变量和累加器介绍一下?
- 10. Spark Streaming 以及基本工作原理?
- 11. SparkSQL的执行流程">
11. SparkSQL的执行流程 - 12. Spark数据倾斜
- 13. Spark堆内存与堆外内存参数
- 14. Spark SQL 程序优化的参数设置
- 15. Spark程序核内比如何设置比较合理,怎样避免OOM
- 16. SQL在Spark中的具体解析流程
- 17. Spark SQL逻辑计划转物理计划时候用到了哪些优化策略
- 18. Spark3.0新特性
- 19. Spark的shuffle过程
1. Spark和Hadoop的区别
问题:Spark 与 MapReduce 相比,Spark 运行效率更高。请说明效率更高来源于 Spark 内置的哪些机制?
- Spark把运算的中间数据(shuffle阶段产生的数据)存放在内存,迭代计算效率更高,mapreduce的中间结果需要落地,保存到磁盘。
- Spark容错性高,它通过弹性分布式数据集RDD来实现高效容错,RDD是一组分布式的存储在节点内存中的只读性的数据集,这些集合是弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建,mapreduce的容错只能重新计算。
- Spark更通用,提供了transformation和action这两大类的多功能api,另外还有流式处理sparkstreaming模块、图计算等等,mapreduce只提供了map和reduce两种操作,流计算及其他的模块支持比较缺乏。
- Spark框架和生态更为复杂,有RDD,血缘lineage、执行时的有向无环图DAG, stage划分等,很多时候spark作业都需要根据不同业务场景的需要进行调优以达到性能要求,mapreduce框架及其生态相对较为简单,对性能的要求也相对较弱,运行较为稳定,适合长期后台运行。
- Spark计算框架对内存的利用和运行的并行度比mapreduce高,Spark运行容器为executor,内部ThreadPool中线程运行一个Task, mapreduce在线程内部运行container,container容器分类为MapTask和ReduceTask.程序运行并行度高。
- Spark对于executor的优化,在JVM虚拟机的基础上对内存弹性利用:storage memory与Execution memory的弹性扩容,使得内存利用效率更高。
问题:hadoop和spark使用场景?
Hadoop/MapReduce和Spark最适合的都是做离线型的数据分析,但Hadoop特别适合是单次分析的数据量“很大”的情景,而Spark则适用于数据量不是很大的情景。
问题:hadoop和spark的相同点和不同点?
- Hadoop底层使用MapReduce计算架构,只有map和reduce两种操作,表达能力比较欠缺,而且在MR过程中会重复的读写hdfs,造成大量的磁盘io读写操作,所以适合高时延环境下批处理计算的应用;
- Spark是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括map、reduce、filter、flatmap、groupbykey、reducebykey、union和join等,数据分析更加快速,所以适合低时延环境下计算的应用;
- spark与hadoop最大的区别在于迭代式计算模型。基于mapreduce框架的Hadoop主要分为map和reduce两个阶段,两个阶段完了就结束了,所以在一个job里面能做的处理很有限;spark计算模型是基于内存的迭代式计算模型,可以分为n个阶段,根据用户编写的RDD算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以spark相较于mapreduce,计算模型更加灵活,可以提供更强大的功能。
- 但是spark也有劣势,由于spark基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的轻局昂下,可能会出现各种各样的问题,比如OOM内存溢出等情况,导致spark程序可能无法运行起来,而mapreduce虽然运行缓慢,但是至少可以慢慢运行完。
2. Spark的核心组件
Spark运行角色:
Spark由4类角色组成整个Spark的运行环境
- Master角色,管理整个集群的资源,类比YARN的ResourceManager
- Worker角色,管理单个服务器端的资源,类比YARN的NodeManager
- Driver角色,管理单个Spark任务在运行时候的工作,类比YARN的ApplicationMaster
- Executor角色,单个任务运行的时候的一堆工作者,类比于YARN的容器内运行的Task
从两个层面划分:
资源管理层面:
- 管理者:Spark是Master角色,YARN是ResourceManager
- 工作者:Spark是Worker角色,YARN是NodeManager
从任务执行方面:
- 某任务管理者:Spark是Driver角色,YARN是ApplicationMaster
- 某任务执行者:Spark是Worker角色,YARN是容器中运行的具体工作进程
Driver:
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。Driver 在 Spark 作业执行时主要负责:
- 将用户程序转化为作业(job)
- 在 Executor 之间调度任务(task)
- 跟踪 Executor 的执行情况
- 通过 UI 展示查询运行情况
Executor:
Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。
Executor 有两个核心功能:
- 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
- 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
3. Spark的运行流程
大方向上:
- 提交代码
- 生成Driver,DAG Scheduler规划逻辑任务
- 生成Executor(被Driver生成),执行任务
- Driver内TaskScheduler去监控整个Spark程序的执行
4. Spark在YarnClient模式和YarnCluster模式有什么不同
Spark 应用程序提交到 Yarn 环境中执行的时候,一般会有两种部署执行的方式:Client 和 Cluster。两种模式主要区别在于:Driver 程序的运行节点位置。
Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试。
Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于实际生产环境。
- 启动ApplicationMaster
- AM 启动Driver
- Driver构建DAG调度器规划任务
- Driver和AM通讯,AM得知要多少容器去申请
- Driver在申请的容器内部启动Executor
- Driver内的Task调度器调度任务执行
无论是大方向还是提交YARN集群,都是先有Driver后有Executor,然后DAG调度器规划逻辑任务,Task调度器监控任务执行。
5. RDD是什么?五大特性?
RDD (Resilient Distributed Dataset) 叫做弹性分布式数据集,
- 分布式:RDD的数据是分散在各个分区上的,各个分区被托管在多个Executor上
- 弹性:RDD的分区可以动态的增减
- 数据集:存储数据的集合
特点:
- RDD是有分区的
- 算子作用在每一个分区上
- RDD之间是有血缘关系的
- (可选) 针对KV型RDD,可以自定义分区器,默认的分区规则是Hash规则
- (可选)如有可能,RDD加载器将会就近取值,在数据所在机器上启动Executor加载数据——移动数据不如移动计算
6. RDD的算子
RDD的算子分为两种,一种是Transformation,一种是Action
- Transformation: 返回值是一个新RDD,用于规划RDD的执行链条(执行的计划蓝图)
- Action: 返回值不是RDD,用于将Transformation生成的蓝图进行调用开启工作
- transformation 操作常用算子如下:
Map、MapPartitions、FlatMap、Filter、distinct、sortBy、union、reduceByKey、groupByKey、sortByKey、join
2. action 操作常用算子如下:
reduce、collect、count、save、take、aggregate、countByKey等。
Aaction算子中有两个特殊算子:
- 多数的Action算子都是将结果向Driver汇聚
- foreach和saveAsText由RDD的分区(线程)直接执行,和Driver无交互
7. RDD持久化原理,CheckPoint机制,区别
RDD的数据是过程数据,一旦产生新的RDD,旧的RDD的数据就不存在了,如果多次使用某个RDD,如果没有缓存,就需要通过依赖链条重新生成(计算)这个RDD
spark非常重要的一个功能特性就是可以将RDD持久化在内存中。
调用cache()和persist()方法即可。cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本persist(MEMORY_ONLY),将数据持久化到内存中。
CheckPoint机制阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。
- 缓存:保留血缘关系,设计是不安全的,可以存储在本地硬盘(不能是HDFS),当前Executor的内存中
- checkpoint:不保留血缘关系,设计上是安全的,可以存储在本地硬盘,HDFS,不能存在内存中
8. DAG,宽窄依赖,划分Stage
DAG(Directed Acyclic Graph 有向无环图) 指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程)
- 开始:通过 SparkContext 创建的 RDD;
- 结束:触发 Action,一旦触发 Action 就形成了一个完整的 DAG。
宽窄依赖划分:
- 宽依赖:父RDD的一个分区,全部将数据发给子RDD的一个分区,别名Shuffle
- 窄依赖:父RDD的一个分区,将数据发给子RDD的多个分区
DAG 中为什么要划分 Stage?
并行计算。 一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。
- shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个DAG 划分成多 个 Stage/阶段
- 在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline流水线,流水线内的多个平行的分区可以并行执行。
回溯算法:从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。
9. Spark 广播变量和累加器介绍一下?
- 广播变量 broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。
- 累加器 accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。
广播变量解决了什么问题?
分布式集合RDD和本地集合进行关联使用的时候,降低内存占用以及减少网络IO传输,提高性能
累加器解决了什么问题?
分布式代码执行中,进行全局累加
10. Spark Streaming 以及基本工作原理?
Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
Spark Streaming 如何执行流式计算的?
Spark Streaming 中的流式计算其实并不是真正的流计算,而是微批计算。Spark Streaming 的 RDD 实际是一组小批次的 RDD 集合,是微批(Micro-Batch)的模型,以批为核心。
Spark Streaming 在流计算实际上是分解成一段一段较小的批处理数据(Discretized Stream),其中批处理引擎使用 Spark Core,每一段数据都会被转换成弹性分布式数据集 RDD,然后 Spark Streaming 将对 DStream 的转换操作变为 Spark 对 RDD 的转换操作,并将转换的中间结果存入内存中,整个流式计算依据业务的需要可以对中间数据进行叠加。
Spark Streaming 整合 Kafka 的两种模式?
11. SparkSQL的执行流程
12. Spark数据倾斜
数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于 其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈
数据倾斜俩大直接致命后果:
1)数据倾斜直接会导致一种情况:Out Of Memory
2)运行速度慢
主要是发生在Shuffle阶段。同样Key的数据条数太多了。导致了某个key(下图中的80亿条)所在的Task数据量太大了,远远超过其他Task所处理的数据量 。
1. 对于 Spark 中的数据倾斜问题你有什么好的方案?
前提是定位数据倾斜,是 OOM 了,还是任务执行缓慢,看日志,看 WebUI
解决方法,有多个方面:
- 避免不必要的 shuffle,如使用广播小表的方式,将 reduce-side-join 提升为 map-side-join
- 分拆发生数据倾斜的记录,分成几个部分进行,然后合并 join 后的结果
- 改变并行度,可能并行度太少了,导致个别 task 数据压力大
- 两阶段聚合,先局部聚合,再全局聚合
- 自定义 paritioner,分散 key 的分布,使其更加均匀
具体实现:
- 当小表join大表造成数据倾斜导致执行缓慢:
1)广播join
Spark join策略中,如果当一张小表足够小并且可以先缓存到内存中,那么可以使用Broadcast Hash Join,其原理就是先将小表聚合到driver端,再广播到各个executor中,那么再次进行join的时候,就相当于大表的各自分区的数据与小表进行本地join,从而规避了shuffle。
广播join默认值为10MB,由spark.sql.autoBroadcastJoinThreshold参数控制。即当表的数据量小于等于10MB时自动触发广播join。 spark sql中的广播join可以直接规避shuffle阶段,来优化掉数据倾斜的问题。 可以通过参数适当调大,也可以通过API和Hint触发。
2)打散大表扩容小表
当大表之间join进行优化:
在表与表进行join时,如果两张表都是非常大的数据量,那么可以考虑使用分桶进行join。SMB Join时得保证两表桶数量相等,join列等于排序列等于分桶列。
JOIN操作中,两个数据集都比较大,其中只有几个Key的数据分布不均匀
解决方案:为倾斜key增加随机前/后缀
JOIN操作中,两个数据集都比较大,有很多Key的数据分布不均匀
解决方案:随机前缀和扩容RDD进行join
- 数据集中的不同Key由于分区方式,导致数据倾斜
解决方案1:调整并行度
实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。
解决方案2:
自定义Partitioner(缓解数据倾斜)
适用场景:大量不同的Key被分配到了相同的Task造成该Task数据量过大。
解决方案:使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。
- 聚合操作中,数据集中的数据分布不均匀(主要)
解决方案:两阶段聚合(局部聚合+全局聚合)
适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案
实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。
2. Spark 中的 OOM 问题?
- map 类型的算子执行中内存溢出如 flatMap,mapPatitions
原因:map 端过程产生大量对象导致内存溢出:这种溢出的原因是在单个 map 中产生了大量的对象导致的针对这种问题。
解决方案:
- 增加堆内内存。
- 在不增加内存的情况下,可以减少每个 Task 处理数据量,使每个 Task 产生大量的对象时,Executor的内存也能够装得下。具体做法可以在会产生大量对象的 map 操作之前调用 repartition 方法,分区成更小的块传入 map。
- shuffle 后内存溢出如 join,reduceByKey,repartition。
shuffle 内存溢出的情况可以说都是 shuffle 后,单个文件过大导致的。在 shuffle 的使用,需要传入一个partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是HashPatitioner,默认值是父 RDD 中最大的分区数.这个参数 spark.default.parallelism 只对HashPartitioner 有效.如果是别的 partitioner 导致的 shuffle 内存溢出就需要重写partitioner 代码了.
- driver 内存溢出
- 用户在 Dirver 端口生成大对象,比如创建了一个大的集合数据结构。解决方案:将大对象转换成 Executor 端加载,比如调用sc.textfile 或者评估大对象占用的内存,增加 dirver 端的内存。 如若无法避免, 自我评估该大对象占用的内存, 相应增加driver-memory的值
- 从 Executor 端收集数据(collect)回 Dirver 端,建议将 driver 端对 collect 回来的数据所作的操作,转换成 executor 端 rdd 操作。
13. Spark堆内存与堆外内存参数

问题:spark.executor.memoryOverhead 和 spark.memory.offHeap.size之间的区别
spark.memory.offHeap.size和spark.executor.memoryOverhead两个参数都是控制堆外内存大小
spark.executor.memoryOverhead由 YARN 等资源管理使用,而spark.memory.offHeap.size由 Spark 核心(内存管理器)使用。关系因版本而有所不同。
Spark 2.4.5 及之前版本:spark.executor.memoryOverhead应该包括spark.memory.offHeap.size. 这意味着如果指定offHeap.size,则需要手动将此部分添加到memoryOverheadYARN。
在 Spark 3.0 中发生了变化:spark.executor.memoryOverhead不再包括spark.memory.offHeap.size。两者相互独立。
问题:Spark参数executor.memory与executor.memoryOverhead的区别
提到堆外内存,必须先去提到在yarn申请资源的单位,容器。在Spark on yarn模式中,一个容器到底会去申请多少内存资源是由堆外加上堆内两块内存决定的。yarn的配置项里一个容器可以申请多大内存由
yarn.scheduler.maximum-allocation-mb决定。 而Spark当中运行 executor 的容器最大内存大小由 spark.executor.memoryOverhead、spark.executor.memory、spark.memory.offHeap.siz 的总和决定。
- spark.executor.memory: spark提交任务时指定的堆内内存。
- spark.executor.memoryOverhead:spark堆外内存参数,内存额外开销,默认开启,默认值为spark.executor.memory*0.1并且会与最小值384mb做对比,取两个值较大的值。
- spark.memory.offHeap.size:堆外内存参数,spark中默认关闭,需要将spark.memory.enable.offheap.enable参数设置为true
14. Spark SQL 程序优化的参数设置
问题:大数据集下,应该调整哪些配置参数的值来更好地进行Spark SQL程序的优化
Spark sql默认shuffle分区个数为200,参数由spark.sql.shuffle.partitions控制,此参数只能控制Spark sql、DataFrame、DataSet分区个数。不能控制RDD分区个数 。 所以如果两表进行join产生shuffle形成一张新表,如果新表的分区不进行缩小分区操作,那么就会有200份文件插入到hdfs上,这样就有可能导致小文件过多的问题。
解决小文件过多问题也非常简单,在spark当中一个分区最终落盘形成一个文件,那么解决小文件过多问题只需将分区缩小即可。
在插入表前,添加coalesce算子指定缩小后的分区个数。那么使用此算子需要注意,coalesce算子缩小分区后那么实际处理插入数据的任务只有一个,可能会导致oom,所以需要适当控制,并且coalesce算子里的参数只能填写比原有数据分区小的值,比如当前表的分区是200,那么填写参数必须小于200,否则无效。当然缩小分区后任务的耗时肯定会变久。
同时,如果想要让任务运行得最快当然是一个task对应一个vcore,但是离线任务一般不会这样设置,为了合理利用资源,一般会将分区(也就是task)设置成vcore的2倍到3倍。
总结:跑离线任务时我们可以合理控制分区数来提高效率,可以将分区数设置为executor一共申请vcore数的2倍或3倍。Spak Sql当中改变分区的方式有repartition、coalesce算子和spark.sql.shuffle.partitions参数,并且分区和task是同一个东西,一个分区对应一个文件。
spark.sql.autoBroadcastJoinThreshold:广播join
15. Spark程序核内比如何设置比较合理,怎样避免OOM
核内比的大小是由硬件环境以及应用程序的特征决定的,基于经验的话,我们实验室的鲲鹏ARM机器上(288核 360G内存),1:1.5的核内比就可以,在华为云(192核 300G内存)的X86机器上, 1:3以及1:4比较合适。
16. SQL在Spark中的具体解析流程
- 分析,Unresolved Logical plan —> Logical plan。Spark SQL的查询计划首先起始于由SQL解析器返回的AST,或者是由API构建的DataFrame对象。在这两种情况下,都会存在未处理的属性引用(某个查询字段可能不存在,或者数据类型错误),比如查询语句:SELECT col FROM sales,关于字段col的类型,或者该字段是否是一个有效的字段,只有等到查看该sales表时才会清楚。当不能确定一个属性字段的类型或者没能够与输入表进行匹配时,称之为未处理的。Spark SQL使用Catalyst的规则以及Catalog对象(能够访问数据源的表信息)来处理这些属性。首先会构建一个Unresolved Logical Plan树,然后作用一系列的规则,最后生成Logical Plan。
- 逻辑优化,Logical plan —> Optimized Logical Plan。逻辑优化阶段使用基于规则的优化策略,比如谓词下推、投影裁剪等。经过一些列优化过后,生成优化的逻辑计划Optimized Logical Plan。
- 物理执行计划:物理计划是从逻辑计划生成的,定义了如何执行计算,是可执行的。在物理计划阶段,Spark SQL会将优化的逻辑计划生成多个物理执行计划,然后使用Cost Model计算每个物理计划的成本,最终选择一个物理计划。在这个阶段,如果确定一张表很小(可以持久化到内存),Spark SQL会使用broadcast join。需要注意的是,物理计划器也会使用基于规则的优化策略,比如将投影、过滤操作管道化一个Spark的map算子。此外,还会将逻辑计划阶段的操作推到数据源端(支持谓词下推、投影下推)。
- 代码生成,将查询部分编译成Java字节码
Spark SQL还支持读取和写入存储在Apache Hive中的数据。但是,由于Hive具有大量依赖项,因此这些依赖项不包含在默认的Spark发布包中。如果可以在类路径上找到Hive依赖项,Spark将自动加载它们。请注意,这些Hive依赖项也必须存在于所有工作节点(worker nodes)上,因为它们需要访问Hive序列化和反序列化库(SerDes)才能访问存储在Hive中的数据。
总结:
Spark SQL在Spark集群中是如何执行的?
Spark SQL会经过以下过程,
- Parser组件将SQL转换为Unresolved逻辑执行计划
- Analyzer组件通过获取Catalog存储库将Unresolved逻辑执行计划处理为Resolved逻辑执行计划
- Catalyst Optimizer组件,将Resolved逻辑执行计划转换为Optimized逻辑执行计划
- Planner组件将Optimized逻辑执行计划转换为物理执行计划
- Planner组件对上一步的物理执行计划进行评估,选择出最终的物理执行计划
- Code Generation对物理执行计划进一步优化,将一些操作串联在一起
- 生成Job(DAG)由scheduler调度到spark executors中执行
Unresolved执行计划和Resolved执行计划的区别什么?
Unresolved执行计划对SQL语法解析,而Resolved执行计划会从Catalog中拉取元数据,解析表名和列名。
逻辑执行计划和物理执行计划的区别?
逻辑执行计划只是对SQL语句中以什么样的执行顺序做一个整体描述,而物理执行计划中包含了具体要进行什么的操作。例如:是BroadcastJoin、还是SortMergeJoin等等。
Spark SQL是如何对SQL进行优化的?
由Catalyst Optimizer组件根据一系列规则对SQL进行优化,是对逻辑执行计划进行优化。例如:我们常听说的谓词下推就是其中一个规则。
17. Spark SQL逻辑计划转物理计划时候用到了哪些优化策略
SparkPlanner是一个具体的Catalyst Query Planner,它使用执行计划策略( execution planning strategies)将逻辑计划转换为一个或多个物理计划,并支持额外的策略(ExperimentalMethods)和extraPlanningStrategies。
- HashAggregate 运算符表示数据聚合,一般 HashAggregate 是成对出现,第一个 HashAggregate 是将执行节点本地的数据进行局部聚合,另一个 HashAggregate 是将各个分区的数据进一步进行聚合计算。
- Exchange 运算符其实就是 shuffle,表示需要在集群上移动数据。很多时候 HashAggregate 会以Exchange 分隔开来。
- Project 运算符是 SQL 中的投影操作,就是选择列(例如:select name, age…)。
- BroadcastHashJoin 运算符表示通过基于广播方式进行 HashJoin。
- LocalTableScan 运算符就是全表扫描本地的表。
18. Spark3.0新特性
Spark 3.0增加的新特性包括 动态分区修剪(Dynamic Partition Pruning)、 自适应查询执行(Adaptive Query Execution)、 感知调度(Accelerator-aware Scheduling)、 支持 Catalog 的数据源API(Data Source API with Catalog Supports)、 SparkR 中的向量化(Vectorization in SparkR)、 支持 Hadoop 3/JDK 11/Scala 2.12 等等。
1、动态分区裁剪(Dynamic Partition Pruning)
在3.0以前,spark是不支持动态分区的,所谓动态分区就是针对分区表中多个表进行join的时候基于运行时(runtime)推断出来的信息,在on后面的条件语句满足一定的要求后就会进行自动动态分区裁减优化。经过这个优化,查询扫描的数据大大减少,也减少了join时的内存计算开销.
2、自适应查询执行(Adaptive Query Execution)
自适应查询是指对执行计划按照实际数据分布和组织情况,评估其执行所消耗的时间和资源,从而选择代价最小的计划去执行。一般数据库的优化器有两种,一种是基于规则的优化器(RBO),一种是基于代价的优化器(CBO),自适应查询指的就是对CBO的优化。
Spark以前的调度规则是执行计划一旦确定,即使发现后续执行计划可以优化,也不可更改,而自适应查询功能则是在执行查询计划的同时,基于表和列的统计信息,对各个算子产生的中间结果集大小进行估算,根据估算结果来动态地选择最优执行计划 。
参考:大数据开发面试题之Spark 3.0特性
19. Spark的shuffle过程
Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。
在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是存在宽依赖的时候,需要进行shuffle,这时候会将作业job划分成多个Stage;并且在划分Stage的时候,构建Shuffle Dependency的时候进行shuffle注册,获取后续数据读取所需要的Shuffle Handle,最终每一个job提交后都会生成一个Result Stage和若干个Shuffle Map Stage,其中Result Stage表示生成作业的最终结果所在的Stage。Result Stage与Shuffle Map Stage中的task分别对应着Result Task与Shuffle Map Task。
RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作。窄依赖跟宽依赖的区别是是否发生shuffle(洗牌) 操作。宽依赖会发生shuffle操作。窄依赖是子RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果,宽依赖指子RDD的各个分片会依赖于父RDD的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片。
Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。
优化后的HashShuffle与之前的不同点在于:
- 在一个Executor内,不同Task共享Buffer缓冲区
- 这样减少了缓冲区乃至写入磁盘文件的数量,提高性能
SortShuffle的运行机制主要分为两种,一种是普通运行机制,另一种是bypass运行机制
bypass运行机制的触发条件如下:
- shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200
- 不是聚合类的shuffle算子(比如reduceByKey)
该机制与普通SortShuffleManager运行机制的不同在于:不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
参考:大数据开发面试题之介绍下Spark Shuffle
