spark有哪些组件?

答:主要有如下组件:

  1. master:管理集群和节点, 不参与计算。
  2. worker:计算节点, 进程本身不参与计算, 和master汇报。
  3. Driver:运行程序的main方法, 创建spark context对象。
  4. spark context:控制整个application的生命周期, 包括dag shedule r和task scheduler等组件。
  5. client:用户提交程序的入口。

对于Spark中的数据倾斜问题你有什么好的方案?

  1. 前提是定位数据倾斜, 是OOM了, 还是任务执行缓慢, 看日志, 看Web Ul

2)解决方法,有多个方面

·避免不必要的shuffle, 如使用广播小表的方式, 将reduce-side-join提升为map-side-join

·分拆发生数据倾斜的记录, 分成几个部分进行, 然后合并join后的结果

·改变并行度, 可能并行度太少了, 导致个别task数据压力大

·两阶段聚合,先局部聚合,再全局聚合

·自定义paritioner, 分散key的分布, 使其更加均匀

RDD创建有哪几种方式?

  1. .使用程序中的集合创建rdd
  2. .使用本地文件系统创建rdd
  3. .使用hdfs创建rdd,
  4. .基于数据库db创建rdd
  5. .基于No sql创建rdd, 如hbase
  6. .基于s3创建rdd,
  7. .基于数据流, 如socket创建rdd 如果只回答了前面三种,是不够的,只能说明你的水平还是入门级的,实践过程中有很多种创建方式。

Spark并行度怎么设置比较合适

答:spark并行度, 每个core承载2~4个partition, 如, 32个core, 那么64~128之间的并行度, 也就是

设置64~128个partion, 并行读和数据规模无关, 只和内存使用量和cpu使用

时间有关

Spark中数据的位置是被谁管理的?

答:每个数据分片都对应具体物理位置, 数据的位置是被block Manager, 无论数据是在磁盘, 内存还是tacyan, 都是由block Manager管理

介绍一下cogroup rdd实现原理, 你在什么场景下用过这个rdd?

答:cogroup的函数实现:这个实现根据两个要进行合并的两个RDD操作, 生成一个Co Grouped RDD的实例, 这个RDD的返回结果是 把相同的key中两个RDD分别进行合并操作, 最后返回的RDD的value是一个Pair的实例, 这个实例包含两个Iterable的值, 第一个值表 示的是RDD 1中相同KEY的值, 第二个值表示的是RDD 2中相同key的值.由于做co group的操作, 需要通过partitioner进行重新分区的 操作, 因此, 执行这个流程时, 需要执行一次shuffle的操作(如果要进行合并的两个RDD的都已经是shuffle后的rdd, 同时他们对应的 partitioner相同时, 就不需要执行shuffle.

场景:表关联查询

Spark应用程序的执行过程是什么?

  1. 构建Spark Application的运行环境(启动Spark Context) , Spark Context向资源管理器(可以是Standalone、Mesos或 YARN) 注册并申请运行Executor资源;
  2. .资源管理器分配Executor资源并启动Standalone Executor Backend, Executor运行情况将随着心跳发送到资源管理器上;
  3. .Spark Context构建成DAG图, 将DAG图分解成Stage, 并把Task set发送给Task Scheduler。Executor向Spark Context申请 Task, Task Scheduler将Task发放给Executor运行同时Spark Context将应用程序代码发放给Executor。
  4. .Task在Executor上运行, 运行完毕释放所有资源。

hbase预分区个数和spark过程中的reduce个数相同么

答:和spark的map个数相同, reduce个数如果没有设置和reduce前的map数相同。

如何理解Standalone模式下, Spark资源分配是粗粒度的?

答:spark默认情况下资源分配是粗粒度的, 也就是说程序在提交时就分配好资源, 后面执行的时候

使用分配好的资源, 除非资源出现了故障才会重新分配。比如Spark shell启动, 已提交, 一注册, 哪怕没有任务, worker都会分配资源给executor。

Spark如何自定义partitioner分区器?

答:

  1. spark默认实现了Hash Partitioner和Range Partitioner两种分区策略, 我们也可以自己扩展分区策略, 自定义分区器的时 候继承org.apache.spark.Partitioner类, 实现类中的三个方法

def numPartitions:Int:这个方法需要返回你想要创建分区的个数;

def getPartition(key:Any) :Int:这个函数需要对输入的key做计算, 然后返回该key的分区ID, 范围一定是0到num Partitions- 1;

equals) :这个是Java标准的判断相等的函数, 之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。

  1. 使用, 调用partion By方法中传入自定义分区对象

