var、val、def三个关键字之间的区别;伴生类与伴生对象;

var用来声明变量;val用来声明常量;def用来创建方法。
伴生类和伴生对象同名且存在同一个文件中,二者互为伴生,可以相互访问私有成员。

case class是什么,与case object的区别是什么?

样例类是scala中特殊的类,具有以下特点:
1.构造器中的参数默认是val修饰的常量,除非被显示为var
2.提供apply方法,不用new关键字就可以构造出相应的对象
3.提供unapply方法,使得样例类可以使用模式匹配
4.生成tostring,equals,hashcode和copy方法。
5.继承了Product和Serializable,即已实现了序列化方法和可以应用Product的方法。
区别:类中有参数的时候用case class,无对象用case object

Spark为什么快,Spark Sql一定比Hive快吗?

Spark和MR相比,更加积极的使用内存,计算时减少数据落地到磁盘。

  • MapReduce会将计算的中间结果落地到磁盘中,获取中间结果又会读取磁盘,会有大量的磁盘IO。
  • MapReduce采用的是多进程模型,而Spark采用的是多线程模型。MR中的map和reduce都是单独的进程,每次启动都要重新申请资源,消耗时间,而Spark则是维护着一个线程池,复用线程,因此减少了启动和关闭task的开销。

    1. MR在没有中间结果落地的情况下就不一定慢于Spark SqlSpark快是在基于内存的可迭代计算。

    描述一下你对Rdd的理解

    Rdd是spark提供的最重要的抽象概念,是一种有容错机制的特殊数据集合,可以分布在集群的节点上,以函数式操作集合的方式进行各种并行操作。
    Rdd是只读,不可变的,具有如下的性质:

  • 只读的:只能通过转换操作生成新的Rdd

  • 分布式:可以分布式在多台节点上并行处理
  • 弹性: 计算过程中内存不足,它会和磁盘进行数据交换
  • 基于内存:可以全部或者部分缓存在内存中进行重复使用,避免重复的计算

    描述以下算子的区别和联系:

    groupByKey、reduceByKey、aggreageByKey

    相同:作用在key-value Rdd上,对相同key的数据进行聚合,都会有shuffle
    不同:

    - groupByKey:没有map段的预聚合combiner,所以会有大数据量的shuffle过程,效率低,不建议使用;
    - reduceByKey、aggreageByKey:有map端的combiner,shuffle过程中数据量小效率高。
    - reduceByKey:不能定义初值,初值类型和Rdd元素类型必须一致;分区内和分区间聚合使用完全相同的算法。
    - aggreageByKey:最灵活也最复杂,可以定义初值,初值类型可以是任何类型,分区间和分区内的算法可以不相同。
    

    cache、persist

    相同:都是操作算子,都是实现RDD的持久化
    不同:cache是通过persist实现的,默认的缓存级别是MEMORY_ONLY,而persist有多种缓存级别

    repartition、coalesce

    相同:都是对RDD进行分区操作。
    不同:repartition可以减少和增加分区,coalesce只能减少分区,coalesce由repartition实现。

    map、flatMap

    相同:都是对每个元素进行操作
    不同:flatMap相当于map+flatten操作的结合,遍历每个元素,并将元素压平

    简述Spark中的缓存机制与checkpoint机制,说明两者的区别与联系

    spark中的缓存机制目的是缓存中间结果在内存或者磁盘,避免中间结果的重复计算,节省时间和资源。但是缓存机制会保存RDD之间的依赖链(血缘关系),如果cache丢失,可以根据依赖链重新计算得到。
    checkpoint机制的目的是容错。直接将RDD存在HDFS上,斩断了血缘关系。

    RDD、DataFrame、DataSet三者的区别与联系

    相同:

  • 三者都是分布式弹性数据集

  • 三者有许多共同的函数,都有partition的概念;都支持持久化
  • 三者都有惰性机制,在创建和转换时都不会立即执行,只有遇到Action时才会触发计算。

