Spark性能优化点
分配更多的资源
Executor-memory<br /> Executor-cores<br /> Driver-memory
提交任务的脚本
在实际的生产环境中,提交spark任务时,使用spark‐submit shell脚本,在里面调整对应的参数。<br /> spark‐submit \ <br /> ‐‐master spark://node1:7077 \ <br /> ‐‐class cn.itcast.WordCount \ <br /> ‐‐num‐executors 3 \ 配置executor的数量 <br /> ‐‐driver‐memory 1g \ 配置driver的内存(影响不大) <br /> ‐‐executor‐memory 1g \ 配置每一个executor的内存大小 <br /> ‐‐executor‐cores 3 \ 配置每一个executor的cpu个数 <br /> /export/servers/wordcount.jar
参数调节
参数调节到多大,算是最大
Standalone模式
第一种情况:standalone模式 先计算出公司spark集群上的所有资源 每台节点的内存大小和cpu核数, 比如:一共有20台worker节点,每台节点8g内存,10个cpu。 实际任务在给定资源的时候,可以给20个executor、每个executor的内存8g、每个executor的使用的cpu个数 10.
YARN模式
第二种情况:Yarn 先计算出yarn集群的所有大小,比如一共500g内存,100个cpu;这个时候可以分配的大资源,比如给定50个executor、每个executor的内存大小10g,每个executor使用的cpu 个数为2。 <br /> 使用原则:你能使用的资源有多大,就尽量去调节到大的大小(executor的数量:几十个到上百个不等;executor的 内存;exector的cpu个数)
提高并行度
Spark的并行度指的是什么
spark作业中,各个stage的task的数量,也就代表了spark作业在各个阶段stage的并行度! 当分配完所能分配的大资源了,然后对应资源去调节程序的并行度,如果并行度没有与资源相匹配,那么导致你 分配下去的资源都浪费掉了。同时并行运行,还可以让每个task要处理的数量变少(很简单的原理。合理设置并行度, 可以充分利用集群资源,减少每个task处理数据量,而增加性能加快运行速度。)
设置task的数量
至少设置成与spark Application 的总cpu core 数量相同(理想情况,150个core,分配150task,一起运 行,差不多同一时间运行完毕)官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍 。 <br /> 比如150个cpu core ,基本设置task数量为300~500. 与理想情况不同的,有些task会运行快一点,比如50s就完 了,有些task 可能会慢一点,要一分半才运行完,所以如果你的task数量,刚好设置的跟cpu core 数量相同,可能会 导致资源的浪费
如何设置task数量来提高并行度
设置参数spark.defalut.parallelism 默认是没有值,如果设置了值为10,它会在shuffle的过程才会起作用。
通过在构建SparkConf对象的时候设置,如:new SparkConf().set(“spark.defalut.parallelism”,”500”)
通过设置参数 spark.sql.shuffle.partitions=500 默认为200; 可以适当增大,来提高并行度。 比如设置为 spark.sql.shuffle.partitions=500
RDD的重用和持久化
可以调用rdd的cache或者persist方法。
cache方法
cache方法默认是把数据持久化到内存中 ,例如:rdd.cache ,其本质还是调用了persist方法
persist方法
persist方法中有丰富的缓存级别,这些缓存级别都定义在StorageLevel这个object中,可以结合实际的应用场 景合理的设置缓存级别。例如: rdd.persist(StorageLevel.MEMORY_ONLY),这是cache方法的实现
RDD持久化的时可以采用序列化
如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许会导致OOM内存溢出。
- 当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑使用序列化的方式在纯内存中存储。将RDD的每个 partition的数据,序列化成一个字节数组;序列化后,大大减少内存的空间占用。
- 序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。但是可以减少占用的空间和便于网络传输
- 如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。
- 为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化
广播变量的使用
Spark中分布式执行的代码需要传递到各个executor的task上运行。对于一些只读、固定的数据,每次都需要Driver 广播到各个Task上,这样效率低下。广播变量允许将变量只广播(提前广播)给各个executor。该executor上的各 个task再从所在节点的BlockManager(负责管理某个executor对应的内存和磁盘上的数据)获取变量,而不是从 Driver获取变量,从而提升了效率。<br /> executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本,网络距离 越近越好。
如何使用广播变量
例如:
(1) 通过sparkContext的broadcast方法把数据转换成广播变量,类型为Broadcast, val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6))
(2) 然后executor上的BlockManager就可以拉取该广播变量的副本获取具体的数据。 获取广播变量中的值可以通过调用其value方法 val array: Array[Int] = broadcastArray.value使用Kryo序列化(默认Java)
Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大 概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减 少。
