1 spark集群各角色分析
Master进程
Worker进程
当我们提交spark任务的时候(spark-shell ,spark-submit)
生成了一个Applications,而且会占用所有WOrker的cores,每一个占用了1g内存。
通过jps命令,可以查看到
在执行spark-submit的节点上,有spark-submit(dirver)进程,
在任务执行的节点上(worker节点上),有 CoarseGrainedExecutorBackend(executor) 进程。
然后,当我们的任务执行完毕之后,这两个进程都会退出了。
—executor-cores 单独指定每一个executor使用的cores。
—name 指定任务运行的名称
—jars 可以指定使用第三方的jar包
如果使用配置—executor-cores,超过了每个worker可以的cores,任务处于等待状态。
如果使用—total- executor-cores ,即使超过可以的cores,默认使用所有的。
可以使用的所有的参数:
2 弹性分布式数据集RDD
2.1 RDD概述
RDD论文,中文版 :http://spark.apachecn.org/paper/zh/spark-rdd.html
2.1.1 产生背景
为了解决开发人员能在大规模的集群中以一种容错的方式进行内存计算,提出了RDD的概念,而当前的很多框架对迭代式算法场景与交互性数据挖掘场景的处理性能非常差, 这个是 RDDs 的提出的动机。
2.1.2 什么是RDD
RDD是Spark的计算模型。RDD(Resilient Distributed Dataset)叫做分布式数据集合,是Spark中最基本的数据抽象,它代表一个不可变、只读的,被分区的数据集。
RDDs 并不要始终被具体化, 一个 RDD 有足够的信息知道自己是从哪个数据集计算而来的(就是所谓的依赖血统)
用户可以控制RDD的两个方面:数据存储和分区。对于需要复用的RDD,用户可以明确的选择一种数据存储策略(比如:内存缓存,内存+磁盘等),也可以基于一个元素的key来为RDD所有的元素在机器节点间进行数据分区。
RDD是spark中的一个基本抽象(可以理解为代理)
有了RDD,就可以像操作本地的集合一样,有很多的方法可以调用,使用方便,而无需关心底层的调度细节。
2.1.3 RDD操作类型
基于RDD的所有的操作,可以分为两类
RDD支持两种类型的操作:
转换(Transformation) 现有的RDD通过转换生成一个新的RDD。lazy模式,延迟执行。
转换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce等等。
动作(Action) 在RDD上运行计算,并返回结果给驱动程序(Driver)或写入文件系统。
动作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。
collect ,把数据收集为本地集合Array driver
saveAsTextFile -🡪 存储到hdfs中
RDD操作流程示意:
RDD的转换与操作
wordcount示例,查看lazy特性。
只有在执行action时,才会真正开始运算,并得到结果或存入文件中。
2.2 创建RDD
- 集合并行化创建(通过scala集合创建)
val arr = Array(1 to 10)
// val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val rdd =sc.makeRDD(arr)
通过集合并行化方式创建RDD,适用于本地测试,做实验
- 外部文件系统,比如HDFS等
val rdd2 = sc.textFile(“hdfs://hdp-01:9000/words.txt”)
// 读取本地文件
val rdd2 = sc.textFile(“file:///root/words.txt”)
最常用的方式。
3)从父RDD转换成新的子RDD
Transformation类的方法
2.3 RDD的分区
rdd中和文件切片相关的概念叫做分区,也就是说对rdd进行操作,实际上是操作的rdd中的每一个分区,分区的数量决定了并行的数量。
使用rdd.partitions.size或者rdd.partitions.length查看分区数量。
使用并行化方式创建RDD,RDD的分区数量和指定的核数是相同的(—total-executor-cores)
如果从外部创建RDD,比如从hdfs中读取数据, 正常情况下,分区的数量是和我们读取的文件的block块数量是一致的,但是如果只有一个block 块,那么分区数量是2。 也就是说最低的分区数量是2。
也就是,最小的分区数量是2。
# data.partitions.size
使用集合并行化方式创建RDD:
默认的分区数量,是由启动任务所分配的cores来决定的。—total-executor-cores 2
可以任意的增加或者减少分区数量。
val data = sc.makeRDD(Array(1,2,4,5,6,8,8),1)
读取hdfs上的数据的时候,是lazy执行的,所以RDD中是没有数据的。
但是,如果是通过集合并行化方式创建的RDD,rdd中是有数据的。
2.4 RDD编程API
2.4.1 算子综述
算子是RDD中定义的方法,分为转换(transformantion)和动作(action)。Tranformation算子并不会触发Spark提交作业,直至Action算子才提交任务执行,这是一个延迟计算的设计技巧,可以避免内存过快被中间计算占满,从而提高内存的利用率。
RDD拥有的操作比MR丰富的多,不仅仅包括Map、Reduce操作,还包括filter、sort、join、save、count等操作,并且中间结果不需要保存,所以Spark比MR更容易方便完成更复杂的任务。
2.4.2 Transformation
RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
对RDD中的元素执行的操作,实际上就是对RDD中的每一个分区的数据进行操作,不需要关注数据在哪个分区中。
可以使用reduceBykey的地方,坚决不用groupByKey。
效率不一样。
常用的Transformation:
转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 先map,再flatten压平 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
subtract(otherDataset) | 求差集后返回新的RDD,出现在源rdd中,不在otherrdd中 |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable |
cartesian(otherDataset) | 笛卡尔积 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 针对分区内部使用seqOp方法,针对最后的结果使用combOp方法。 |
coalesce(numPartitions) | 用于对RDD进行重新分区,第一个参数是分区的数量,第二个参数是是否进行shuffle,可不传,默认不shuffle |
repartition(numPartitions) | 用于对RDD进行重新分区,相当于shuffle版的calesce |
repartitionAndSortWithinPartitions(partitioner) | 在分区中按照key进行排序,这种方式比先分区再sort更高效,因为相当于在shuffle阶段就进行排序。 |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U] |
2.4.3 Action
动作 | 含义 |
---|---|
reduce(func) | 通过func函数聚集RDD中的所有元素 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
collectAsMap | 类似于collect。该函数用于Pair RDD,最终返回Map类型的结果。 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统 |
Top(n) | 按照默认排序(降序) 取数据 |
takeOrdered(n, [ordering]) | 与top类似,顺序相反 默认是升序 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。foreach,任务在executor中运行,打印信息也会在executor中显示 |
foreachPartition | 对分区进行操作 |
foreach和foreachPartition方法中,执行的打印内容会在executor的日志中显示出来。
foreach是迭代每一个元素,foreachPartition迭代每一个分区。
和map,mapPartitions一样。
map有返回值,foreach的返回值是Unit
coalesce和repartition
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
将RDD分区的数量修改为numPartitions,对于经过过滤后的大数据集的在线处理更加有效。常用于减少分区
第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false
当需要调大分区时,必须设置shuffle为true,才能有效,否则分区数不变
随机重新shuffle RDD中的数据,并创建numPartitions个分区。这个操作总会通过网络来shuffle全部数据。常用于扩大分区
分区数调大调小,都会shuffle全部数据,是重量级算子
使用coalesce重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,否则,分区数不变,设置无效
所以不需要shuffle,或者分区数调小时,或者block块小时,首选coalesce,需要shuffle时,首选repartition
val rdd1 = sc.parallelize(1 to 10, 10)
val rdd2 = rdd1.coalesce(2, false)
rdd2.partitions.length
实际工作中怎么用呢? coalesce(10,true)=repartition(10)
如果说想减少分区数量,数据不需要shuffle: coalesce(num)
数据需要重新的shuffle,那么就使用repartition(10)
扩大分区,作用:提升并行度(业务逻辑比较复杂,需要提升并行度)
mapPartitions和mapPartitionsWithIndex
def mapPartitionsU => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。
如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效。
该方法,看上去是操作的每一条数据,实际上是对RDD中的每一个分区进行ieterator,
mapPartitions( it: Iterator => {it.map(x => x * 10)})
mapPartitionsWithIndex
def mapPartitionsWithIndexU => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
类似于mapPartitions, 不过提供了两个参数,第一个参数为分区的索引。
mapPartitionsWithIndex的func接受两个参数,第一个参数是分区的索引,第二个是一个数据集分区的迭代器。而输出的是一个包含经过该函数转换的迭代器。
val func = (index: Int, iter: Iterator[Int]) => {
iter.map(x => “[partID:” + index + “, val: “ + x + “]”)
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1.mapPartitionsWithIndex(func).collect
2.5 RDD的属性
一组分片列表(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
RDD之间的依赖关系列表。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
可选,一个Partitioner,即RDD的分区函数。当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分区数量,也决定了parent RDD Shuffle输出时的分区数量。
一个数据存储列表,存储每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
以MapPartitionsRDD类为例,其compute方法如下
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
MapPartitionsRDD类的compute方法调用当前RDD内的第一个父RDD的iterator方法,该方法的目的是拉取父RDD对应分区的数据,iterator方法会返回一个迭代器对象,迭代器内部存储的每一个元素即父RDD对应分区内的数据记录。
RDD的粗粒度转换体现在map方法上,f函数是map转换操作函数,RDD会对一个分区(而不是一条一条数据记录)内的数据执行单的的操作f,最终返回包含所有经过转换过的数据记录的新迭代器,即新的分区。
其他RDD子类的compute方法与之类似,在需要用用到父RDD的分区数据时,就会调用iterator方法,然后根据需求在得到的数据上执行相应的操作。换句话说,compute函数负责的是父RDD分区数据到子RDD分区数据的变换逻辑。
Driver:负责提交任务
Master:负责管理Worker,并接受driver端的请求,然后调用worker启动executor
Worker:负责管理自己机器上的executor,向master发送心跳。
worker能启动多少的executor:取决于worker的资源,以及任务要求的资源。
eg:任务要求,每个executor要有1g内存,2个cores(—executor-cores)
worker有4个cores,2g内存,那么,该worker就能启动2个executor。
如果使用了—total-executor-cores ,每个worker会启动一个executor,资源是平均分配的。
Executor:任务最终会运行在executor中,executor可以理解为一个容器,
最终的执行任务的小弟,叫做Task。就是说task是运行在executor中。
executor能够并行执行的最大的Task数量,取决于我们executor有多少的cores。 每一个cores能运行一个task。
分阶段
Stages 一旦遇到shuffle的转换算子,都会进行切分阶段。(切分stage)
Task:
是任务执行的最小单位,有几个分区,就有几个task
task的总数 = 分区的数量 * 阶段的数量
eg: 3个分区,2个阶段,一个有6个task
task拥有的特性:
同一个阶段的所有task,执行的业务逻辑是一样的,但是处理的数据是不一样的。
如果有12个分区 : 每一个分区的数据,是固定的,
3个executor,4cores 1g
每阶段的task数量:12 12可以同时并行
如果超过最大的并行task数量呢?
等待,一旦有task执行完成,就会进行剩下分配数据的处理。
Application:应用程序对象
当我们提交一个spark-submit或者spark-shell时,就会启动一个Application。
(提交Application的地方,一般称作为Driver)
Job
算子有两类,transformation和action,每执行一次action,就会生成一个job。
有一类特殊的transformation: shuffle
凡是涉及到对数据进行跨分区传输的。
数据重新分区
groupBy
groupByKey
reduceByKey
aggregateByKey
可以在spark-shell中查看某一个rdd所有依赖的rdd(rdd的依赖关系图)
实际测试:
executor,3个,1g内存,4个cores
Spark任务提供了多层分解的概念
Application(应用程序):Spark Application的概念和Hadoop MapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver 功能的代码和分布在集群中多个节点上运行的Executor代码
Job(作业):包含多个Task组成的并行计算,一个JOB包含多个RDD及作用于相应RDD上的各种Operation,一个Application可以包含多个Job,每个Job由Action操作触发
Stage(阶段):每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet(一组关联的相互之间没有shuffle依赖关系的task组成,他们处理的数据输入不同),一个Job会根据RDD之间的依赖关系被划分为多个Stage,每个Stage中只存有RDD之间的窄依赖,即Transformation算子。
Task(任务): 被送到某个Executor上的工作任务,任务最小的工作单位