1 spark集群各角色分析

Master进程
Worker进程
当我们提交spark任务的时候(spark-shell ,spark-submit)
生成了一个Applications,而且会占用所有WOrker的cores,每一个占用了1g内存。
Spark算子 - 图1
通过jps命令,可以查看到
在执行spark-submit的节点上,有spark-submit(dirver)进程,
在任务执行的节点上(worker节点上),有 CoarseGrainedExecutorBackend(executor) 进程。
然后,当我们的任务执行完毕之后,这两个进程都会退出了。

2 弹性分布式数据集RDD

2.1 RDD概述

2.1.1 产生背景

为了解决开发人员能在大规模的集群中以一种容错的方式进行内存计算,提出了RDD的概念,而当前的很多框架对迭代式算法场景与交互性数据挖掘场景的处理性能非常差, 这个是 RDDs 的提出的动机。

2.1.2 什么是RDD

RDD是Spark的计算模型。RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、只读的,被分区的数据集。
RDDs 并不要始终被具体化, 一个 RDD 有足够的信息知道自己是从哪个数据集计算而来的(就是所谓的依赖血统)
用户可以控制RDD的两个方面:数据存储和分区。对于需要复用的RDD,用户可以明确的选择一种数据存储策略(比如:内存缓存,内存+磁盘等),也可以基于一个元素的key来为RDD所有的元素在机器节点间进行数据分区。
Spark算子 - 图2
RDD是spark中的一个基本抽象(可以理解为代理)
有了RDD,就可以像操作本地的集合一样,有很多的方法可以调用,使用方便,而无需关心底层的调度细节。

2.1.3 RDD操作类型

RDD支持两种类型的操作:
转换(Transformation) 现有的RDD通过转换生成一个新的RDD。lazy模式,延迟执行。
转换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce等等。
动作(Action) 在RDD上运行计算,并返回结果给驱动程序(Driver)或写入文件系统。
动作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。
RDD操作流程示意:
Spark算子 - 图3
Spark算子 - 图4
RDD的转换与操作
Spark算子 - 图5
wordcount示例,查看lazy特性。
只有在执行action时,才会真正开始运算,并得到结果或存入文件中。

2.2 创建RDD

  1. 集合并行化创建(通过scala集合创建)

val arr = Array(1 to 10)
// val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val rdd =sc.makeRDD(arr)
Spark算子 - 图6
通过集合并行化方式创建RDD,适用于本地测试,做实验

  1. 外部文件系统,比如HDFS等

val rdd2 = sc.textFile(“hdfs://hdp-01:9000/words.txt”)
// 读取本地文件
val rdd2 = sc.textFile(“file:///root/words.txt”)
最常用的方式。
3)从父RDD转换成新的子RDD
Transformation类的方法
Spark算子 - 图7

2.3 RDD的分区

rdd中和文件切片相关的概念叫做分区,也就是说对rdd进行操作,实际上是操作的rdd中的每一个分区,分区的数量决定了并行的数量。
使用rdd.partitions.size或者rdd.partitions.length查看分区数量。
Spark算子 - 图8

使用并行化方式创建RDD,RDD的分区数量和指定的核数是相同的(—total-executor-cores)
如果从外部创建RDD,比如从hdfs中读取数据,那么分区的数量和读取的数据量切片一致。而且不能设置分区数量小于切片数量。
如果文件只有一个block块,默认情况下(集群模式),分区数量是2。
# data.partitions.size

使用集合并行化方式创建RDD:
默认的分区数量,是由启动任务所分配的cores来决定的。—total-executor-cores 2
可以任意的增加或者减少分区数量。
val data = sc.makeRDD(Array(1,2,4,5,6,8,8),1)

2.4 RDD编程API

2.4.1 算子综述

算子是RDD中定义的方法,分为转换(transformantion)和动作(action)。Tranformation算子并不会触发Spark提交作业,直至Action算子才提交任务执行,这是一个延迟计算的设计技巧,可以避免内存过快被中间计算占满,从而提高内存的利用率。
RDD拥有的操作比MR丰富的多,不仅仅包括Map、Reduce操作,还包括filter、sort、join、save、count等操作,并且中间结果不需要保存,所以Spark比MR更容易方便完成更复杂的任务。

2.4.2 Transformation

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

常用的Transformation:

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 先map,再flatten压平
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
subtract(otherDataset) 求差集后返回新的RDD,出现在源rdd中,不在otherrdd中
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 针对分区内部使用seqOp方法,针对最后的结果使用combOp方法。
coalesce(numPartitions) 用于对RDD进行重新分区,第一个参数是分区的数量,第二个参数是是否进行shuffle,可不传,默认不shuffle
repartition(numPartitions) 用于对RDD进行重新分区,相当于shuffle版的calesce
repartitionAndSortWithinPartitions(partitioner) 在分区中按照key进行排序,这种方式比先分区再sort更高效,因为相当于在shuffle阶段就进行排序。
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是
(Int, Interator[T]) => Iterator[U]

2.4.3 Action

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
collectAsMap 类似于collect。该函数用于Pair RDD,最终返回Map类型的结果。
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统
Top(n) 按照默认排序(降序) 取数据
takeOrdered(n, [ordering]) 与top类似,顺序相反
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。foreach,任务在executor中运行,打印信息也会在executor中显示
Foreach 对分区进行操作

coalesce和repartition
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
将RDD分区的数量修改为numPartitions,对于经过过滤后的大数据集的在线处理更加有效。常用于减少分区
第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false
当需要调大分区时,必须设置shuffle为true,才能有效,否则分区数不变

Spark算子 - 图9
随机重新shuffle RDD中的数据,并创建numPartitions个分区。这个操作总会通过网络来shuffle全部数据。常用于扩大分区
分区数调大调小,都会shuffle全部数据,是重量级算子
使用coalesce重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,否则,分区数不变,设置无效
所以不需要shuffle,或者分区数调小时,或者block块小时,首选coalesce,需要shuffle时,首选repartition
val rdd1 = sc.parallelize(1 to 10, 10)
val rdd2 = rdd1.coalesce(2, false)
rdd2.partitions.length

实际工作中怎么用呢? coalesce(10,true)=repartition(10)
如果说想减少分区数量: coalesce(num)
需要重新的shuffle,那么就使用repartition(10) 扩大分区,也用这个
为什么扩大分区呢?
提升任务的并行度

mapPartitions和mapPartitionsWithIndex
def mapPartitionsU => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。
如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效。
该方法,看上去是操作的每一条数据,实际上是对RDD中的每一个分区进行ieterator,

mapPartitions( it: Iterator => {it.map(x => x * 10)})

mapPartitionsWithIndex
def mapPartitionsWithIndexU => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
类似于mapPartitions, 不过提供了两个参数,第一个参数为分区的索引。
mapPartitionsWithIndex的func接受两个参数,第一个参数是分区的索引,第二个是一个数据集分区的迭代器。而输出的是一个包含经过该函数转换的迭代器。

val func = (index: Int, iter: Iterator[Int]) => {
iter.map(x => “[partID:” + index + “, val: “ + x + “]”)
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func).collect

2.5 RDD的属性

Spark算子 - 图10

  1. 一组分片列表(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

  2. 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

  3. RDD之间的依赖关系列表。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

  4. 一个Partitioner,即RDD的分区函数。当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。

  5. 一个数据存储列表,存储每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。