Spark基础知识
- Spark是什么?有何特点?优缺点是什么?
Spark是一种用于处理 大数据 的 分布式 开源 系统 以内存缓存和优化查询方式 对 数据进行大规模处理 和 快速分析
- 优点:
执行处理速度非常快 基于内存运算
API简单易用 并且提供了多语言的API
通用性 提供了统一的解决方案
可融合性 非常方便和其他框架融合
缺点:
主要还是对硬件要求比较高 需要吃一定量的内存
长时间运行可能会挂掉 Spark长时间运行会经常出错,在架构方面,由于大量数据被缓存在RAM中,Java回收垃圾缓慢的情况严重,导致Spark性能不稳定,在复杂场景中SQL的性能甚至不如现有的Map/Reduce
- Spark集群的部署模式有哪些?有何特点?
1)本地模式 多线程
Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类
local:只启动一个executor
local[k]:启动k个executor
local[*]:启动跟cpu数目相同的 executor
2)standalone模式 分布式
分布式部署集群,自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其他模式的基础。
3)Spark on yarn模式
client 客户端运行 通常用于开发调试
cluster 节点运行 通常生产环境
分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端。
4)Spark On Mesos模式。
粗粒度 运行之前一定分配
细粒度 按需分配
官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。用户可选择两种调度模式之一运行自己的应用程序:
(1)粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。
(2)细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。
- Spark集群部署的粗粒度和细粒度模式?
Spark on Yarn 目前仅支持粗粒度的资源分配 即任务运行之前就已经分配好了所需要的资源 在任务运行时 就一直占用这些资源 直到任务结束 优点是 作业比较多 资源复用率高 缺点是 当作业长时间卡在某一个阶段 资源就一直被占用 但远远大于这阶段所需要的资源 就造成了资源的浪费
Spark on Mesos 支持粗粒度和细粒度 细粒度就是按需分配 作业申请需要多少资源就分配多少资源 不会造成资源浪费 但是启动比较麻烦 效率低一些
- Spark和MR的区别?为什么Spark比MR快?
Spark和MR都是基于MR理论实现的并行处理框架
MR:作业单位是job,由多个map和多个reduce组成 能处理的场景比较简单 而且在执行过程中会不断重读的读写HDFS、造成大量的io操作,多个job之间需要自己去管理关系
Spark: 作业单位是application,application可以用多个job,job之下多个stage,stage又分成多个task,task可以分发到不同的executor并行的执行,提高了执行的速度 Spark所有的计算都是在内存进行的 所以执行速度很快 同时也提供了多个算子 能处理多个复杂的计算
为什么Spark比MR快
Spark是基于内存计算的 极大程度减少了磁盘IO操作
高效的调度算法 基于DAG ???为什么DAG就会很快???
容错机制Linage ???为什么血缘也很快???
- Spark生态组件有哪些?有何特点?适用于什么场合?
SparkCore Spark计算的核心 SparkSQL 处理结构化的数据 SparkStreaming 处理流式数据 Spark Mliib 机器学习 Spark GraphX 图计算
- Spark集群中的各组件功能描述?
Application:基于 Spark 的用户程序,即由用户编写的调用 Spark API 的应用程序,它由集群上的一个驱动(Driver)程序和多个执行器(Executor)程序组成。其中应用程序的入口为用户所定义的 main 方法。
SparkContext:是 Spark 所有功能的主要入口点,它是用户逻辑与 Spark 集群主要的交互接口。通过SparkContext,可以连接到集群管理器(Cluster Manager),能够直接与集群 Master 节点进行交互,并能够向 Master 节点申请计算资源,也能够将应用程序用到的 JAR 包或 Python 文件发送到多个执行器(Executor)节点上。
Cluster Manager:即集群管理器,它存在于 Master 进程中,主要用来对应用程序申请的资源进行管理。
Worker Node:任何能够在集群中能够运行 Spark 应用程序的节点。
Task:由SparkContext发送到Executor节点上执行的一个工作单元。
Driver:也即驱动器节点,它是一个运行Application中main()函数并创建SparkContext的进程。Driver节点也负责提交Job,并将Job转化为Task,在各个Executor进程间协调 Task 的调度。Driver节点可以不运行于集群节点机器上。
Executor:也即执行器节点,它是在一个在工作节点(Worker Node)上为Application启动的进程,它能够运行 Task 并将数据保存在内存或磁盘存储中,也能够将结果数据返回给Driver。
- Spark的工作机制是什么样的?Spark执行流程是什么样的?
- Driver将提交的Application构建一个SparkContext
- SparkContext将去向资源管理器去申请任务所需要的资源
- 资源管理器将资源分配到Executor上
- SparkContext将RDD算子任务构建一个DAG图
- DAGScheduler将DAG图根据宽展依赖去区分一个个stage
- stage以TaskSet发往TaskScheduler
- Executor去向SparkContext申请执行Task
- Taskscheduler就会把资源和Spark应用程序发往Executor执行
- Executor去执行对应Task 运行完毕后就释放资源
- Yarn是怎么执行一个任务的?
1)客户端client向ResouceManager提交Application,ResouceManager接受Application并根据集群资源状况选取一个node来启动Application的任务调度器driver(ApplicationMaster)。
2)ResouceManager找到那个node,命令其该node上的nodeManager来启动一个新的 JVM进程运行程序的driver(ApplicationMaster)部分,driver(ApplicationMaster)启动时会首先向ResourceManager注册,说明由自己来负责当前程序的运行。
3)driver(ApplicationMaster)开始下载相关jar包等各种资源,基于下载的jar等信息决定向ResourceManager申请具体的资源内容。
4)ResouceManager接受到driver(ApplicationMaster)提出的申请后,会最大化的满足 资源分配请求,并发送资源的元数据信息给driver(ApplicationMaster)。
5)driver(ApplicationMaster)收到发过来的资源元数据信息后会根据元数据信息发指令给具体机器上的NodeManager,让其启动具体的container。
6)NodeManager收到driver发来的指令,启动container,container启动后必须向driver(ApplicationMaster)注册。
7)driver(ApplicationMaster)收到container的注册,开始进行任务的调度和计算,直到 任务完成。
注意:如果ResourceManager第一次没有能够满足driver(ApplicationMaster)的资源请求 ,后续发现有空闲的资源,会主动向driver(ApplicationMaster)发送可用资源的元数据信息以提供更多的资源用于当前程序的运行。
Spark on Yarn模式有哪些优点?
1)与其他计算框架共享集群资源(Spark框架与MapReduce框架同时运行,如果不用Yarn进行资源分配,MapReduce分到的内存资源会很少,效率低下);资源按需分配,进而提高集群资源利用等。
2)相较于Spark自带的Standalone模式,Yarn的资源分配更加细致。
3)Application部署简化,例如Spark,Storm等多种框架的应用由客户端提交后,由Yarn负责资源的管理和调度,利用Container作为资源隔离的单位,以它为单位去使用内存,cpu等。
4)Yarn通过队列的方式,管理同时运行在Yarn集群中的多个服务,可根据不同类型的应用程序负载情况,调整对应的资源使用量,实现资源弹性管理。谈谈你对Container的理解?
1)Container作为资源分配和调度的基本单位,其中封装了的资源如内存,CPU,磁盘,网络带宽等。 目前yarn仅仅封装内存和CPU
2)Container由ApplicationMaster向ResourceManager申请的,由ResouceManager中的资源调度器异步分配给ApplicationMaster
3)Container的运行是由ApplicationMaster向资源所在的NodeManager发起的,Container运行时需提供内部执行的任务命令
- Spark的shuffle是怎么一回事?和MR的shuffle有何区别?
- 高层次上看 两者没有什么明显的区别 都是mapper任务结束后 数据发往不同分 reducer再去不同分区拉取数据然后进行下一步的处理
- 低层次上看 两者区别还是很大 主要还是采取的shuffle算法不同 mr采取的shuffle算法是sort-based,分区之前先进行排序,然后再发往分区 而Spark采取了hash-based 去掉了排序这一阶段 因为排序还是很耗费资源io的
- 实现层次上看 mr会把shuffle的每一个过程都分的比较清楚 spark并不会去刻意划分每一个过程阶段 而是通过不同stage和一系列的transformation算子来执行处理
1)从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。
2)从 low-level 的角度来看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。
3)从实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。
如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read的处理逻辑?以及两个处理逻辑应该怎么高效实现?
Shuffle write由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。
- Spark的shuffle详解
Spark shuffle变化过程
0.8.x 之前是 hash-based shuffle 之后加入了file consolidation机制合并小文件 0.9 引入了ExternalAppendOnlyMap 1.1 引入了 sort-based shuffle 1.2 默认改成了 sort-based shuffle 1.4 引入了unsafe-shuffle 1.6 钨丝shuffle并入到了sort-based shuffle 2.0 hash-based shuffle 就退出了
hash-based shuffle spark的maptask完成之后 不进行排序 而是按照hash来进行重组分区数据 数据发往reducetask时候 会产生大量的小文件 MN的中间文件 导致频繁的磁盘IO 效率低效 同时 reducetask合并操作是放在一个hashmap里面 容易产生oom问题
引入 file consolidation 机制 来解决大量中间文件的产生 maptask的相同的分区文件进行合并 减少了文件数,但如果下游Stage分区数N很大 一个Executor有K个Core,就会开KN个Writer Handler,仍然容易导致OOM。
sort-based shuffle 参照了mr的shuffle处理方式 所有结果写入一个文件 文件按照partitionid排序 partition内部分区key排序 文件中的积累按照Partition Id排序,每个Partition内部再按照Key排序,Map Task运行期间会顺序写每个Partition数据,同时生成一个索引文件记录每个Partition的大小和偏移量。Reduce阶段,Reduce Task拉取数据做Combine时不再采用HashMap,而是采用ExternalAppendOnlyMap,该数据结果做Combine时,如果内存不足,会刷写磁盘,很大程度上保证了鲁棒性,避免了大多数OOM。
总体来看,Sort Shuffle解决了Hash Shuffle所有弊端,但因为Shuffle过程需要对记录排序,性能上有所损失。
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
普通机制的Sort Shuffle
这种机制和mapreduce差不多,在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,由于一个task就只对应一个磁盘文件因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量,由于每个task最终只有一个磁盘文件所以文件个数等于上游shuffle write个数。
bypass机制的Sort Shuffle