参考:http://blog.csdn.net/high2011/article/details/68491115

Spark中的Application、Job、Stage与Task区别

RDD任务切分中间分为:Application、Job、Stage和Task

(1)Application:初始化一个SparkContext即生成一个Application;

(2)Job:一个Action算子就会生成一个Job;

(3)Stage:Stage等于宽依赖的个数加1;

(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

hive中order by、distribute by、sort by和cluster by的区别和联系

order by

order by 会对数据进行全局排序,和oracle和mysql等数据库中的order by 效果一样,它只在一个reduce中进行所以数据量特别大的时候效率非常低。

而且当设置 :set hive.mapred.mode=strict的时候不指定limit,执行select会报错,如下:

LIMIT must also be specified。

sort by

sort by 是单独在各自的reduce中进行排序,所以并不能保证全局有序,一般和distribute by 一起执行,而且distribute by 要写在sort by前面。

如果mapred.reduce.tasks=1和order by效果一样,如果大于1会分成几个文件输出每个文件会按照指定的字段排序,而不保证全局有序。

sort by 不受 hive.mapred.mode 是否为strict ,nostrict 的影响。

distribute by

DISTRIBUTE BY 控制map 中的输出在 reducer 中是如何进行划分的。使用DISTRIBUTE BY 可以保证相同KEY的记录被划分到一个Reduce 中。

cluster by

distribute by 和 sort by 合用就相当于cluster by,但是cluster by 不能指定排序为asc或 desc 的规则,只能是升序排列。

streaming消费kafka的两种方式Receiver/Direct优缺点

Receiver方式:

Receiver从Kafka中获取数据都是存储在Spark Executor内存中的,然后Spark Streaming启动的job会去处理那些数据。
优点:操作简单方便,不用自己管理offset。
缺点:各方面都不如Direct方式。

Direct方式:

它会周期性的查询kafka,来获取每个topic + partition的最新offset,从而定义每一个batch的offset的范围。当处理数据的job启动时,就会使用kafka简单的消费者API来获取kafka指定offset的范围的数据。

优点:
1)它简化了并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对他们进行union操作。Spark会创建跟kafka partition一样多的RDD partition,并且会并行从kafka中读取数据。所以在kafka partition和RDD partition之间有一个一一对应的映射关系
2)高性能:如果要保证数据零丢失,基于Receiver的机制需要开启WAL机制,这种方式其实很低效,因为数据实际上被copy了2分,kafka自己本身就有可靠的机制,会对数据复制一份,而这里又复制一份到WAL中。基于Direct的方式,不依赖于Receiver,不需要开启WAL机制,只要kafka中做了数据的复制,那么就可以通过kafka的副本进行恢复。
3)一次仅且一次的事务机制
基于Receiver的方式,是使用Kafka High Level的API在zookeeper中保存消费过的offset的。这是消费kafka数据的传统方式,这种方式配合这WAL机制可以保证数据零丢失,但是无法保证数据只被处理一次的且仅且一次,可能会两次或者更多,因为spark和zookeeper可能是不同步的。
4)降低资源
Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
5)降低内存
Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。
6)不会出现数据堆积
Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
缺点:需要自己管理offset,相对更麻烦复杂。