不同:

  • DataFrame相比RDD,更向是传统数据库的二维表格,除了数据外,还记录数据的结构信息,即schema;且Dataframe具有更加友好的API,使用门槛更低。
  • DataSet相对DataFrame来说是一个新的拓展,具有类型安全检查,也具有Dataframe查询优化的特性;支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。

    介绍Spark核心组件及功能

    Master:集群中的管理节点,管理集群中的资源,通知worker启动Driver或者Executor
    Worker:集群中的工作节点,负责管理当前节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver或者Executor。
    Driver:执行spark应用中的main方法,负责实际代码的执行工作。主要任务:

    • 负责向集群申请资源,向master注册信息。
    • Executor启动后向Driver反向注册
    • 负责作业的解析,生成stage并调度Task到Executor上
    • 监控Task的执行情况,执行完毕后释放资源
    • 通知Master注销应用程序

Executor:是一个JVM进程,负责执行具体的Task。Executor伴随着spark任务启动而启动,其生命周期伴随着整个spark的应用;如果Executor发生了故障或者崩溃,会将出错节点上的任务调度到其他Executor节点继续运行。

  • 负责运行spark应用的task,并将结果返回到Driver进程。
  • 通过自身的block manage为应用程序缓存RDD

什么是迭代计算,迭代与循环的区别是什么?

迭代一定是循环,但是循环不一定是迭代;
迭代计算的基本思想是将输出作为输入,再次进行计算处理。比如说KMeans中对中心点的计算就是迭代;
循环就是一段代码的重复执行;

MapReduce和Spark都是并行计算,它们之间的区别是什么?

MapReduce不支持迭代计算,其中mapTask和ReduceTask都是独立的进程,启动需要重新拉取资源。
Spark支持迭代计算,其中spark的Application会存在多个Job,每个job都会有多种算子,形成不同的Stage,其中最小的执行单元是Task,Task在Executor上,Executor是和Application相同的生命周期,即只会读取一次数据,分配一次资源,可重复使用。
Spark基于内存计算,可将计算中间结果存放在内存中,且算子种类多,表达能力强;

Spark的算子可以分哪两类,这两类算子的区别是什么,分别列举6个这两类算子,列举6个会产生Shuffle的算子
可以分为两类:操作算子(Transformation)、行动算子(Action)
区别:
操作算子是懒执行的,行动算子触发执行;
操作算子是生成中间结果,是转换动作;行动算子可以是返回程序结果或者存储数据;
分别列举6个:
操作算子:map、mapValue、mapPartitions、flatMap、filter、Join、uoin、reduceByKey
行动算子:count、collect、first、reduce、saveAsTextFile、aggregate
列举6个会产生shuffle的算子:

  • 一堆的xxxByKey(sortBykey、reduceBykey、groupByKey、aggreagebykey)
  • join相关:join、leftOuterJoin、rightOuterJoin、fullOuterJoin
  • distinct、intersection、repartition、partitionBy、subtract

简述Spark中共享变量(广播变量和累加器)的基本原理和用途

基本原理:
广播变量:将大的变量RDD副本由Driver发送到Executor上,给其中的Task共用,避免每个Task都去拉取数据;
累加器:Driver发送到Executor的共享变量,其上的Task共同累加;
用途:
广播变量可以用来优化spark程序,累加器可用在SparkStream上对数进行累加;

Spark提交作业的参数

  • executor-cores:建议2~5个
  • executor-memory:和executor-cores的比例建议是2:1到4:1之间;单个Executor内存大小一般在20G左右(经验值),单个JVM内存太高易导致GC代价过高,或者浪费资源
  • num-executors:executors的数量,这和Task的并行度有直接关系;一般executos-cores*num-executos表示的是能够并行执行Task的数量,不宜太小或者太大,理想情况下一般给每个core分配2-3个Task,知道想分配的Task总数量可反推出executors的数量;
  • driver-memory:不需要做任何计算和存储,只是下发任务,一般设置1-2g

Spark的宽窄依赖,以及Spark如何划分stage,如何确定每个stage中task个数

Spark的RDD之间的依赖关系分为宽依赖和窄依赖;
窄依赖:父RDD和子RDD是多对一或者一对一的关系;
宽依赖:父RDD会被多个子RDD所依赖,存在Shuffle;
Stage:根据RDD之间的依赖关系将Job划分为不同的Stage,遇到一个宽依赖则划分一个Stage
如何确定Stage中的Task个数:Stage本身就是一个TaskSet,将Stage根据分区数划分为一个个的Task;