bypass运行机制的触发条件如下:
- shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认值200。
- 不是聚合类的shuffle算子(比如reduceByKey)。
此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:
第一,磁盘写机制不同;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
钨丝sort-based-shuffle 利用堆外内存 直接二进制数据上进行sort排序 并非java对象 减少了内存使用和gc开销 也减少了频繁的序列化和反序列化
它将数据记录用二进制方式存储,直接在序列化二进制数据上Sort,而非Java对象,这样可以减少内存使用和GC开销,避免Shuffle过程中频繁序列化和反序列化。排序过程中,提供了cache-efficient sorter,使用8 bytes指针,把排序转化成指针数组排序,极大优化了排序性能。但Tungsten-Sort Based Shuffle有几个限制,Shuffle阶段不能有aggregate操作,分区数不能超过可编码的最大PartitionId(2^24-1),所有像reduceByKey类有aggregate操作的算子不能使用,会退化采用Sort Shuffle。
版本后**的改进**
从Spark-1.6.0开始,Sort Shuffle和Tungsten-Sort Based Shuffle全部统一到SortShuffle中。如果检测满足Tungsten-Sort Based Shuffle条件,会自动采用该算法,否则采用Sort Shuffle。
从Spark-2.0.0开始,Spark把Hash Shuffle移除,可以说目前Spark-2.0.0只有Sort Shuffle。
不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?(☆☆☆☆☆)
不一定,当数据规模小,Hash shuffle快于Sorted Shuffle数据规模大的时候;当数据量大,sorted Shuffle会比Hash shuffle快很多,因为数量大的有很多小文件,不均匀,甚至出现数据倾斜,消耗内存大,1.x之前spark使用hash,适合处理中小规模,1.x之后,增加了Sorted shuffle,Spark更能胜任大规模处理了。
Sort-based shuffle的缺陷? (☆☆☆☆☆)
1)如果mapper中task的数量过大,依旧会产生很多小文件,此时在shuffle传递数据的过程中reducer段,reduce会需要同时大量的记录进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃。
2)如果需要在分片内也进行排序,此时需要进行mapper段和reducer段的两次排序。
Spark RDD知识
- RDD是什么?
RDD Resillient Distributed DataSet 分布式 弹性 数据集 是Spark 提供的一个核心抽象 Spark所有计算都是基于RDD的 准确来说 是一种 抽象的 数据集合
- RDD的5个特点是什么?
可分区分片 每一个分片都有计算函数 有对其他RDD的依赖列表 是有KV的RDD 优先计算的位置
1)有一个分片列表,就是能被切分,和Hadoop一样,能够切分的数据才能并行计算。<br /> 一组分片(partition),即数据集的基本组成单位,对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。每个分配的存储是由BlockManager实现的,每个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。<br /> 2)由一个函数计算每一个分片,这里指的是下面会提到的compute函数。<br /> Spark中的RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。<br /> 3)对其他RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。<br /> RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。<br /> 4)可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的paritioner接口,控制Key分到哪个reduce。<br /> 一个partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。<br /> 5)可选:每一分片的优先计算位置,比如HDFS的block的所在位置应该是优先计算的位置。<br /> 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。<br />
- RDD是如何体现分布式的?
RDD是由partition组成的 partition是可以分布在不同的计算节点上的
- RDD是如何体现弹性的?
RDD之间具有依赖关系,可以基于上一个RDD来重新计算出RDD
- RDD的底层是怎么实现的?
RDD是一个分布式数据集,顾名思义,其数据应该分部存储于多台机器上。事实上,每个RDD的数据都以Block的形式存储于多台机器上,下图是Spark的RDD存储架构图,其中每个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点的BlockManagerMaster保存。BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD不再需要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。
介绍parition和block有什么关联关系?(☆☆☆☆☆)
1)hdfs中的block是分布式存储的最小单元,等分,可设置冗余,这样设计有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应的内容;
2)Spark中的partion是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partion组成的。partion是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partion大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定;
3)block位于存储空间、partion位于计算空间,block的大小是固定的、partion大小是不固定的,是从2个不同的角度去看数据。
RDD的常见创建方式是哪些?
1)使用程序中的集合创建rdd
2)使用本地文件系统创建rdd
3)使用hdfs创建rdd
4)基于数据库db创建rdd
5)基于Nosql创建rdd,如hbase
6)基于s3创建rdd
7)基于数据流,如socket创建rddRDD的常用算子是哪些?哪些算子是会shuffle的?算子的底层实现是什么
转换算子
| 转换 | 含义 |
|---|---|
| map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
| filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
| flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
| mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
| mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
| union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
| intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
| distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
| groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
| reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
| sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
| sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
| join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
| cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
| coalesce(numPartitions) | 减少 RDD 的分区数到指定值。 |
| repartition(numPartitions) | 重新给 RDD 分区 |
| repartitionAndSortWithinPartitions(partitioner) | 重新给 RDD 分区,并且每个分区内以记录的 key 排序 |
动作算子
| 动作 | 含义 |
|---|---|
| reduce(func) | reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。 |
| collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
| count() | 返回RDD的元素个数 |
| first() | 返回RDD的第一个元素(类似于take(1)) |
| take(n) | 返回一个由数据集的前n个元素组成的数组 |
| takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
| saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
| saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
| saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
| countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
| foreach(func) | 在数据集的每一个元素上,运行函数func |
| foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
会触发shuffle的算子
去重
distinct
聚合
reduceByKey
groupBy
groupByKey
aggregateByKey
combineByKey
排序
sortByKey
sortBy
重分区
coalesce
repartition
集合或者表操作
intersection
subtract
subtractByKey
join
leftOuterJoin
- RDD的map和flatmap的区别
map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象。
flatMap:对RDD每个元素转换,然后再扁平化。
将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组对象,会抛弃值为null的值。
- RDD的collect算子是怎么回事?
driver通过collect把集群中各个节点的内容收集过来汇总成结果,collect返回结果是Array类型的,collect把各个节点上的数据抓过来, 抓过来数据是Array型,collect对Array抓过来的结果进行合并,合并后Array中只有一个元素,是tuple类型(KV类型的)的。
- RDD的coalesce、reparation区别
coalesce功能:改变分区数量,只能减少,不能增加
repartition功能:改变分区数量,减少增加都可以
为什么repartition可以增加分区数量,而coalesce不可以,两者又有什么区别,我们来看一下源码:
repartition方法的源码(源码在RDD.scala搜索即可):
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
从源码可以看出,repartition方法体调用了coalesce方法,该coalesce方法有2个参数,第2个参数是shuffle = true。再来看一下coalesce方法的源码:
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T] = withScope {…..}
可看到,coalesce方法的第二个参数是shuffle,但是值却是false,这与repartition方法体里调用的coalesce参数值刚好相反。
因此,可以推断出,repartition能够的增加分区的数量的根本原因是将shuffle参数设为了true。
使用建议:因为shuffle是比较消耗资源的,所以如果要减少分区的数量时,尽量使用coalesce。
- RDD的foreach、foreachPartition的区别
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))//foreach实现对rdd1里的每一个元素乘10然后打印输出rdd1.foreach(x=>println(x 10))//foreachPartition实现对rdd1里的每一个元素乘10然后打印输出rdd1.foreachPartition(iter => iter.foreach(x=>println(x 10)))//foreach:用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)。//foreachPartition: 用于遍历操作RDD中的每一个分区。无返回值(action算子)。
总结:一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效,推荐使用。
- RDD的map、mapPartition、mapPartitionWithIndex 区别
val rdd1=sc.parallelize(1 to 10,5) rdd1.map(x => x10)).collect rdd1.mapPartitions(iter => iter.map(x=>x10)).collect//map:用于遍历RDD,将函数f应用于每一个元素,返回新的RDD(transformation算子)。//mapPartitions:用于遍历操作RDD中的每一个分区,返回生成一个新的RDD(transformation算子)。
总结:
如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效
比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。
mapPartitionsWithIndex的使用:
scala> val rdd1=sc.parallelize(1 to 5,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at
- RDD的宽依赖、窄依赖?
RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)
1)窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用
2)宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition
为什么要设置宽窄依赖?
窄依赖可以支持在同一个节点上链式执行多条命令,例如在执行了 map 后,紧接着执行 filter。相反,宽依赖需要所有的父分区都是可用的,可能还需要调用类似 MapReduce 之类的操作进行跨节点传递。
从失败恢复的角度考虑,窄依赖的失败恢复更有效,因为它只需要重新计算丢失的父分区即可,而宽依赖牵涉到 RDD 各级的多个父分区。
- RDD持久化和缓存
为什么要缓存、持久化
整体计算一次所消耗资源是非常大,比如shuffle,为了达到重用,提高spark的读写性能
缓存后 对一个RDD重复计算多个操作的场景 就只计算一次就行了 不用反复使用RDD 因为每一次计算RDD Spark都会重新计算 带来的开销无疑是非常大
cache persist 两个算子 做到了持久化和缓存 但并不是算子执行就立马持久化、缓存 而是等待后续的第一个action算子才开始执行缓存、持久化
cache 底层也调用的persist 等于 persist( persist(StorageLevel.MEMORY_ONLY))
| Storage Level(存储级别) | Meaning(含义) |
|---|---|
MEMORY_ONLY |
默认的缓存级别,将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。如果内存空间不够,则部分分区数据将不再缓存。 |
MEMORY_AND_DISK |
将 RDD 以反序列化的 Java 对象的形式存储 JVM 中。如果内存空间不够,将未缓存的分区数据存储到磁盘,在需要使用这些分区时从磁盘读取。 |
MEMORY_ONLY_SER |
将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式比反序列化对象节省存储空间,但在读取时会增加 CPU 的计算负担。仅支持 Java 和 Scala 。 |
MEMORY_AND_DISK_SER |
类似于 MEMORY_ONLY_SER,但是溢出的分区数据会存储到磁盘,而不是在用到它们时重新计算。仅支持 Java 和 Scala。 |
DISK_ONLY |
只在磁盘上缓存 RDD |
MEMORY_ONLY_2, MEMORY_AND_DISK_2 |
与上面的对应级别功能相同,但是会为每个分区在集群中的两个节点上建立副本。 |
OFF_HEAP |
与 MEMORY_ONLY_SER 类似,但将数据存储在堆外内存中。这需要启用堆外内存。 |
启动堆外内存需要配置两个参数:
- spark.memory.offHeap.enabled :是否开启堆外内存,默认值为 false,需要设置为 true;
- spark.memory.offHeap.size : 堆外内存空间的大小,默认值为 0,需要设置为正值。
缓存移除
Spark 会自动监视每个节点上的缓存使用情况,并按照最近最少使用(LRU)的规则删除旧数据分区。当然,你也可以使用 RDD.unpersist() 方法进行手动删除。
获取 cached partitions 的存储位置
partition 被 cache 后所在节点上的 blockManager 会通知 driver 上的 blockMangerMaster 说某 rdd 的 partition 已经被我 cache 了,这个信息会存储在 blockMangerMaster 的 blockLocations: HashMap中。等到 task 执行需要 cached rdd 的时候,会调用 blockManagerMaster 的 getLocations(blockId) 去询问某 partition 的存储位置,这个询问信息会发到 driver 那里,driver 查询 blockLocations 获得位置信息并将信息送回。
读取其他节点上的 cached partition:task 得到 cached partition 的位置信息后,将 GetBlock(blockId) 的请求通过 connectionManager 发送到目标节点。目标节点收到请求后从本地 blockManager 那里的 memoryStore 读取 cached partition,最后发送回来。
- RDD的checkpoint、容错
**
Spark中对于数据的保存除了持久化操作之外,还提供了检查点的机制;检查点本质是通过将RDD写入高可靠的磁盘,主要目的是为了容错。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。Lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。
cache 和 checkpoint 是有显著区别的,缓存把 RDD 计算出来然后放在内存中,但是 RDD 的依赖链不能丢掉, 当某个点某个 executor 宕了,上面 cache 的RDD就会丢掉, 需要通过依赖链重放计算。不同的是,checkpoint 是把RDD 保存在 HDFS中,是多副本可靠存储,此时依赖链可以丢掉,所以斩断了依赖链。
以下场景适合使用检查点机制:
DAG中的Lineage过长,如果重算,则开销太大
在宽依赖上做 Checkpoint 获得的收益更大
与cache类似 checkpoint 也是 lazy 的
RDD通过Linage(记录数据更新)的方式为何很高效?
1)lazy记录了数据的来源,RDD是不可变的,且是lazy级别的,且RDD之间构成了链条,lazy是弹性的基石。由于RDD不可变,所以每次操作就产生新的rdd, 不存在全局修改的问题,控制难度下降,所有有计算链条将复杂计算链条存储下来,计算的时候从后往前回溯 900步是上一个stage的结束,要么就checkpoint。
2)记录原数据,是每次修改都记录,代价很大如果修改一个集合,代价就很小,官方说rdd是粗粒度的操作,是为了效率,为了简化,每次都是操作数据集合, 写或者修改操作,都是基于集合的rdd的写操作是粗粒度的,rdd的读操作既可以是粗粒度的也可以是细粒度,读可以读其中的一条条的记录。
3)简化复杂度,是高效率的一方面,写的粗粒度限制了使用场景如网络爬虫,现实世界中,大多数写是粗粒度的场景。
一些杂项
union操作是产生宽依赖还是窄依赖?
产生窄依赖。
窄依赖父RDD的partition和子RDD的parition是不是都是一对一的关系?
不一定,除了一对一的窄依赖,还包含一对固定个数的窄依赖(就是对父RDD的依赖的Partition的数量不会随着RDD数量规模的改变而改变), 比如join操作的每个partiion仅仅和已知的partition进行join,这个join操作是窄依赖,依赖固定数量的父rdd,因为是确定的partition关系。
Hadoop中,Mapreduce操作的mapper和reducer阶段相当于spark中的哪几个算子?
相当于spark中的map算子和reduceByKey算子,当然还是有点区别的,MR会自动进行排序的,spark要看你用的是什么partitioner。
什么是shuffle,以及为什么需要shuffle?
shuffle中文翻译为洗牌,需要shuffle的原因是:某种具有共同特征的数据汇聚到一个计算节点上进行计算。
RDD的数据本地性
数据本地性是在哪个环节确定的?(☆☆☆☆☆)
具体的task运行在那他机器上,dag划分stage的时候确定的
Spark的数据本地性有哪几种?(☆☆☆☆☆)
Spark中的数据本地性有三种:
1)PROCESS_LOCAL是指读取缓存在本地节点的数据
2)NODE_LOCAL是指读取本地节点硬盘数据
3)ANY是指读取非本地节点数据
通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cache是lazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中。
RDD的缺陷有哪些?
RDD有哪些缺陷?(☆☆☆☆☆)
1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的。所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是说可以一条条的读。
2)不支持增量迭代计算,Flink支持Spark 调度相关
1、为什么要进行序列化序列化?
可以减少数据的体积,减少存储空间,高效存储和传输数据,不好的是使用的时候要反序列化,非常消耗CPU。
2、Yarn中的container是由谁负责销毁的,在Hadoop Mapreduce中container可以复用么?
ApplicationMaster负责销毁,在Hadoop Mapreduce不可以复用,在spark on yarn程序container可以复用。
3、提交任务时,如何指定Spark Application的运行模式?
1)cluster模式:./spark-submit —class xx.xx.xx —master yarn —deploy-mode cluster xx.jar
2)client模式:./spark-submit —class xx.xx.xx —master yarn —deploy-mode client xx.jar
4、不启动Spark集群Master和work服务,可不可以运行Spark程序?
可以,只要资源管理器第三方管理就可以,如由yarn管理,spark集群不启动也可以使用spark;spark集群启动的是work和master,这个其实就是资源管理框架, yarn中的resourceManager相当于master,NodeManager相当于worker,做计算是Executor,和spark集群的work和manager可以没关系,归根接底还是JVM的运行, 只要所在的JVM上安装了spark就可以。
5、spark on yarn Cluster 模式下,ApplicationMaster和driver是在同一个进程么?
是,driver 位于ApplicationMaster进程中。该进程负责申请资源,还负责监控程序、资源的动态情况。
6、运行在yarn中Application有几种类型的container?
1)运行ApplicationMaster的Container:这是由ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时, 可指定唯一的ApplicationMaster所需的资源;
2)运行各类任务的Container:这是由ApplicationMaster向ResourceManager申请的,并由ApplicationMaster与NodeManager通信以启动之。
7、Executor启动时,资源通过哪几个参数指定?
1)num-executors是executor的数量
2)executor-memory 是每个executor使用的内存
3)executor-cores 是每个executor分配的CPU
8、为什么会产生yarn,解决了什么问题,有什么优势?
1)为什么产生yarn,针对MRV1的各种缺陷提出来的资源管理框架
2)解决了什么问题,有什么优势,参考这篇博文:http://www.aboutyun.com/forum.php?mod=viewthread&tid=6785
9、一个task的map数量由谁来决定?
一般情况下,在输入源是文件的时候,一个task的map数量由splitSize来决定的
那么splitSize是由以下几个来决定的
goalSize = totalSize / mapred.map.tasks
inSize = max {mapred.min.split.size, minSplitSize}
splitSize = max (minSize, min(goalSize, dfs.block.size))
一个task的reduce数量,由partition决定。
10、列出你所知道的调度器,说明其工作原理?
1)FiFo schedular 默认的调度器 先进先出
2)Capacity schedular 计算能力调度器 选择占用内存小 优先级高的
3)Fair schedular 调度器 公平调度器 所有job 占用相同资源
11、导致Executor产生FULL gc 的原因,可能导致什么问题?
可能导致Executor僵死问题,海量数据的shuffle和数据倾斜等都可能导致full gc。以shuffle为例,伴随着大量的Shuffle写操作,JVM的新生代不断GC, Eden Space写满了就往Survivor Space写,同时超过一定大小的数据会直接写到老生代,当新生代写满了之后,也会把老的数据搞到老生代,如果老生代空间不足了, 就触发FULL GC,还是空间不够,那就OOM错误了,此时线程被Blocked,导致整个Executor处理数据的进程被卡住。
- Spark 分区器
13、spark hashParitioner的弊端是什么?
HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是 这个key所属的分区ID;弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有rdd的所有数据。
14、RangePartitioner分区的原理?
RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小 或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。其原理是水塘抽样。
15、rangePartioner分区器特点?
rangePartioner尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大; 但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。RangePartitioner作用:将一定范围内的数映射到某一个分区内, 在实现中,分界的算法尤为重要。算法对应的函数是rangeBounds。
16、如何理解Standalone模式下,Spark资源分配是粗粒度的?
spark默认情况下资源分配是粗粒度的,也就是说程序在提交时就分配好资源,后面执行的时候使用分配好的资源,除非资源出现了故障才会重新分配。 比如Spark shell启动,已提交,一注册,哪怕没有任务,worker都会分配资源给executor。
- Spark 调度器
5、Spark中standalone模式特点,有哪些优点和缺点?
1)特点:
(1)standalone是master/slave架构,集群由Master与Worker节点组成,程序通过与Master节点交互申请资源,Worker节点启动Executor运行;
(2)standalone调度模式使用FIFO调度方式;
(3)无依赖任何其他资源管理系统,Master负责管理集群资源。
2)优点:
(1)部署简单;
(2)不依赖其他资源管理系统。
3)缺点:
(1)默认每个应用程序会独占所有可用节点的资源,当然可以通过spark.cores.max来决定一个应用可以申请的CPU cores个数;
(2)可能有单点故障,需要自己配置master HA。
6、FIFO调度模式的基本原理、优点和缺点?
基本原理:按照先后顺序决定资源的使用,资源优先满足最先来的job。第一个job优先获取所有可用的资源,接下来第二个job再获取剩余资源。
以此类推,如果第一个job没有占用所有的资源,那么第二个job还可以继续获取剩余资源,这样多个job可以并行运行,如果第一个job很大,占用所有资源, 则第二job就需要等待,等到第一个job释放所有资源。
优点和缺点:
1)适合长作业,不适合短作业;
2)适合CPU繁忙型作业(计算时间长,相当于长作业),不利于IO繁忙型作业(计算时间短,相当于短作业)。
7、FAIR调度模式的优点和缺点?
所有的任务拥有大致相当的优先级来共享集群资源,spark多以轮训的方式为任务分配资源,不管长任务还是端任务都可以获得资源,并且获得不错的响应时间, 对于短任务,不会像FIFO那样等待较长时间了,通过参数spark.scheduler.mode 为FAIR指定。
8、CAPCACITY调度模式的优点和缺点?
1)原理:
计算能力调度器支持多个队列,每个队列可配置一定的资源量,每个队列采用 FIFO 调度策略,为了防止同一个用户的作业独占队列中的资源,该调度器会对 同一用户提交的作业所占资源量进行限定。调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的 比值(即比较空闲的队列),选择一个该比值最小的队列;然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择, 同时考虑用户资源量限制和内存限制
2)优点:
(1)计算能力保证。支持多个队列,某个作业可被提交到某一个队列中。每个队列会配置一定比例的计算资源,且所有提交到队列中的作业 共享该队列中的资源;
(2)灵活性。空闲资源会被分配给那些未达到资源使用上限的队列,当某个未达到资源的队列需要资源时,一旦出现空闲资源资源,便会分配给他们;
(3)支持优先级。队列支持作业优先级调度(默认是FIFO);
(4)多重租赁。综合考虑多种约束防止单个作业、用户或者队列独占队列或者集群中的资源;
(5)基于资源的调度。 支持资源密集型作业,允许作业使用的资源量高于默认值,进而可容纳不同资源需求的作业。不过,当前仅支持内存资源的调度。
17、hbase region多大会分区,spark读取hbase数据是如何划分partition的?
region超过了hbase.hregion.max.filesize这个参数配置的大小就会自动裂分,默认值是1G。
默认情况下,hbase有多少个region,Spark读取时就会有多少个partition
- Spark 写一个 wordcount
**
10、使用scala代码实现WordCount?
val conf = new SparkConf()
val sc = new SparkContext(conf)
val line = sc.textFile(“xxxx.txt”) line.flatMap(.split(“ “)).map((,1)).reduceByKey(+). collect().foreach(println) sc.stop()
Spark性能优化
数据压缩
常见的数压缩方式,你们生产集群采用了什么压缩方式,提升了多少效率?
1)数据压缩,大片连续区域进行数据存储并且存储区域中数据重复性高的状况下,可以使用适当的压缩算法。数组,对象序列化后都可以使用压缩,数更紧凑, 减少空间开销。常见的压缩方式有snappy,LZO,gz等
2)Hadoop生产环境常用的是snappy压缩方式(使用压缩,实际上是CPU换IO吞吐量和磁盘空间,所以如果CPU利用率不高,不忙的情况下, 可以大大提升集群处理效率)。snappy压缩比一般20%~30%之间,并且压缩和解压缩效率也非常高(参考数据如下):
(1)GZIP的压缩率最高,但是其实CPU密集型的,对CPU的消耗比其他算法要多,压缩和解压速度也慢;
(2)LZO的压缩率居中,比GZIP要低一些,但是压缩和解压速度明显要比GZIP快很多,其中解压速度快的更多;
(3)Zippy/Snappy的压缩率最低,而压缩和解压速度要稍微比LZO要快一些。
提升了多少效率可以从2方面回答:1)数据存储节约多少存储,2)任务执行消耗时间节约了多少,可以举个实际例子展开描述。spark.default.parallelism这个参数有什么意义,实际生产中如何设置?
1)参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能;
2)很多人都不会设置这个参数,会使得集群非常低效,你的cpu,内存再多,如果task始终为1,那也是浪费, spark官网建议task个数为CPU的核数*executor的个数的2~3倍。spark.shuffle.memoryFraction参数的含义,以及优化经验?
1)spark.shuffle.memoryFraction是shuffle调优中 重要参数,shuffle从上一个task拉去数据过来,要在Executor进行聚合操作, 聚合操作时使用Executor内存的比例由该参数决定,默认是20%如果聚合时数据超过了该大小,那么就会spill到磁盘,极大降低性能;
2)如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例, 避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用, 那么同样建议调低这个参数的值。资源调优
2、资源运行中的集中情况
(1)实践中跑的Spark job,有的特别慢,查看CPU利用率很低,可以尝试减少每个executor占用CPU core的数量,增加并行的executor数量,同时配合增加分片,整体上增加了CPU的利用率,加快数据处理速度。
(2)发现某job很容易发生内存溢出,我们就增大分片数量,从而减少了每片数据的规模,同时还减少并行的executor数量,这样相同的内存资源分配给数量更少的executor,相当于增加了每个task的内存分配,这样运行速度可能慢了些,但是总比OOM强。
(3)数据量特别少,有大量的小文件生成,就减少文件分片,没必要创建那么多task,这种情况,如果只是最原始的input比较小,一般都能被注意到;但是,如果是在运算过程中,比如应用某个reduceBy或者某个filter以后,数据大量减少,这种低效情况就很少被留意到。
3、运行资源优化配置
一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。
一个应用提交的时候设置多大的内存?设置多少Core?设置几个Executor?
./bin/spark-submit \
—master yarn-cluster \
—num-executors 100 \
—executor-memory 6G \
—executor-cores 4 \
—driver-memory 1G \
—conf spark.default.parallelism=1000 \
—conf spark.storage.memoryFraction=0.5 \
—conf spark.shuffle.memoryFraction=0.3 \
3.1 运行资源优化配置 -num-executors
参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
3.2 运行资源优化配置 -executor-memory
参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同事的作业无法运行。
3.3 运行资源优化配置 -executor-cores
参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同事的作业运行。
3.4 运行资源优化配置 -driver-memory
参数说明:该参数用于设置Driver进程的内存。
参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理(或者是用map side join操作),那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。
3.5 运行资源优化配置 -spark.default.parallelism
参数说明:该参数用于设置每个stage的默认task数量,也可以认为是分区数。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多人常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
3.6 运行资源优化配置 -spark.storage.memoryFraction
参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
3.7 运行资源优化配置 -spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
总结:
1、num-executors:应用运行时executor的数量,推荐50-100左右比较合适
2、executor-memory:应用运行时executor的内存,推荐4-8G比较合适
3、executor-cores:应用运行时executor的CPU核数,推荐2-4个比较合适
4、driver-memory:应用运行时driver的内存量,主要考虑如果使用map side join或者一些类似于collect的操作,那么要相应调大内存量
5、spark.default.parallelism:每个stage默认的task数量,推荐参数为num-executors executor-cores的2~3倍较为合适
6、spark.storage.memoryFraction:每一个executor中用于RDD缓存的内存比例,如果程序中有大量的数据缓存,可以考虑调大整个的比例,默认为60%
7、spark.shuffle.memoryFraction:每一个executor中用于Shuffle操作的内存比例,默认是20%,如果程序中有大量的Shuffle类算子,那么可以考虑其它的比例
- 程序开发调优
1、程序开发调优 :避免创建重复的RDD
需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作。也就是说,需要对一份数据执行两次算子操作。
错误的做法:
对于同一份数据执行多次算子操作时,创建多个RDD。//这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。
这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;//第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。
val rdd1 = sc.textFile(“hdfs://master:9000/hello.txt”)
rdd1.map(…)
val rdd2 = sc.textFile(“hdfs://master:9000/hello.txt”)
rdd2.reduce(…)
正确的用法:
对于一份数据执行多次算子操作时,只使用一个RDD。
2、程序开发调优 :尽可能复用同一个RDD
错误的做法:
有一个
接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
JavaPairRDD
JavaRDD
分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(…)
rdd2.map(…)
正确的做法:
rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。
此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。
其实在这种情况下完全可以复用同一个RDD。
我们可以使用rdd1,既做reduceByKey操作,也做map操作。
JavaPairRDD
rdd1 = …rdd1.reduceByKey(…)
rdd1.map(tuple._2…)
3、程序开发调优 :对多次使用的RDD进行持久化
正确的做法:
cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
val rdd1 = sc.textFile(“hdfs://192.168.0.1:9000/hello.txt”).cache()
rdd1.map(…)
rdd1.reduce(…)
序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。
val rdd1 = sc.textFile(“hdfs://192.168.0.1:9000/hello.txt”) .persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(…)
rdd1.reduce(…)
注意:通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,导致网络较大开销
4、程序开发调优 :尽量避免使用shuffle类算子
如果有可能的话,要尽量避免使用shuffle类算子,最消耗性能的地方就是shuffle过程。
shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。
尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。
传统的join操作会导致shuffle操作。
因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
Broadcast+map的join操作,不会导致shuffle操作。
使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast…)
注意:以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
5、程序开发调优 :使用map-side预聚合的shuffle操作
如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。
建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子
6、程序开发调优 :使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey : map-side
使用mapPartitions替代普通map : 函数执行频率
使用foreachPartitions替代foreach : 函数执行频率
使用filter之后进行coalesce操作 : filter后对分区进行压缩
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子
7、程序开发调优 :广播大变量
有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。
默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。
广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。
8、程序开发调优 :使用Kryo优化序列化性能
1)在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
2)将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
3)使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
Spark默认使用的是Java的序列化机制,你可以使用Kryo作为序列化类库,效率要比Java的序列化机制要高
// 创建SparkConf对象。
val conf = new SparkConf().setMaster(…).setAppName(…)
// 设置序列化器为KryoSerializer。
conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
9、程序开发调优 :分区Shuffle优化
当遇到userData和events进行join时,userData比较大,而且join操作比较频繁,这个时候,可以先将userData调用了 partitionBy()分区,可以极大提高效率。
cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等都能够受益
总结:如果遇到一个RDD频繁和其他RDD进行Shuffle类操作,比如 cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等,那么最好将该RDD通过partitionBy()操作进行预分区,这些操作在Shuffle过程中会减少Shuffle的数据量
10、程序开发调优 :优化数据结构
Java中,有三种类型比较耗费内存:
1)对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
2)字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
3)集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry
Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。
- 数据倾斜调优
1、数据倾斜
数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
数据倾斜俩大直接致命后果。
1、数据倾斜直接会导致一种情况:Out Of Memory。
2、运行速度慢。
主要是发生在Shuffle阶段。同样Key的数据条数太多了。导致了某个key(下图中的80亿条)所在的Task数据量太大了。远远超过其他Task所处理的数据量。
一个经验结论是:一般情况下,OOM的原因都是数据倾斜
2、如何定位数据倾斜
数据倾斜一般会发生在shuffle过程中。很大程度上是你使用了可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
原因:查看任务->查看Stage->查看代码
某个task执行特别慢的情况
某个task莫名其妙内存溢出的情况
查看导致数据倾斜的key的数据分布情况
也可从以下几种情况考虑:
1、是不是有OOM情况出现,一般是少数内存溢出的问题
2、是不是应用运行时间差异很大,总体时间很长
3、需要了解你所处理的数据Key的分布情况,如果有些Key有大量的条数,那么就要小心数据倾斜的问题
4、一般需要通过Spark Web UI和其他一些监控方式出现的异常来综合判断
5、看看代码里面是否有一些导致Shuffle的算子出现
3、数据倾斜的几种典型情况
3.1 数据源中的数据分布不均匀,Spark需要频繁交互
3.2 数据集中的不同Key由于分区方式,导致数据倾斜
3.3 JOIN操作中,一个数据集中的数据分布不均匀,另一个数据集较小(主要)
3.4 聚合操作中,数据集中的数据分布不均匀(主要)
3.5 JOIN操作中,两个数据集都比较大,其中只有几个Key的数据分布不均匀
3.6 JOIN操作中,两个数据集都比较大,有很多Key的数据分布不均匀
3.7 数据集中少数几个key数据量很大,不重要,其他数据均匀
注意:
1、需要处理的数据倾斜问题就是Shuffle后数据的分布是否均匀问题
2、只要保证最后的结果是正确的,可以采用任何方式来处理数据倾斜,只要保证在处理过程中不发生数据倾斜就可以
4、数据倾斜的处理方法
4.1 数据源中的数据分布不均匀,Spark需要频繁交互
解决方案:避免数据源的数据倾斜
实现原理:通过在Hive中对倾斜的数据进行预处理,以及在进行kafka数据分发时尽量进行平均分配。这种方案从根源上解决了数据倾斜,彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。
方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。
方案缺点:治标不治本,Hive或者Kafka中还是会发生数据倾斜。
适用情况:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。
总结:前台的Java系统和Spark有很频繁的交互,这个时候如果Spark能够在最短的时间内处理数据,往往会给前端有非常好的体验。这个时候可以将数据倾斜的问题抛给数据源端,在数据源端进行数据倾斜的处理。但是这种方案没有真正的处理数据倾斜问题。
4.2 数据集中的不同Key由于分区方式,导致数据倾斜
解决方案1:调整并行度
实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。
方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。
方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。
实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,都无法处理。
总结:调整并行度:适合于有大量key由于分区算法或者分区数的问题,将key进行了不均匀分区,可以通过调大或者调小分区数来试试是否有效
解决方案2:
缓解数据倾斜(自定义Partitioner)
适用场景:大量不同的Key被分配到了相同的Task造成该Task数据量过大。
解决方案: 使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。
优势: 不影响原有的并行度设计。如果改变并行度,后续Stage的并行度也会默认改变,可能会影响后续Stage。
劣势: 适用场景有限,只能将不同Key分散开,对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner,不够灵活。
4.3 JOIN操作中,一个数据集中的数据分布不均匀,另一个数据集较小(主要)
解决方案:Reduce side Join转变为Map side Join
方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M),比较适用此方案。
方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。
方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。
方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。
4.4 聚合操作中,数据集中的数据分布不均匀(主要)
解决方案:两阶段聚合(局部聚合+全局聚合)
适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案
实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。
优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。
缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案将相同key的数据分拆处理
4.5 JOIN操作中,两个数据集都比较大,其中只有几个Key的数据分布不均匀
解决方案:为倾斜key增加随机前/后缀
适用场景:两张表都比较大,无法使用Map侧Join。其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key分布较为均匀。
解决方案:将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(笛卡尔积,相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join后去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。
优势:相对于Map侧Join,更能适应大数据集的Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。
劣势:如果倾斜Key非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜Key与非倾斜Key分开处理,需要扫描数据集两遍,增加了开销。
注意:具有倾斜Key的RDD数据集中,key的数量比较少
4.6 JOIN操作中,两个数据集都比较大,有很多Key的数据分布不均匀
解决方案:随机前缀和扩容RDD进行join
适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义。
实现思路:将该RDD的每条数据都打上一个n以内的随机前缀。同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。最后将两个处理后的RDD进行join即可。和上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。
优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。
缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。
实践经验:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。
注意:将倾斜Key添加1-N的随机前缀,并将被Join的数据集相应的扩大N倍(需要将1-N数字添加到每一条数据上作为前缀)
4.7 数据集中少数几个key数据量很大,不重要,其他数据均匀
解决方案:过滤少数倾斜Key
适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。
优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。
缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。
实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。
**
- shuffle调优
**
1、Shuffle优化配置 -spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
2、Shuffle优化配置 -spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
3、Shuffle优化配置 -spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
4、Shuffle优化配置 -spark.shuffle.io.retryWait
默认值:5s
参数说明: shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
5、Shuffle优化配置 -spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
6、Shuffle优化配置 -spark.shuffle.manager
默认值:sort
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
7、Shuffle优化配置 -spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
8、Shuffle优化配置 -spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
总结:
1、spark.shuffle.file.buffer:主要是设置的Shuffle过程中写文件的缓冲,默认32k,如果内存足够,可以适当调大,来减少写入磁盘的数量。
2、spark.reducer.maxSizeInFight:主要是设置Shuffle过程中读文件的缓冲区,一次能够读取多少数据,如果内存足够,可以适当扩大,减少整个网络传输次数。
3、spark.shuffle.io.maxRetries:主要是设置网络连接失败时,重试次数,适当调大能够增加稳定性。
4、spark.shuffle.io.retryWait:主要设置每次重试之间的间隔时间,可以适当调大,增加程序稳定性。
5、spark.shuffle.memoryFraction:Shuffle过程中的内存占用,如果程序中较多使用了Shuffle操作,那么可以适当调大该区域。
6、spark.shuffle.manager:Hash和Sort方式,Sort是默认,Hash在reduce数量 比较少的时候,效率会很高。
7、spark.shuffle.sort. bypassMergeThreshold:设置的是Sort方式中,启用Hash输出方式的临界值,如果你的程序数据不需要排序,而且reduce数量比较少,那推荐可以适当增大临界值。
8、spark. shuffle.cosolidateFiles:如果你使用Hash shuffle方式,推荐打开该配置,实现更少的文件输出。
- 未完待续
- 未完待续
Spark的内存管理机制
- Driver的内存管理
- Executor的内存管理
堆外内存管理 1.6版本开始
堆外内存管理 在JVM上的内存管理
包括
Exectution Memory 执行内存 主要用于存放shuffle、joins、sorts产生的临时数据
Storage Memory 存储内存 主要用于存放RDD的cache或者persist的数据
User Memory 用户内存 主要用于存储用于自定义的数据结构和Spark的内部元数据
Reserved Memory 保留内存 主要用于程序的保留内存 默认300M
内存管理机制 1.6 之前 是静态划分的 预先设置好了 不能能动了
1.6之后 动态划分了 主要是Execution 和Storage 之间可以相互借用内存 避免内存浪费和内存不足的问题
1、Spark内存管理
首先我们看一下Spark的内存使用,主要分为两类:Execution Memory和Storage Memory。其中,Execution Memory主要用于计算,如shuffles、joins、sorts及aggregations等操作,Storage Memory主要用于cache数据和在集群内部传输数据。
Executor默认只使用堆内内存(On-heap Memory)。为了进一步优化内存的使用,Spark引入了堆外内存(Off-heap Memory),默认是关闭状态。接下来,将详细说明Spark Executor堆内内存与堆外内存的具体情况。
1.1 On-heap Memory
Spark Executor通过spark.executor.memory或—executor-memory配置的内存为堆内内存,可以分为以下四块区域:
- Execution Memory:主要用于shuffles、joins、sorts及aggregations等计算操作,又称为Shuffle Memory。
- Storage Memory:主要用于cache数据、unroll数据,有时也被称为Cache Memory。
- User Memory:用户内存,主要用于存储内部元数据、用户自定义的数据结构等,根据用户实际定义进行使用。
- Reserved Memory:默认300M的系统预留内存,主要用于程序运行,参见SPARK-12081。
各个区域内存情况,如下图所示:
以上图解中,参数说明如下:
参数spark.memory.fraction在Spark 1.6版本中默认0.75,即Spark Memory(Execution Memory + Storage Memory)默认占整个usableMemory(systemMemory - Reserved Memory)内存的75%,而在Spark 2.x版本中默认0.6,默认占usableMemory内存的60%。
参数spark.memory.storageFraction默认0.5,代表Storage Memory占用Spark Memory百分比,(1 - spark.memory.storageFraction)代表Execution Memory占用Spark Memory百分比,默认值0.5表示Spark Memory中Execution Memory和Storage Memory各占一半。
1.2 Off-heap Memory
为了进一步优化内存的使用,减小GC开销,Spark 1.6版本还增加了对Off-heap Memory的支持,参见SPARK-11389,但Off-heap Memory默认是关闭的,开启须设置参数spark.memory.offHeap.enabled为true,并通过参数spark.memory.offHeap.size设置堆外内存大小,单位为字节。
堆外内存划分上没有了用户内存与预留内存,只包含Execution Memory和Storage Memory两块区域,内存情况如下图所示:

以上图解中,maxOffHeapMemory大小就是spark.memory.offHeap.size的值,spark.memory.storageFraction默认值不变。
3、动态内存分配
在如上两个图解中,我们可以看到Execution Memory与Storage Memory之间有一条可以上下移动的虚线,说明Execution Memory与Storage Memory不是固定不变的,彼此之间可以相互共享,这便是Spark动态内存管理的含义。Spark 1.5及之前版本,两者是固定不变的,即前文提及的静态内存管理。
意思是说,当Execution Memory有空闲,Storage Memory不足时,Storage Memory可以借用Execution Memory,反之亦然。Execution Memory可以让Storage Memory写到磁盘,收回被占用的空间。如果Storage Memory被Execution Memory借用,因为实现上的复杂度,却收回不了空间。
4、Legacy Mode
Spark 1.6版本开始,默认使用动态(统一)内存管理模型,但之前的静态内存管理模型(StaticMemoryManager)仍然保留,通过称为Legacy模式的参数spark.memory.useLegacyMode控制,默认false为不开启静态内存管理。
5、总 结
Apache Spark从1.6.0版本开始,其内存管理模块默认采用了动态内存管理模型,一直延续使用到Spark 2.x。本文参考了社区的一些分享,结合相关图解,从Spark总体内存使用、堆内内存、堆外内存等几个方面,重点对Spark的动态内存管理这块做了简单介绍。
- 对对对的内存管理
- 对对对的内存管理
