Spark Application

Spark Application在Standalone集群运行时,有2部分组成:

  • Driver Program(AppMaster)
  • Executors(MapTask和ReduceTask)

image.png

应用运行模式

  • Standalone
  • YARN(推荐)
  • Mesos(少用)

image.png

应用提交

spark-submit —master options —class org.apache.spark.examples.SparkPi spark-examples_2.12-3.1.2.jar 10
options 参数:

  • 本地方式:local[2]
  • 集群方式:spark://master01:7077,master02:7077
  • YARN集群方式:yarn

    应用部署模式

    spark-submit提交应用运行时,指定参数:—deploy-mode,表示Driver Program运行位置

    client模式(默认)

    image.png

    cluster模式(正式环境【推荐】)

    image.png

    Spark on YARN

    client模式

    Driver与AppMaster各司其职,AppMaster负责资源申请Executors,Driver负责Job调度
    image.png

    cluster模式

    Driver与AppMaster合为一体
    image.png

    RDD

    RDD(Resilent Distributed Datasets)俗称弹性分布式数据集,是 Spark 底层的分布式存储的数据结构。可以说是 Spark 的核心, Spark API 的所有操作都是基于 RDD 的。数据不只存储在一台机器上,而是分布在多台机器上,实现数据计算的并行化。弹性表明数据丢失时,可以进行重建。在Spark 1.5版以后,新增了数据结构 Spark-DataFrame,仿造的 R 和 python 的类 SQL 结构-DataFrame,底层为 RDD,能够让数据从业人员更好的操作 RDD。
    三个要点:
    image.png

    RDD设计思想

    在Spark 的设计思想中,为了减少网络及磁盘 IO 开销,需要设计出一种新的容错方式,于是才诞生了新的数据结构 RDD。RDD 是一种只读的数据块。可以从外部数据转换而来,你可以对RDD 进行函数操作(Operation),包括 Transformation 和 Action。在这里只读表示当你对一个 RDD 进行了操作,那么结果将会是一个新的 RDD,这种情况放在代码里,假设变换前后都是使用同一个变量表示这一 RDD,RDD 里面的数据并不是真实的数据,而是一些元数据信息,记录了该 RDD 是通过哪些 Transformation 得到的,在计算机中使用 lineage 来表示这种血缘结构,lineage 形成一个有向无环图 DAG,整个计算过程中,将不需要将中间结果落地到 HDFS 进行容错,加入某个节点出错,则只需要通过 lineage 关系重新计算即可。

    RDD5个特性

    image.png
  1. 它是在集群节点上的不可变的、已分区的集合对象
  2. 通过并行转换的方式来创建(如 Map、 filter、join 等)
  3. 失败自动重建
  4. 可以控制存储级别(内存、磁盘等)来进行重用
  5. 必须是可序列化的
  6. 是静态类型的(只读)

    RDD创建

  • 并行化(Parallelizing)一个已经存在与驱动程序(Driver Program)中的集合,如set、list;
  • 读取外部存储系统上的一个数据集,比如HDFS、Hive、HBase,或者任何提供了Hadoop InputFormat的数据源。也可以从本地读取 txt、csv 等数据集。

    RDD函数(operation)

    主要分为2种类型 Transformation 和 Action。
    image.png
    Transformation 操作不是马上提交 Spark 集群执行的,Spark 在遇到Transformation操作时只会记录需要这样的操作,并不会去执行,需要等到有Action操作的时候才会真正启动计算过程进行计算。
    针对每个 Action,Spark 会生成一个Job,从数据的创建开始,经过 Transformation,结尾是 Action 操作。这些操作对应形成一个有向无环图(DAG),形成 DAG 的先决条件是最后的函数操作是一个Action。

    常用函数(重点)

  • 分区操作函数(创建对象)
    mapPartitions、foreachPartition

  • 重分区函数(调整分区个数)
    repartition(增加分区数量,产生shuffle)、coalesce(减少分区数目)
  • 聚合函数
    Scala聚合函数:reduce【reduce((tmp,item)=>tmp+item)】、fold
    RDD聚合函数:reduce/fold(分区内聚合、分区间聚合)、aggregate
    PairRDDFunctions聚合函数:groupByKey、reduceByKey/foldByKey、aggregateByKey、combineByKey(最底层聚合函数)
  • 关联函数
  • 缓存函数(持久化函数
    将一个RDD数据进行缓存,可以是内存(Executor内存)、磁盘、系统内存(OFF_HEAP)
    释放内存:unpersist

RDD checkpoint

Checkpoint的产生就是为了更加可好的数据持久化,在Checkpoint的时候一般将数据放在HDFS上,这是借助HDFS天生的高容错、高可靠来实现数据最大程度的安全,实现RDD的容错和高可用。
RDD持久化和checkpoint的区别:
image.png
checkpoint是懒加载模式,一般需要借助count函数触发
image.png

外部数据源

image.png
操作方式:
image.png
常见的外部数据源扩展:

  • HBase数据源
  • MySQL数据源

image.png

共享变量

广播变量(Broadcast Variables)和累加器(Accumulators)

广播变量Broadcast

类似于MapReduce中的Map DistributedCache,将某个变量数据发送给所有的Executor,变量值不可变

累加器Accumulator

类似于MapReduce中的计数器Counters,即计数功能

Spark内核

image.png
shuffle 是划分 DAG 中 stage 的标识,同时影响 Spark 执行速度的关键步骤。

RDD依赖

RDD 的 Transformation 函数中,分为:

  • 窄依赖(narrow dependency)
    窄依赖是子 RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果。
  • 宽依赖(wide dependency)
    宽依赖会发生 shuffle 操作。宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片。

窄依赖跟宽依赖的区别是是否发生 shuffle(洗牌) ,

Shuffle和Stage流程

看如下示例:

  1. // Map: "cat" -> c, cat
  2. val rdd1 = rdd.Map(x => (x.charAt(0), x))
  3. // groupby same key and count
  4. val rdd2 = rdd1.groupBy(x => x._1).Map(x => (x._1, x._2.toList.length))

第一个 Map 操作将 RDD 里的各个元素进行映射,RDD 的各个数据元素之间不存在依赖,可以在集群的各个内存中独立计算,也就是并行化
第二个 groupby 之后的 Map 操作,为了计算相同 key 下的元素个数,需要把相同 key 的元素聚集到同一个 partition 下,所以造成了数据在内存中的重新分布,即 shuffle 操作。
shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle

宽依赖主要有两个过程:shuffle write 和 shuffle fetch。
类似 Hadoop 的 Map 和 Reduce 阶段。shuffle write 将 ShuffleMapTask 任务产生的中间结果缓存到内存中, shuffle fetch 获得 ShuffleMapTask 缓存的中间结果进行 ShuffleReduceTask 计算,这个过程容易造成OutOfMemory

shuffle 过程内存分配使用 ShuffleMemoryManager 类管理,会针对每个 Task 分配内存,Task 任务完成后通过 Executor 释放空间。这里可以把 Task 理解成不同 key 的数据对应一个 Task。 早期的内存分配机制使用公平分配,即不同 Task 分配的内存是一样的,但是这样容易造成内存需求过多的 Task 的 OutOfMemory, 从而造成多余的 磁盘 IO 过程,影响整体的效率。(某一个 key 下的数据明显偏多,但因为大家内存都一样,这一个 key 的数据就容易 OutOfMemory)。
1.5版以后Task 共用一个内存池,内存池的大小默认为 JVM 最大运行时内存容量的16%,分配机制如下:假如有 N 个 Task,ShuffleMemoryManager 保证每个 Task 溢出之前至少可以申请到1/2N 内存,且至多申请到1/N,N 为当前活动的 shuffle Task 数,因为N 是一直变化的,所以 manager 会一直追踪 Task 数的变化,重新计算队列中的1/N 和1/2N。但是这样仍然容易造成内存需要多的 Task 任务溢出,所以最近有很多相关的研究是针对 shuffle 过程内存优化的。
image.png
如下 DAG 流程图中,分别读取数据,经过处理后 join 2个 RDD 得到结果。
image.png
在这个图中,根据是否发生 shuffle 操作能够将其分成如下的 stage 类型:
image.png
join 需要针对同一个 key 合并,所以需要 shuffle。运行到每个 stage 的边界时,数据在父 stage 中按照 Task 写到磁盘上,而在子 stage 中通过网络按照 Task 去读取数据。
这些操作会导致很重的网络以及磁盘的I/O,所以 stage 的边界是非常占资源的,在编写 Spark 程序的时候需要尽量避免的
父 stage 中 partition 个数与子 stage 的 partition 个数可能不同,所以那些产生 stage 边界的 Transformation 常常需要接受一个 numPartition 的参数来觉得子 stage 中的数据将被切分为多少个 partition。
PS:shuffle 操作的时候可以用 combiner 压缩数据,减少 IO 的消耗

Job调度流程

当创建SparkContext对象时,就会创建DAGSchedulerTaskSCheduler对象。

  • DAGScheduler
    将Job对应的DAG图划分为Stage,划分的依据:RDD之间产生Shuffle(宽依赖)
  • TaskSCheduler
    对每个Stage中Task调度执行,运行在Executor中
    每个Task以线程Thread方式运行,需要1Core CPU,处理1个分区数据,采用pipeline管道计算模式

    资源设置

  • 设置Task数量
    官方推荐,Task数量设置成Application总CPU Core数量的2-3倍(100个cpu core,设置task数量为200-300)

  • 设置Application并行度
    参数spark.default.parallelism默认是没有设置的,设置值之后在shuffle过程中才起作用