原文链接:https://blog.csdn.net/oscarun/java/article/details/97152082

文章目录
1 Spark为什么快,Spark SQL 一定比 Hive 快吗
2 RDD, DAG, Stage, Task 和 Job 怎么理解?
3 宽依赖、窄依赖怎么理解?
4 Spark 作业提交流程是怎么样的
5 为什么要用 Yarn 来部署 Spark?
6 简单说说 Spark 支持的4种集群管理器
7 说说 Worker 和 Executor 的区别
8 说说 Spark Local 和 Standalone 有什么区别
9 Spark 经常说的 Repartition 有什么作用
10 简单写一个 WordCount 程序

1 Spark为什么快,Spark SQL 一定比 Hive 快吗

Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。

消除了冗余的 HDFS 读写: Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 persist 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了。
消除了冗余的 MapReduce 阶段: Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中。
JVM 的优化: Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少。
不过凡事都没有绝对,考虑一种极端查询:

select month_id, sum(sales) from T group by month_id;
1
这个查询只有一次 shuffle 操作,此时,也许 Hive HQL 的运行时间也许比 Spark 还快,反正 shuffle 完了都会落一次盘,或者都不落盘。

结论
Spark 快不是绝对的,但是绝大多数,Spark 都比 Hadoop 计算要快。这主要得益于其对 mapreduce 操作的优化以及对 JVM 使用的优化。

2 RDD, DAG, Stage, Task 和 Job 怎么理解?

RDD
RDD 是 Spark 的灵魂,也称为弹性分布式数据集。一个 RDD 代表一个可以被分区的只读数据集。RDD 内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。

DAG
Spark 中使用 DAG 对 RDD 的关系进行建模,描述了 RDD 的依赖关系,这种关系也被称之为 lineage(血缘),RDD 的依赖关系使用 Dependency 维护。

Stage
在 DAG 中又进行 Stage 的划分,划分的依据是依赖是否是 shuffle 的,每个 Stage 又可以划分成若干 Task。接下来的事情就是 Driver 发送 Task 到 Executor,Executor 线程池去执行这些 task,完成之后将结果返回给 Driver。

Job
Spark 的 Job 来源于用户执行 action 操作(这是 Spark 中实际意义的 Job),就是从 RDD 中获取结果的操作,而不是将一个 RDD 转换成另一个 RDD 的 transformation 操作。

Task
一个 Stage 内,最终的 RDD 有多少个 partition,就会产生多少个 task。

3 宽依赖、窄依赖怎么理解?

窄依赖指的是每一个 Parent RDD 的 Partition 最多被子 RDD 的一个 Partition 使用(一子一亲)
宽依赖指的是多个子 RDD 的 Partition 会依赖同一个 parent RDD的 partition(多子一亲)
RDD 作为数据结构,本质上是一个只读的分区记录集合。一个 RDD 可以包含多个分区,每个分区就是一个数据集片段。

首先,窄依赖可以支持在同一个节点上,以 pipeline 形式执行多条命令(也叫同一个 Stage 的操作),例如在执行了 map 后,紧接着执行 filter。相反,宽依赖需要所有的父分区都是可用的,可能还需要调用类似 MapReduce 之类的操作进行跨节点传递。

其次,则是从失败恢复的角度考虑。窄依赖的失败恢复更有效,因为它只需要重新计算丢失的 parent partition 即可,而且可以并行地在不同节点进行重计算(一台机器太慢就会重新调度到多个节点进行)。

4 Spark 作业提交流程是怎么样的

spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler。
TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。
Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executor
Executor 启动后,会自己反向注册到 TaskScheduler 中。所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。
每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。
DAGScheduler 会将 Job 划分为多个 Stage,然后每个 Stage 创建一个 TaskSet。
TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。
Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)

5 为什么要用 Yarn 来部署 Spark?

因为 Yarn 支持动态资源配置。Standalone 模式只支持简单的固定资源分配策略,每个任务固定数量的 core,各 Job 按顺序依次分配在资源,资源不够的时候就排队。