如何理解Spark中的lineage

每个RDD中都会记录其lineage,这是Spark中的容错机制,任意一个Rdd的分区出错或者不可用,都是利用原始输入数据通过转换操作而重新算出的;
对于窄依赖,只要把丢失的父分区重新算即可,不依赖其他分区;
对于宽依赖,父类RDD的所有分区都需要重算,代价昂贵;

Scala类型系统中Null、Nothing、Nil、None、Unit的区别

  • Null是一个trait,是引用类型AnyRef的一个子类型,null是其唯一的实例;
  • Nothing也是一个trait,是类型any(包括值类型和引用类型)的子类型,它没有子类型,也没有实例。
  • Nil代表一个List的空类型,等同List[Nothing]
  • None是Option的空标识
  • Unit代表没有任何意义的值类型,类似于void,是AnyVal的子类型

说说你对spark分区器的理解

前提是只有key-value类型的Rdd才可能存在分区器,因为是以key进行分区的;
spark分区器是直接决定了RDD中分区的个数、RDD的每条数据经过shuffle过程,分到分区;
spark中存在两种分区器:

  • HashPartition(默认分区器):Hash分区器,将key的hashcode对分区个数求余,得到目标分区;

优点就是简单、常用;缺点就是容易造成数据倾斜;

  • RangePartitioner:将一定范围的数映射到某一个分区内,会尽量保证每个分区内的数据量均匀,而且一个分区中任何一个key一定比另一个分区中的任意一个key大或者小;

采用的是水塘抽样法,进行抽样,sortByKey()会使用改分区方法,其中会使用collect(),会触发一次执行;
要求:key类型必须是可以排序的
还可以自定义分区器:继承Partitioner类

Spark Streaming 中有哪些消费Kafka数据的方式,它们之间的区别是什么

