RDD编程模型

MapReduce编程模型

image.png

DAG编程模型
image.png

RDD是什么

Resilient Distributed Datasets 弹性分布式数据集

  • Resilient: 弹性的、可容错的,一种容错的内存计算数据抽象
  • Distributed:通过多个分区将数据分散在不同节点
  • Datasets:数据集:一种数据抽象集合

物理上:

  • RDD是一组记录的集合。
  • 一个RDD可以分成多个分区, 每个分区是不可变的,分散在 集群的各个地方

逻辑上:

  • RDD是一个编程的数据抽象, 可以对它进行各自操作。
  • RDD操作都是高阶函数,这些 操作内部都是并发执行
  • 由两种类型的操作: 转换和执 行。

    RDD操作

    image.png

    RDD依赖

  • 窄依赖:父RDD和子RDDpartition之间的关系是一对一的,或者父RDD一个partition只对应一个子RDD的partition情况下的父RDD和子RDD partition关系是多对一的,不会有shuffle产生。父RDD的一个分区去到了子RDD的一个分区

  • 宽依赖:父RDD与子RDD partition之间的关系是一对多,会有shuffle的产生。父RDD的一个分区的数据去到了子RDD的不同分区里面。
  • 宽依赖会将pipeline划分为不同步stage,放在不同的task上去执行

image.png

RDD容错

当pipeline中其中的task出现故障挂机的时候,RDD可以自动恢复的能力。依赖的是RDD中的依赖(血缘)关系。
如RDD2中的Partition3发生故障后,可以基于依赖关系,由RDD的Partiton3重新计算。
image.png

Spark Core架构和原理

Spark运行架构

image.png

Job/Stage/Task

image.png

任务提交流程

spark任务提交方式和执行流程)

  • Application:表示你的应用程序
  • Driver:表示main()函数,创建SparkContext。由SparkContext负责与ClusterManager通信,进行资源的申请,任务的分配和监控等。程序执行完毕后关闭SparkContext
  • Executor:某个Application运行在Worker节点上的一个进程,该进程负责运行某些task,并且负责将数据存在内存或者磁盘上。在Spark on Yarn模式下,其进程名称为 CoarseGrainedExecutor Backend,一个CoarseGrainedExecutor Backend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task,这样,每个CoarseGrainedExecutorBackend能并行运行Task的数据就取决于分配给它的CPU的个数。
  • Worker:集群中可以运行Application代码的节点。在Standalone模式中指的是通过slave文件配置的worker节点,在Spark on Yarn模式中指的就是NodeManager节点。
  • Task:在Executor进程中执行任务的工作单元,多个Task组成一个Stage
  • Job:包含多个Task组成的并行计算,是由Action行为触发的
  • Stage:每个Job会被拆分很多组Task,作为一个TaskSet,其名称为Stage
  • DAGScheduler:根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler,其划分Stage的依据是RDD之间的依赖关系
  • TaskScheduler:将TaskSet提交给Worker(集群)运行,每个Executor运行什么Task就是在此处分配的。

image.png
image.png

流程

  • 构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
  • 资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
  • SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task
  • Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。
  • Task在Executor上运行,运行完毕释放所有资源。

    总结image.png

    Spark Shuffle

  • shuffle只发生在两个stage之间

  • 在partition内部重新排布数据

    shuffle读写

  • Shuffle分为shuffle write阶段(map side)和shuffle read阶段(reduce side)。

    • Write阶段(map side) 的任务个数是根据RDD的分区数决定的。 假设从HDFS中读取数据,那么RDD分区个数由该数据集的block数决定,也就是一个 split对应生成RDD的一个partition。
    • Read阶段(reduce side)的任务个数是通过配置spark.sql.shuffle.partitions决定的。
  • Shuffle中间的数据交互
    • Write阶段(map side) 会将状态以及Shuffle文件的位置等信息封装到MapStatue对象 中,然后发送给Driver。
    • Read阶段(reduce side)会从Driver拉取MapStatue,解析后开始执行reduce操作。
  • Spark1.2前使用HashShuffle算法,1.2之后主要使用SortShuffle。

    HashShuffle

    image.png

  • Shuffle read阶段,从各个节点上通过网络拉取到reduce任务所在的节点,然后进行key的 聚合或连接等操作。 一般来说,拉取Shuffle中间结果的过程是一边拉取一边聚合的。每个 shuffle read task都会有一个自己的buffer缓冲区,每次只能拉取与buffer缓冲区相同大 小的数据,然后在内存中进行聚合。聚合完一批数据后,再拉取下一批数据,直到最后将 所有数据到拉取完,得到最终的结果

  • Shuffle write阶段,每个task根据记录的Key进行哈希取模操作(hash(key) % reduceNum),相同结果的记录会写到同一个磁盘文件中。会先将数据写入内存缓冲区, 当内存缓冲填满之后,才会溢写(spill)到磁盘文件中

    优化后的HashShuffle

    image.png

  • spark.shuffle.consolidateFiles设为true,shuffle write阶段并不会为每个task创建 reduceNum个文件,而是一个cpu core具有一个逻辑上shuffleFileGroup,每个Group 会生成reduceNum个文件,这样大量减少了shuffle中间文件个数

    SortShuffle

    image.png

    普通机制

  • Task的数据会先写入一个内存数据结构中,当内存满了之后,会根据 Key进行排序,然后分批溢写到本地磁盘(示例图演示为3批次)。 溢写过程只会产生2个磁盘文件,一个是数据文件,一个是索引文件 (其中标识了各个task的数据在文件中的start offset与end offset)

    bypass

    image.png
    BypassSortShuffle的触发条件为:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold(默认200)

  • 不是聚合类的shuffle算子(比如reduceByKey)

    • 此时task会创建reduceNum个临时磁盘文件,并将数据按key进行hash取模,写入对应
      的磁盘文件。类似HashShuffle,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再
      溢写到磁盘文件的。最后,将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单
      独的索引文件。
      该Shuffle会生成大量中间文件,虽然最后都合并了。且不需要对数据进行排序。

      为什么要减少Shuffle

  • 网络IO:shuffle将数据从内存中移出

  • 数据通过网络传输必须序列化及反序列化
  • CPU:为了减少数据的网络传输而进行压缩
  • 磁盘IO:一个数据块有多次IO读写

    Shuffle优化原则

  • 减少shuffle的数据量

    • 部分预聚合函数
  • 避免shuffle
    • 数据预分布:比如Local Join
    • Map Join(广播join)

      shuffle参数优化

      shuffle参数官网)

      数据倾斜及其优化