这种模式比较适合单用户的情况,多用户的情境下,会有可能有些用户的任务得不到资源。

Yarn 作为通用的资源调度平台,除了 Spark 提供调度服务之外,还可以为其他系统提供调度,如 Hadoop MapReduce, Hive 等。

6 简单说说 Spark 支持的4种集群管理器

Standalone 模式: 资源管理器是 Master 节点,调度策略相对单一,只支持先进先出模式,固定任务资源。
Hadoop Yarn 模式: 资源管理器是 Yarn 集群,主要用来管理资源。Yarn 支持动态资源的管理,还可以调度其他实现了 Yarn 调度接口的集群计算,非常适用于多个集群同时部署的场景,是目前最流行的一种资源管理系统。
Apache Mesos: Mesos 是专门用于分布式系统资源管理的开源系统,可以对集群中的资源做弹性管理。
Kubernetes: K8S 是自 Apache Spark 2.3.0 引入的集群管理器,Docker 作为基本的 Runtime 方式。
目前来说 Spark 的 Cluster Mode,Yarn 还是主流,K8S 则迎头赶上。

7 说说 Worker 和 Executor 的区别

Worker 是指每个工作节点,启动的一个进程,负责管理本节点,jps 可以看到 Worker 进程在运行,对应的概念是 Master 节点。
Executor 每个 Spark 程序在每个节点上启动的一个进程,专属于一个 Spark 程序,与 Spark 程序有相同的生命周期,负责 Spark 在节点上启动的 Task,管理内存和磁盘。如果一个节点上有多个 Spark 程序,那么相应就会启动多个执行器。所以说一个 Worker 节点可以有多个 Executor 进程。

8 说说 Spark Local 和 Standalone 有什么区别

Spark一共有6种运行模式:Local,Standalone,Yarn-Cluster,Yarn-Client, Mesos, Kubernetes