有两种方式:
方式一:基于Receiver的方式
kafka0.8.2版本开始的方式,使用的是Kafka高阶消费者API实现的;Receiver将Kafka消费得到的数据存放在Spark Executor的内存中(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming定期启动job处理这些数据。
默认配置下,底层处理数据失败会丢失数据;可以启用高可靠机制,保证数据的零丢失,但是必须开启WAL。

方式二:基于Direct的方式
在Spark1.3引入,兼容kafak0.10.0,用来替代Receiver方式。使用Kafka低阶消费者API实现;这种方式会周期性地获取topic+partition的最新offset范围,从而定义每个batch的offset范围。
优点如下:
简化并行读取:如果读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,然后并行从kafka中读取数据;即两边的partition是一一对应的。
高性能:基于Receiver需要开启WAL才可以保证数据的零丢失,这种效率很低,因为数据实际上存在多份,kafka中数据存在有多份,复制到HDFS上的数据有多份,Executor中的数据还有一份。
而基于Direct方式,不依赖Receiver,不需要开启WAL,只要复制kafka中的数据进行处理。
支持用户管理offset,支持消息的EOS传递。
两种方式对比:
基于Receiver:使用kafka高阶API实现,在Zookeeper中保存消费过的offset,这是消费kafka数据的传统方式。这种方式配合WAL机制可以保证数据零丢失,但是无法保证数据消费精确一次。
基于Direct的方式,使用kafka低阶API实现,Spark Streaming自己负责追踪消费的offset,并保存在checkpoint中,可以保证数据时消费一次且仅消费一次。

说明yarn-cluster模式作业提交流程,yarn-cluster与yarn-client的区别

yarn-cluster模式作业提交流程:
image.png

  1. client向Resourcemanager提交请求,并上传jar到HDFS上。
  2. RM通知NodeManager启动ApplicationMaster,AppMaster在其中实例化SparkContext,也就是Driver。
  3. AppMaster向RM注册应用,申请资源。RM监控App的运行状态直到结束。
  4. AppMaster申请到资源后,与NM通信,在Container启动Executor进程。
  5. Executor向AppMaster反向注册,申请任务。
  6. Driver监控着Executor执行Task的情况。
  7. 应用执行完毕,AppMaster通知RM注销应用,回收资源。

client和cluster的区别
client模式:

  • Driver运行在客户端
  • 适用于调试,能直接看见各种日志
  • 连接断了,任务就挂了

cluster模式:

  • Driver运行在AppMaster中(运行在集群中)
  • 适合于生产,日志需要登录到某个节点才能看到
  • 客户端连接断了,任务不受影响

介绍一下Spark SQL解析过程

正常的SQL执行会先经过SQL Parser解析SQL,然后经过Catalyst优化器处理,最后到Spark执行。
Catalyst优化的过程又分为很多个过程,其中包括:

  • Analysis:主要利用Catalog信息将Unresolved Logical Plan 解析成Analyzed logical plan
  • Logical Optimizer:利用一些Rule规则将Analyzed logical plan 解析 Optimized Logical plan
  • Physical Planning:前面的logical plan不能被spark执行,而是该过程将 logical plan转换为多个物理执行计划
  • Physical Optimizer:在多个执行计划中选择最佳的physical plan
  • Code Generation:这个过程会吧Sql查询生成java字节码

Spark的动态资源分配

  • 默认情况下,Spark采用资源预分配的方式。即为每个Spark应用设定一个最大可用资源总量,该应用在整个生命周期都会持有这些资源。
  • Spark提供了一种机制,即应用的某些资源不使用的时候,回先返还给集群,并在有用的时候再次请求资源。
  • 动态资源分配是Executor级。

简要介绍Spark的内存管理

Spark内存的管理主要是在Executor中,Driver是主控进程,负责创建spark上下文,负责任务的调度。
Executor内运行的并发任务共享JVM堆内存,按照用途分为:

  • Storage(存储内存):缓存RDD数据和广播变量数据
  • Execution(执行内存):执行Shuffle时占用的内存
  • Other(剩余空间):存储Spark内部的对象实例,和用户定义的Spark应用程序中的对象实例

Spark引入了堆外内存,直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。堆外内存空间的占用可以被精确计算,相比堆内内存来说降低了管理难度。

Spark2.0之前的内存管理是采用静态内存管理机制:

  • Storage:60%
  • Execution:20%
  • Other:20%

堆外内存没有Other,默认Storage:Execution =1:1
由于堆内内存的管理计算是不准确的,Storage和Execution内存都会有预留空间,防止OOM。

Spark2.0之后的内存管理是采用的动态内存管理机制:

  • Storage:60% * 50%
  • Execution:60% * 50%
  • Other:40%

其中Storage和Execution是动态占用的,其中一方空闲会给忙碌的一方使用内存,但是有一点很重要:
Storage占用Execution时,Execution如果需要,Storage会马上将占用的内存返还给Execution,将数据落在磁盘上。但是如果Execution占用Storage,不会马上还,而是等自然释放。

介绍一下你对Spark Shuffle的理解

Spark1.1之前,Spark使用的是Hash Shuffle,Hash Shuffle减少了排序,但会产生海量的中间磁盘文件,大量的磁盘IO操作影响了性能;针对这个问题,Spark后续做了优化,引入了文件合并机制,但Reducer端的并行任务或者数据分片过多的话,仍然会产生大量的小文件。
Spark1.2以后,逐步是引入了Sort Shuffle。在Spark2.0之后,Hash base Shuffle逐渐是退出了历史舞台;
在Spark2.0之后,有三种Shuffle Writer:BypassMeregeSortShuffleWriter、SortShuffleWriter、UnsafeShuffleWriter;使用哪种取决以下条件判断,生产中使用的最多的是SortShuffleWriter。
image.png

谈谈Spark中的容错机制

Spark中很多地方有容错
Master容错
只有在Standlone模式下,才需要进行Master容错配置。如果是on Yarn模式,资源由RM管理,具体的容错由Yarn管理。
StandLone模式下,会启动多个Master,只有一个是处于Action状态,其他都是处于Stand By状态。当前Master异常时,会按照一定的规则从中选取一个接替原Master的工作。
Work容错
Worker也是Standlone才会有的,在on Yarn模式下,对应的是NodeManager,Yarn自身带有NodeManager的容错机制。
在StandLone模式下,Master回监控Worker的心跳,当Worker出现超时未响应的情况,会根据Worker运行的是Executor还是Driver分别进行处理:
Executor:Master会给对应的Driver发送消息,告知Executor已经丢失,同时把这些Executor从其应用程序运行列表删除。
Driver:会判断是否设置了需要重新启动,需要分配合适的节点重启,否则删除应用程序。
Executor容错
Executor出现异常后,Worker会接收到通知,并把该Executor的状态发给Master,Master发现Executor出现异常退出后,会尝试在可用的Worker节点重新启动Executor,会尝试一定的次数,如果失败,整个应用程序退出;这是防止应用程序一直占用资源。
Stage容错
Stage失败会重试,默认重试4次,可参数控制,spark.stage.maxConsecutiveAttempts控制
task容错
task级别的重试,同一个task失败4次才会被影响。
RDD容错
RDD本身会记录其数据来源,也就是依赖链信息,在计算结果出现丢失的时候可以根据Lineage重新恢复计算,如果lineage过长,还可以使用checkPoint机制,斩断依赖链。

项目中遇到数据倾斜没有,如何解决的。

数据倾斜的原因有很多种,首先就是业务数据本身的原因,这时候就要对数据进行预处理。
通常程序执行过程中Shuffle经常会导致数据的倾斜,有如下办法:

  • 消除shuffle:能不shuffle,尽量不使用shuffle。
  • 改变shuffle过程中的并行度:理论上可行,最简单,但通常没有效果。
  • 加盐:给key添加随机数,将原来不能打散的数据,强行打散,通用的办法。
  • 两阶段聚合:通过给key加盐,然后进行一次聚合(局部聚合),再去掉前缀或者后缀进行全局聚合
  • 过滤造成数据倾斜的key,其他没有数据倾斜正常处理即可。对有数据倾斜的key加盐,强行将其打散;然后将结果合并。
  • 如果以上方法都不行,采用本方法。对一张表的数据加盐强行打散,对另一张表数据扩容。

做过哪些Spark的优化

第一部分:代码优化

  • 减少对数据源的扫描
  • RDD复用,尽量少创建重复的RDD,同一份数据应该只有一个RDD
  • RDD缓存。选择合理的缓存级别对RDD进行缓存,对经过复杂计算、计算链路长的RDD做checkpoint。
  • 巧用Filter,并且尽可能早的执行filter操作,过滤无用的数据;过滤后使用coalesce对数据重新分区。
  • 使用高性能的算子:尽量不要使用groupByKey,根据场景选择使用高性能的聚合算子。
  • 设置合理的并行度,让并行度与资源相匹配。executor的数量executor的核心数(2~4)
  • 合理使用广播变量,减少网络数据的传输
  • Kryo序列化
  • 多使用SparkSql:编码简单,其优化器对Sql语句做了大量的优化。
  • 使用高性能的集合框架:fastUtil

第二部分 参数优化

  • shuffle参数调优
  • 内存参数调优
  • 资源调优
  • 启用动态资源分配
  • 调节本地等待时长

是否阅读过Spark的源码,读过哪些?

Master & Worker启动流程描述:

  • Master worker都有main方法,都是RpcEndPoint(消息通信体),通过receive、receiveAndReply、能够收、发、处理各种消息
  • RpcEndPoint的生命周期是:constructor -> onStart -> receive* -> onStop,其中:
    • onstart在接收任务消息前调用,主要用来执行初始化
    • receive和receiveAndReply分别用来接收RpcEndPoint send 或者 ask 过来的消息
    • send发送的消息不需要立即处理,ask发送的信息需要立即处理
    • 在Master.onStart方法中,最重要的事情是执行恢复。Master HA的实现方式有:ZOOKEEPER、FILESYSTEM、CUSTOM
    • 在Worker.onStart方法中,最重要的事情是:Worker向Master注册
    • Worker启动后,启动线程同时向所有的Master发送RegisterWorker消息,进行注册
    • Active Master接收到RegisterWorker后,判断worker是否重复注册,不是则记录worker的资源信息,并向worker返回一个RegisterWorker消息
    • Worker接收到Master返回的RegistedWorker消息后,启动一个定时线程定期向Master发送心跳信息(Hearbeat)