- spark有哪些组件?
- 对于Spark中的数据倾斜问题你有什么好的方案?
- RDD创建有哪几种方式?
- Spark并行度怎么设置比较合适
- Spark中数据的位置是被谁管理的?
- 介绍一下cogroup rdd实现原理, 你在什么场景下用过这个rdd?
- Spark应用程序的执行过程是什么?
- hbase预分区个数和spark过程中的reduce个数相同么
- 如何理解Standalone模式下, Spark资源分配是粗粒度的?
- Spark如何自定义partitioner分区器?
- Spark中的Application、Job、Stage与Task区别
- hive中order by、distribute by、sort by和cluster by的区别和联系
- streaming消费kafka的两种方式Receiver/Direct优缺点
spark有哪些组件?
答:主要有如下组件:
- master:管理集群和节点, 不参与计算。
- worker:计算节点, 进程本身不参与计算, 和master汇报。
- Driver:运行程序的main方法, 创建spark context对象。
- spark context:控制整个application的生命周期, 包括dag shedule r和task scheduler等组件。
- client:用户提交程序的入口。
对于Spark中的数据倾斜问题你有什么好的方案?
- 前提是定位数据倾斜, 是OOM了, 还是任务执行缓慢, 看日志, 看Web Ul
2)解决方法,有多个方面
·避免不必要的shuffle, 如使用广播小表的方式, 将reduce-side-join提升为map-side-join
·分拆发生数据倾斜的记录, 分成几个部分进行, 然后合并join后的结果
·改变并行度, 可能并行度太少了, 导致个别task数据压力大
·两阶段聚合,先局部聚合,再全局聚合
·自定义paritioner, 分散key的分布, 使其更加均匀
RDD创建有哪几种方式?
- .使用程序中的集合创建rdd
- .使用本地文件系统创建rdd
- .使用hdfs创建rdd,
- .基于数据库db创建rdd
- .基于No sql创建rdd, 如hbase
- .基于s3创建rdd,
- .基于数据流, 如socket创建rdd 如果只回答了前面三种,是不够的,只能说明你的水平还是入门级的,实践过程中有很多种创建方式。
Spark并行度怎么设置比较合适
答:spark并行度, 每个core承载2~4个partition, 如, 32个core, 那么64~128之间的并行度, 也就是
设置64~128个partion, 并行读和数据规模无关, 只和内存使用量和cpu使用
时间有关
Spark中数据的位置是被谁管理的?
答:每个数据分片都对应具体物理位置, 数据的位置是被block Manager, 无论数据是在磁盘, 内存还是tacyan, 都是由block Manager管理
介绍一下cogroup rdd实现原理, 你在什么场景下用过这个rdd?
答:cogroup的函数实现:这个实现根据两个要进行合并的两个RDD操作, 生成一个Co Grouped RDD的实例, 这个RDD的返回结果是 把相同的key中两个RDD分别进行合并操作, 最后返回的RDD的value是一个Pair的实例, 这个实例包含两个Iterable的值, 第一个值表 示的是RDD 1中相同KEY的值, 第二个值表示的是RDD 2中相同key的值.由于做co group的操作, 需要通过partitioner进行重新分区的 操作, 因此, 执行这个流程时, 需要执行一次shuffle的操作(如果要进行合并的两个RDD的都已经是shuffle后的rdd, 同时他们对应的 partitioner相同时, 就不需要执行shuffle.
场景:表关联查询
Spark应用程序的执行过程是什么?
- 构建Spark Application的运行环境(启动Spark Context) , Spark Context向资源管理器(可以是Standalone、Mesos或 YARN) 注册并申请运行Executor资源;
- .资源管理器分配Executor资源并启动Standalone Executor Backend, Executor运行情况将随着心跳发送到资源管理器上;
- .Spark Context构建成DAG图, 将DAG图分解成Stage, 并把Task set发送给Task Scheduler。Executor向Spark Context申请 Task, Task Scheduler将Task发放给Executor运行同时Spark Context将应用程序代码发放给Executor。
- .Task在Executor上运行, 运行完毕释放所有资源。
hbase预分区个数和spark过程中的reduce个数相同么
答:和spark的map个数相同, reduce个数如果没有设置和reduce前的map数相同。
如何理解Standalone模式下, Spark资源分配是粗粒度的?
答:spark默认情况下资源分配是粗粒度的, 也就是说程序在提交时就分配好资源, 后面执行的时候
使用分配好的资源, 除非资源出现了故障才会重新分配。比如Spark shell启动, 已提交, 一注册, 哪怕没有任务, worker都会分配资源给executor。
Spark如何自定义partitioner分区器?
答:
- spark默认实现了Hash Partitioner和Range Partitioner两种分区策略, 我们也可以自己扩展分区策略, 自定义分区器的时 候继承org.apache.spark.Partitioner类, 实现类中的三个方法
def numPartitions:Int:这个方法需要返回你想要创建分区的个数;
def getPartition(key:Any) :Int:这个函数需要对输入的key做计算, 然后返回该key的分区ID, 范围一定是0到num Partitions- 1;
equals) :这个是Java标准的判断相等的函数, 之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。
- 使用, 调用partion By方法中传入自定义分区对象
参考:http://blog.csdn.net/high2011/article/details/68491115
Spark中的Application、Job、Stage与Task区别
RDD任务切分中间分为:Application、Job、Stage和Task
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
hive中order by、distribute by、sort by和cluster by的区别和联系
order by
order by 会对数据进行全局排序,和oracle和mysql等数据库中的order by 效果一样,它只在一个reduce中进行所以数据量特别大的时候效率非常低。
而且当设置 :set hive.mapred.mode=strict的时候不指定limit,执行select会报错,如下:
LIMIT must also be specified。
sort by
sort by 是单独在各自的reduce中进行排序,所以并不能保证全局有序,一般和distribute by 一起执行,而且distribute by 要写在sort by前面。
如果mapred.reduce.tasks=1和order by效果一样,如果大于1会分成几个文件输出每个文件会按照指定的字段排序,而不保证全局有序。
sort by 不受 hive.mapred.mode 是否为strict ,nostrict 的影响。
distribute by
DISTRIBUTE BY 控制map 中的输出在 reducer 中是如何进行划分的。使用DISTRIBUTE BY 可以保证相同KEY的记录被划分到一个Reduce 中。
cluster by
distribute by 和 sort by 合用就相当于cluster by,但是cluster by 不能指定排序为asc或 desc 的规则,只能是升序排列。
streaming消费kafka的两种方式Receiver/Direct优缺点
Receiver方式:
Receiver从Kafka中获取数据都是存储在Spark Executor内存中的,然后Spark Streaming启动的job会去处理那些数据。
优点:操作简单方便,不用自己管理offset。
缺点:各方面都不如Direct方式。
Direct方式:
它会周期性的查询kafka,来获取每个topic + partition的最新offset,从而定义每一个batch的offset的范围。当处理数据的job启动时,就会使用kafka简单的消费者API来获取kafka指定offset的范围的数据。
优点:
1)它简化了并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对他们进行union操作。Spark会创建跟kafka partition一样多的RDD partition,并且会并行从kafka中读取数据。所以在kafka partition和RDD partition之间有一个一一对应的映射关系。
2)高性能:如果要保证数据零丢失,基于Receiver的机制需要开启WAL机制,这种方式其实很低效,因为数据实际上被copy了2分,kafka自己本身就有可靠的机制,会对数据复制一份,而这里又复制一份到WAL中。基于Direct的方式,不依赖于Receiver,不需要开启WAL机制,只要kafka中做了数据的复制,那么就可以通过kafka的副本进行恢复。
3)一次仅且一次的事务机制
基于Receiver的方式,是使用Kafka High Level的API在zookeeper中保存消费过的offset的。这是消费kafka数据的传统方式,这种方式配合这WAL机制可以保证数据零丢失,但是无法保证数据只被处理一次的且仅且一次,可能会两次或者更多,因为spark和zookeeper可能是不同步的。
4)降低资源
Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
5)降低内存
Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。
6)不会出现数据堆积
Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
缺点:需要自己管理offset,相对更麻烦复杂。