Local: Local 模式即单机模式,如果在命令语句中不加任何配置,则默认是 Local 模式,在本地运行。这也是部署、设置最简单的一种模式,所有的 Spark 进程都运行在一台机器或一个虚拟机上面。
Standalone: Standalone 是 Spark 自身实现的资源调度框架。如果我们只使用 Spark 进行大数据计算,不使用其他的计算框架时,就采用 Standalone 模式就够了,尤其是单用户的情况下。Standalone 模式是 Spark 实现的资源调度框架,其主要的节点有 Client 节点、Master 节点和 Worker 节点。其中 Driver 既可以运行在 Master 节点上中,也可以运行在本地 Client 端。当用 spark-shell 交互式工具提交 Spark 的 Job 时,Driver 在 Master 节点上运行;当使用 spark-submit 工具提交 Job 或者在 Eclipse、IDEA 等开发平台上使用 new SparkConf.setManager(“spark://master:7077”) 方式运行 Spark 任务时,Driver 是运行在本地 Client 端上的。
Standalone 模式的部署比较繁琐,不过官方有提供部署脚本,需要把 Spark 的部署包安装到每一台节点机器上,并且部署的目录也必须相同,而且需要 Master 节点和其他节点实现 SSH 无密码登录。启动时,需要先启动 Spark 的 Master 和 Slave 节点。提交命令类似于:

  1. ./bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master spark://Oscar-2.local:7077 \
  4. /tmp/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar \
  5. 100
  6. 1
  7. 2
  8. 3
  9. 4
  10. 5

其中 master:7077是 Spark 的 Master 节点的主机名和端口号,当然集群是需要提前启动。

9 Spark 经常说的 Repartition 有什么作用

一般上来说有多少个 Partition,就有多少个 Task,Repartition 的理解其实很简单,就是把原来 RDD 的分区重新安排。这样做有什么好坏呢?

避免小文件
减少 Task 个数
但是会增加每个 Task 处理的数据量,Task 运行时间可能会增加

10 简单写一个 WordCount 程序

我面试中,就曾经被面试官要求过手写一段 WordCount,别看好像很简单,实际上如果常年在做 Spark 内核的开发,而不是业务开发,也许你就想不起来了。

  1. sc.textFile("/path/to/spark/README.md")
  2. .flatMap(_.split(" "))
  3. .map(x => (x, 1))
  4. .reduceByKey(_ + _)
  5. .map(x => (x._2, x._1))
  6. .sortByKey(false)
  7. .map(x => (x._2, x._1))
  8. .take(10)
  9. # 结果
  10. Array[(String, Int)] = Array(("",71), (the,24), (to,17), (Spark,16), (for,12), (##,9), (and,9), (a,8), (can,7), (run,7))

11 RDD 如何通过记录更新的方式容错

RDD 实现分布式数据集容错方法有两种:

数据检查点
记录更新
RDD 采用记录更新的方式:记录所有更新点的成本很高。
所以,RDD只支持粗颗粒变换,即只记录单个块(分区 partition)上执行的单个操作,然后创建某个 RDD 的变换序列(血统 lineage)存储下来;
变换序列指,每个 RDD 都包含了它是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称“血统”容错。

12 Spark 优越性

Spark 的几个优势
更高的性能。因为数据被加载到集群主机的分布式内存中。数据可以被快速的转换迭代,并缓存用以后续的频繁访问需求。在数据全部加载到内存的情况下,Spark 可以比 Hadoop 快100倍,在内存不够存放所有数据的情况下快 Hadoop 10倍。
通过建立在 Java, Scala, Python, SQL(应对交互式查询)的标准API以方便各行各业使用,同时还含有大量开箱即用的机器学习库。
与现有 Hadoop 1和2.x(YARN)生态兼容,因此机构可以无缝迁移,目前也在做 Yarn 3 的支持。
方便下载和安装。方便的 shell(REPL: Read-Eval-Print-Loop)可以对 API 进行交互式的学习。
借助高等级的架构提高生产力,从而可以讲精力放到计算上。
MapReduce 与 Spark 相比,有哪些异同点
基本原理上
1.1 MapReduce: 基于磁盘的大数据批量处理系统
1.2 Spark: 基于 RDD (弹性分布式数据集)数据处理,显示将 RDD 数据存储到磁盘和内存中。
模型上
2.1 MapReduce 可以处理超大规模的数据,适合日志分析挖掘等较少的迭代的长任务需求,结合了数据的分布式的计算。
2.2 Spark 适合数据的挖掘,机器学习等多轮迭代式计算任务。
在 Spark 中,一个应用程序包含多个 Job 任务,在 MapReduce 中,一个 Job 任务就是一个应用。

13 Transformation 和 action 是什么?区别?举几个常用方法

RDD 创建后就可以在 RDD 上进行数据处理。RDD 支持两种操作:

转换(transformation): 即从现有的数据集创建一个新的数据集
动作(action): 即在数据集上进行计算后,返回一个值给 Driver 程序
RDD 的转化操作 Transformation 是返回一个新的 RDD 的操作,比如 map() 和 filter() ,而行动操作则是向驱动器程序 Driver 返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first() 。Spark 对待转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。

RDD 中所有的 Transformation 都是惰性的,也就是说,它们并不会直接计算结果。相反的它们只是记住了这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给 Driver 的 Action 时,这些 Transformation 才会真正运行。

14 RDD 容错方式

Spark 选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建 RDD 的一系列变换序列(每个 RDD 都包含了他是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称血统容错)记录下来,以便恢复丢失的分区。lineage 本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据(所以也称为粗粒度)。

15 可以解释一下这两段程序的异同吗

1

val counter = 0
val data = Seq(1, 2, 3)
data.foreach(x => counter += x)
println("Counter value: " + counter)

2

val counter = 0
val data = Seq(1, 2, 3)
var rdd = sc.parallelizze(data)
rdd.foreach(x => counter += x)
println("Counter value: " + counter)

所有在 Driver 程序追踪的代码看上去好像在 Driver 上计算,实际上都不在本地,每个 RDD 操作都被转换成 Job 分发至集群的执行器 Executor 进程中运行,即便是单机本地运行模式,也是在单独的执行器进程上运行,与 Driver 进程属于不用的进程。所以每个 Job 的执行,都会经历序列化、网络传输、反序列化和运行的过程。

再具体一点解释是 foreach 中的匿名函数 x => counter += x 首先会被序列化然后被传入计算节点,反序列化之后再运行,因为 foreach 是 Action 操作,结果会返回到 Driver 进程中。

在序列化的时候,Spark 会将 Job 运行所依赖的变量、方法全部打包在一起序列化,相当于它们的副本,所以 counter 会一起被序列化,然后传输到计算节点,是计算节点上的 counter 会自增,而 Driver 程序追踪的 counter 则不会发生变化。执行完成之后,结果会返回到 Driver 程序中。而 Driver 中的 counter 依然是当初的那个 Driver 的值为0。

16 说说 map 和 mapPartitions 的区别

map 中的 func 作用的是 RDD 中每一个元素,而 mapPartitioons 中的 func 作用的对象是 RDD 的一整个分区。所以 func 的类型是 Iterator => Iterator,其中 T 是输入 RDD 的元素类型。

这些可以用 API 中看到。

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

 /**
  * Return a new RDD by applying a function to each partition of this RDD.
  *
  * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
  * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
  */
 def mapPartitions[U: ClassTag](
     f: Iterator[T] => Iterator[U],
     preservesPartitioning: Boolean = false): RDD[U] = withScope {
   val cleanedF = sc.clean(f)
   new MapPartitionsRDD(
     this,
     (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
     preservesPartitioning)
 }

17 groupByKey 和 reduceByKey 是属于 Transformation 还是 Action?

前者,因为 Action 输出的不再是 RDD 了,也就意味着输出不是分布式的,而是回送到 Driver 程序。以上两种操作都是返回 RDD,所以应该属于 Transformation。

18 说说检查点 checkpoint 的意义

分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储中。

19 说说 Spark 的特点,相对于 MapReduce 来说

减少磁盘 I/O,MR 会把 map 端将中间输出和结果存储在磁盘中,reduce 端又需要从磁盘读写中间结果,势必造成磁盘 I/O 称为瓶颈。Spark 允许将 map 端的中间结果输出和结果存储在内存中,reduce 端在拉取中间结果的时候避免了大量的磁盘 I/O。
增加并行度,由于把中间结果写到磁盘与从磁盘读取中间结果属于不同的缓解,Hadoop 将他们简单地通过串行执行衔接起来,Spark 则把不同的环节抽象成为 Stage,允许多个 Stage 既可以串行又可以并行执行。
避免重新计算,当 Stage 中某个分区的 Task 执行失败后,会重新对此 Stage 调度,但在重新调度的时候会过滤已经执行成功的分区任务,所以不会造成重复计算和资源浪费。
可选的 Shuffle 排序,MR 在 Shuffle 之前有着固定的排序操作,而 Spark 则可以根据不同场景选择在 map 端排序还是 reduce 排序。
灵活的内存管理策略,Spark 将内存分为堆上的存储内存、堆外的存储内存,堆上的执行内存,堆外的执行内存4个部分。

20 Task 和 Stage 的分类

Task 指具体的执行任务,一个 Job 在每个 Stage 内都会按照 RDD 的 Partition 数量,创建多个 Task,Task 分为 ShuffleMapTask 和 ResultTask 两种。

ShuffleMapStage 中的 Task 为 ShuffleMapTask,而 ResultStage 中的 Task 为 ResultTask。

ShuffleMapTask 和 ResultTask 类似于 Hadoop 中的 Map 任务和 Reduce 任务。