一、 Spark 概述

1. Spark 是什么?

image.png
Spark 是基于内存的快速、通用、可扩展的大数据分析计算引擎

2. Spark 和 Hadoop 比较

  • 从时间节点上看:
    • hadoop
    1. 2006年 Doug Cutting 加入Yahoo,领导Hadoop 的开发
    2. 2008年 1月,Hadoop成为 Apache 顶级项目
    3. 2011年 1.0 正式发布
    4. 2012年3月 稳定版发布
    5. 2013年10月 2.x(yarn) 版本
    • Spark
    1. 2009年 spark 诞生于 伯克利大学的AMPLab 实验室
    2. 2010年 伯克利大学开源了 Spark 项目
    3. 2013年6月 Spark 称为 Apache 基金会下的项目
    4. 2014年2月,Spark 飞速成为 Apache 下的顶级项目
    5. 2015年至今,Spark变得愈发火爆,大量的国内公司开始重点部署或者使用 Spark
  • 从功能上看
    • hadoop
      • hadoop 是由java 编写的,在分布式服务器上存储海量数据并运行分布式分析应用的框架
      • HDFS 作为Hadoop 生态圈的最底层,存储着所有的数据,支持Hadoop 服务。他的理论来源于 Goole 的 TheGooleFileSystem 是GFS 的开源实现。
      • MapReduce 是一种编程模型,理论来源于 Goole 的 MapReduce 论文,作为 Hadoop 分布式计算模型,是Hadoop 的核心
      • HBase 基于 HDFS 的分布式数据库引擎,擅长实时读写大数据集,理论来源于 Goole 的论文Bigtable 。
    • Spark
      • Spark 是由scala 编写的,是一种快速、通用、可扩展的大数据分析引擎
      • Spark Core 中提供了 Spark 最基础与最核心的功能
      • Spark SQL 用来操作结构化数据的组件,可以使用 SQL 和 HQL 查询数据
      • Spark Streaming 是 Spark 平台 针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API

从上面可以看出来,Spark 出现的比较晚,并且主要用于数据计算,所以常被认为 Hadoop 的 升级版。两者之间最根本的区别在于 多个作业通信问题,Spark 是基于内存的(只有在shuffle 的时候才写进磁盘),Hadoop 是基于磁盘的。另外,在创建进程方面,Spark 采用 fork 的方式 而 hadoop 采用 创建新进程的方式。
Hadoop 的 MR 和 Spark 都是处理数据的框架,那我们如何选择呢?
MapReduce 设计的初衷并不是为了满足迭代式数据流的处理,因此在多并行的数据可复用场景下(如:机器学习、交互式数据挖掘算法)中存在诸多计算效率的问题。Spark 应运而生,它在MR 的基础上,将计算过程进行优化,从而大大加快了数据分析、挖掘的运行和读写速度,并将计算单元缩小到更合适并行计算和重复使用的RDD(Resilient Distribution Datasets) 模型,同时提供了比MapReduce 更丰富的模型,可以快速在内存中对数据集进行多次迭代。

3. Spark 核心模块

spark - 图2

  • Spark Core : 提供了 Spark 最基础最核心的功能,是其他四个功能的基础
  • Spark SQL : 是 Spark 用来操作结构化数据库的,通过Spark SQL ,用户可以使用 SQL / HQL 来查询数据
  • Spark Streaming :针对实时数据进行流式计算的组件,提供了丰富的数据流处理 API
  • Spark Mlib : 是 Spark 提供的一个机器学习算法库。MLlib 不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语
  • Spark Graph:GraphX 是 Spark 面向图计算提供的框架与算法库。

    二、Spark 快速上手

    1. 增加 Scala 插件

    使用的Spark 3.0.0版本 ,采用的 scala 是 2.12
    image.png

    2. 添加 maven 依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.spark</groupId>
    4. <artifactId>spark-core_2.12</artifactId>
    5. <version>3.0.0</version>
    6. </dependency>
    7. </dependencies>

    3. WorkCount

  • 在resources 目录中创建log4j.properties ```properties log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

Set the default spark-shell log level to ERROR. When running the spark-shell, the

log level for this class is used to overwrite the root logger’s log level, so that

the user can have different defaults for the shell and regular Spark apps.

log4j.logger.org.apache.spark.repl.Main=ERROR

Settings to quiet third party logs that are too verbose

log4j.logger.org.spark_project.jetty=ERROR log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR

SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support

log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

```scala
def main(args: Array[String]): Unit = {
  // 创建 Spark 运行配置对象
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

  // 创建 Spark 上下文环境对象(连接对象)
  val sc : SparkContext = new SparkContext(sparkConf)

  // 读取文件数据
  val fileRDD: RDD[String] = sc.textFile("input/word.txt")

  // 将文件中的数据进行分词
  val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )

  // 转换数据结构 word => (word, 1)
  val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_,1))

  // 将转换结构后的数据按照相同的单词进行分组聚合
  val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_+_)

  // 将数据聚合结果采集到内存中
  val word2Count: Array[(String, Int)] = word2CountRDD.collect()

  // 打印结果
  word2Count.foreach(println)

  //关闭 Spark 连接
  sc.stop()

  }

三、Spark 运行环境

1. local 模式

  1. 解压文件

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
cd /opt/module/
mv spark-3.0.0-bin-hadoop3.2 spark-local

  1. 启动local环境

bin/spark-shell

  1. 查看 web ui 界面

    http://hadoop102:4040
    image.png

  2. 命令行工具 ```scala scala> sc.textFile(“data/word.txt”).flatMap(.split(“ “)).map((,1)).reduceByKey(+).collect res0: Array[(String, Int)] = Array((scale,1), (hello,2), (spark,1))

scala> :quit


   5. 提交应用
```scala
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.0.0.jar 10
  • 参数说明 | 参数 | 解释 | 可选值举例 | | —- | —- | —- | | —class | Spark 程序中包含主函数的类 | 此处可以更换为咱们自己写的应用程序
    | | —master | Spark 程序运行的模式(环境) | 模式:local[*]、 spark://linux1:7077、
    Yarn | | —executor-memory 1G | 指定每个 executor 可用内存为 1G | 可选参数 符合集群内存配置即可,具体情况具体分析。 | | —total-executor-cores 2 | 指定所有executor 使用的cpu 核数
    为 2 个 | 同上 | | —executor-cores | 指定每个executor 使用的cpu 核数 | 同上 | | application-jar | 打包好的应用 jar,包含依赖。这个 URL在集群中全局可见。 比如 hdfs:// 共享存储系统,如果是 | 实际使用时,可以设定为自己打的 jar包 |

2. standalone 模式

Spark 的 Standalone 模式体现了经典的master-slave 模式。集群规划:


hadoop102 hadoop103 hadoop104
Spark Worker Master Worker Worker
  1. 解压文件

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
cd /opt/module
mv spark-3.0.0-bin-hadoop3.2 spark-standalone

  1. 修改配置文件

    hadoop102
    hadoop103
    hadoop104
    
    export JAVA_HOME=/opt/module/jdk1.8.0_212 
    SPARK_MASTER_HOST=hadoop102
    SPARK_MASTER_PORT=7077
    

    最后分发 xsync spark-standalone

  2. 启动集群

sbin/start-all.sh
可访问 http://hadoop102:8080/
image.png

  1. 提交参数

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://hadoop102:7077 \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
    

    image.png

  2. 配置历史服务器

    spark.eventLog.enabled true
    spark.eventLog.dir hdfs://hadoop102:8020/directory
    
    export SPARK_HISTORY_OPTS="
    -Dspark.history.ui.port=18080 
    -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory 
    -Dspark.history.retainedApplications=30"
    

    记得分发文件
    重新开启 sbin/start-all.sh sbin/start-history-server.sh
    重新执行步骤d 查看历史服务器

  3. 配置高可用

借助 zk

注释如下内容:
#SPARK_MASTER_HOST=hadoop102
#SPARK_MASTER_PORT=7077
添加如下内容:
#Master 监控页面默认访问端口为 8080,但是可能会和 Zookeeper 冲突,所以改成 8989,也可以自
定义,访问 UI 监控页面时请注意
SPARK_MASTER_WEBUI_PORT=8989
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=hadoop102,hadoop102,hadoop102
-Dspark.deploy.zookeeper.dir=/spark"

3. yarn

Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。

  1. 解压文件

tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
cd /opt/module/
mv spark-3.0.0-bin-hadoop3.2 spark-yarn

  1. 修改配置文件
  • 修改 hadoop 配置文件/opt/module/hadoop/etc/hadoop/yarn-site.xml, 并分发

    <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认
    是 true -->
    <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
    </property>
    <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认
    是 true -->
    <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    </property>
    
  • 修改 conf/spark-env.sh,添加 JAVA_HOME 和 YARN_CONF_DIR 配置

    export JAVA_HOME=/opt/module/jdk1.8.0_212 
    YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
    
  1. 启动 hadoop

sbin/start-dfs.sh sbin/start-yarn.sh

  1. 提交应用

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    ./examples/jars/spark-examples_2.12-3.0.0.jar  10
    

    image.png

  2. 配置历史服务器 ```scala spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop102:8020/directory

spark.yarn.historyServer.address=hadoop102:18080 spark.history.ui.port=18080

注意:需要启动 hadoop 集群,HDFS 上的目录需要提前存在。<br />` sbin/start-dfs.sh `<br />         ` hadoop fs -mkdir /directory `
```scala
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory 
-Dspark.history.retainedApplications=30"
  1. 启动服务

    sbin/start-history-server.sh

    3. K8S & Mesos 模式

    4. Windows 模式 自己练习使用

  2. 将文件 spark-3.0.0-bin-hadoop3.2.tgz 解压缩到无中文无空格的路径中

  3. 执行解压缩文件路径下 bin 目录中的 spark-shell.cmd 文件,启动 Spark 本地环境
  4. 在 bin 目录中创建 input 目录,并添加 word.txt 文件, 在命令行中输入脚本代码
    spark-submit --class org.apache.spark.examples.SparkPi --master 
    local[2] ../examples/jars/spark-examples_2.12-3.0.0.jar 10
    

    5. 部署模式对比

    | 模式 | 安装机器数 | 需要启动的进程 | 应用场景 | | —- | —- | —- | —- | | local | 1 | 无 | 测试 | | Standalone | 3 | Worker Master | 单独部署 | | Yarn | 1 | hadoop | 实际使用 |

6. 常见端口号

  • 查看当前 Spark-Shell 运行情况 端口号 :4040
  • Master 内部通信端口号:7077
  • Standalone模式下,Web 端口 8080
  • 历史服务器 18080
  • yarn 任务运行情况 :8088

    四、运行架构

    1. 架构图

    Spark 框架的核心是一个计算引擎,从整体上来说 ,它采用了 master-slave 架构。
    image.png

    2. 核心组件

  • driver :驱动器节点,执行Spark 任务中的 main 方法。负责实际代码的执行,主要负责

    • 将用户编写的程序转化为job
    • 在 Executor 之间任务调度,并追中每个 Executor 的执行情况
    • 通过 UI 展示运行情况

但是在整个编程过程中,我们没有看到任何关于 driver的字眼,所以简单理解 driver 就是驱动整个应用启动起来的程序。

  • Executor :它是集群工作节点中的一个 JVM进程,负责具体任务的执行,各个任务之间彼此独立。在Spark 启动的时候,所以Executor 同时启动,并伴随着 Spark 的结果而结束。如果某个 Executor 发生故障或者崩溃,整个 Spark 会继续运行,会将出错节点上的任务转移到其他节点上继续运行。他有两个核心的功能:
    • 负责运行 组成Spark的任务,并将结果返回个driver
    • 通过自身的 块管理器,为程序中需要缓存的 RDD 提供内存式存储。RDD是直接缓存在 Executor 进程内的,因此任务可以在运行的过程中充分利用内存缓存进行操作
  • master worker

在 spark 独立部署模式下,不需要依赖其他资源,自身就可以实现资源调度功能,所以还需要额外两个进程 Mater 、Worker 。 Master 主要资源的调度和分配,并进行集群的监控等类似于 yarn 的 RM;Worker 是运行在每一台服务器中,类似于 yarn 的 DM

  • ApplicationMaster

单个任务的老大,负责向资源调度器申请由于执行任务的 Container,运行自己的程序任务,监控整个任务的执行,追踪任务的实时状态,处理任务失败重新执行等

3. 核心概念

1. Executor 和 core

Executor 是节点中运行的一个 JVM 进程,是专门用于计算的。用户在提交应用的时候 可以指定参与计算到节点个数以及内存大小和 CPU 核数

名称 说明
—num-executors 配置 Executor 的数量
—executor-memory 配置每个 Executor 的内存大小
—executor-cores 配置每个 Executor 的虚拟 CPU core 数量

2. 并行度

在分布式计算框架中,任务分配到不同的节点上运行,所以是真正的并行运行。我们把并行执行任务的数量称为并行度,具体大小取决于框架的配置,用户也可以在应用的运行过程中动态修改

3. 有向无环图(DAG)

根据使用的方式不同,大数据计算引擎一般分为三类。第一类就是 hadoop的 MapReduce ,它分为两个阶段Map 和Reduce。对于上层应用来说,就不得不拆分算法,实现多个job 的串联,最终完成整个算法,比如 迭代运算。由于存在这样的弊端,催生了支持 DAG 的框架;我们把支持DAG 算法的称为第二类计算引擎,代表是 Tez 、Oozie,大多数支持批处理任务;接下来就是第三类 以 Spark 为代表的计算引擎,主要特点是在 Job 内部支持 DAG 以及实时计算

4. 数据提交流程(yarn)

Spark 提交到 yarn 环境中执行的时候,一般有两种部署模式,Client 和 Cluster ,区别在于 driver 运行节点的位置。Client模式中,driver在客户端进行,一般用于测试。Cluster 模式中,driver 在yarn 集群资源中启动。
image.png

  • Yarn Client 模式

Client 模式将用于监控和调度的Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试

  1. driver 提交任务到本地机器上运行
  2. driver 启动后和 ResourcesManager 进行通讯 申请启动 ApplicationMaster
  3. ResourcesManager 分配 contain 在合适的节点上启动 ApplicationMaster。 ApplicationMaster启动后向 RM 申请 Executor 执行资源
  4. RM 收到 ApplicationMaster 申请资源的请求后,分配 container 容器,ApplicationMaster 在分配的资源上启动 Executor
  5. Executor 启动后 向 driver 反向注册,所有的 Executor 全部反向注册完成后 driver 开始执行 main 函数,执行到 Action 算子时,触发一个 job,根据宽依赖划分 Stage,在每个 Stage 上生成对应的 TaskSet, 之后将 TaskSet 分发到各个 Executor。
    • Yarn cluster 模式

Cluster 模式将用于监控和调度的 Driver 模块启动在Yarn 集群资源中执行。一般应用于实际生产环境。

  1. 在 YARNCluster模式下,任务提交后会和 ResourceManager 通讯申请启动ApplicationMaster,
  2. 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的 ApplicationMaster就是Driver。
  3. driver 启动后 向 RM 申请 Executor 内存,ResourceManager 接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程
  4. Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main 函数
  5. 之后执行到 Action算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个stage生成对应的TaskSet,之后将 task 分发到各个Executor上执行。

    4. spark shuffle

    spark Sql 是在hadoop shuffle 上进行优化,主要是对 排序合并上做的一些优化。spark shuffle 分为两种 hash shuffle 和sort shuffle 。从时间发展上看
    image.png
    最开始引入 hash shuffle 是为了避免排序问题,但是在每个 maper 阶段的 task 都会为每个 reduce 产生一个独立的文件,这样 最终会产生 M R ( M 表示 Mapper 阶段的 Task 个数, R 表示 Reduce 阶段的 Task 个数)个文件 ,产生大量的随机磁盘 IO 和内存开销,为了解决这个问题,在 0.8 版本引入 shuffle consolidata 机制(文件合并机制),将每个 执行单位(map 端 core数 / 分配的 task cores 默认是1),为每个 reduce 阶段的task 生成一个。
    生成的文件数据都会依赖于 reduce 端 task 的数据,因此文件数目仍然是不可控的,为了更好的解决这个问题,在 spark 1.1 版本引入 sort shuffle ,并且在1.2版本之后,将默认的实现方式设定为它。在基于 sort 的shuffle 中,他不会为每个 reduce端的 Task 生成一个单独的文件,而是生成一个整体的文件Data,还有一个索引文件 ,reduce端的 task 可以通过索引文件获取相关数据,这样的话 最终生成的文件数目是 2
    M(M 为 map 阶段 Task 的数目)。文件数目虽然是减少了,但是它必须在 map端进行排序,速度就满了,为了解决这个问题。在1.4 版本之后,引入了新的 基于 Tungsten-sort 的shuffle 机制,可以大大提高数据处理能力。Tungsten-sort 目前已经并入到 sort shuffle 中。

    1. hash shuffle writer

    image.png

    2. hash shuffle 优化(合并)

    为了优化 HashShuffleManager 我们可以设置一个参数:spark.shuffle.consolidateFiles,该参数默认值为 false,将其设置为 true 即可开启优化机制,通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项
    开启 consolidate 机制之后,在 shuffle write 过程中,task 就不是为下游 stage 的每个 task 创建一个磁盘文件了,此时会出现 shuffleFileGroup 的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游 stage 的 task 数量是相同的。一个 Executor 上有多少个 cpu core,就可以并行执行多少个 task。而第一批并行执行的每个 task 都会创建一个 shuffleFileGroup,并将数据写入对应的磁盘文件内。
    当 Executor 的 cpu core 执行完一批 task,接着执行下一批 task 时,下一批 task 就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件,也就是说,此时 task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制允许不同的 task 复用同一批磁盘文件,这样就可以有效将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能
    image.png

    3. sort shuffle

    它有三个运行机制。普通运行机制,bypass 机制,tungsten - sort 机制
    1. 普通运行机制
    将数据写入到内存中,根据算子的不同,选用不同的数据结构,如 : 聚合类算子 ,reduce by key ,会一遍进行 map 聚合,一遍写入到内存中;如果是 普通算子 例如join 就会选用 ArrayList 直接写入到内存中。当内存中达到临界值的时候,就会对其进行排序,排序完成后,分批 ,按照每批 1 万条数据通过 java 的buffered output stream 写入到磁盘中,会执行多次溢写过程,最后将所有的文件形成合并,同时生成索引文件。
    image.png
    2. bypass 模式
    Reducer 端任务数比较少的情况下,基于 Hash Shuffle 实现机制明显比基于 Sort Shuffle 实现机制要快,因此基于 Sort Shuffle 实现机制提供了一个带 Hash 风格的回退方案,就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带 Hash 风格的回退计划。
    触发条件:
  • shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold= 200 参数的值。
  • 不是聚合类算子

跟没有优化的 hash shuffle 一样,也是会创建惊人的文件数。首先 每个 task 会为下游的每页 task 创建一个文件,通过 hash 计算 key 的数值,写入到内存缓冲中,最后写入到对应的磁盘文件中,最后未将所有的临时文件合并成一个文件,并创建索引。
与普通 sort-shuffle 区别在于 :

  • 磁盘书写机制不同
  • 不需要排序。

image.png

3. Tungsten -sort shuffle 机制

它是对普通机制的一种优化,优化的内容是排序,本来排序是针对数据进行操作,它变成针对指针数组进行排序,实现了直接对序列化后的二进制文件进行排序。
在某些情况下,即使设置了 Tungsten -sort shuffle 机制,也不一定执行。主要是因为

  1. Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。
  2. Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)
  3. Shuffle 过程中的输出分区个数少于 16777216 个。

对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort 方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过SortShuffleManager.canUseSerializedShuffle 方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。

五、Spark 核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构:

  • RDD :弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量: 分布式共享只读变量

    1. RDD

    1. 简介

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • 弹性:

    • 存储的弹性: 支持内存和磁盘自动切换
    • 容错的弹性:数据丢失后可以自动恢复
    • 计算的弹性: 计算出错后可以自动重试
    • 分片的弹性: 可以根据需要重新分片
  • 分布式: 数据存储在大数据集群上不同的节点上
  • 数据集: RDD 只是封装了计算逻辑,并不保存数据
  • 数据抽象: RDD 是一个抽象类,需要具体的子类去实现
  • 不可变: RDD 封装了计算逻辑,是不可改变了,如果想要改变,只能产生新的 Rdd,在新的 RDD中 封装计算逻辑
  • 可分区,并行计算

    2. 核心属性

    五大核心数据
    image.png

  • 分区列表

    RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式的重要属性。
    image.png

  • 分区计算函数

Spark 在计算时,会对每一个分区使用分区函数进行计算
image.png

  • RDD 之间的依赖关系

RDD 只是封装了计算的模型,如果需求需要将多个数据模型进行组合是,就需要建立多个 RDD 之间的依赖
image.png

  • 分区器 (可选)

如果数据时 key-value 类型时,可以通过设定分区器自定义数据的分区
image.png

  • 首选位置(可选)

计算数据时,可以根据节点的状态选择不同节点位置进行计算
image.png

3. 执行流程

从数据处理的角度上看 ,数据处理需要计算资源(内存 CPU)和计算模型(逻辑)执行时,将计算模型和计算逻辑进行协调和整合
Spark 在执行时,先申请资源,然后将计算处理逻辑分解成一个个计算任务,分配到已分配资源的节点上,按照指定的计算模型进行数据计算,最后得到结果集。
具体执行过程吐下:

  1. 启动Yarn 集群环境

image.png

  1. Spark 通过申请资源 创建调度节点和计算节点

image.png

  1. Spark 根据需求 将计算逻辑根据分区划分为不同的任务

image.png

  1. 调度节点将任务根据计算节点状态发送到对应的计算节点上进行计算

image.png
从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给Executor 节点执行计算。

4. Rdd 创建

1. 创建 RDD

在Spark 中创建 RDD 共有四种方式,从内存(集合)中创建、从外部存储(文件)中创建、从其他 RDD 创建、直接创建(New)

1. 从集合(内存)中创建

从集合中创建 RDD 提供了两种方法 :parallelize 、 makeRDD

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark");
    val context = new SparkContext(sparkConf)

    val rdd_p = context.parallelize(List(1, 2, 3, 4))
    val rdd_m = context.makeRDD(List(5, 6, 7, 8))

    rdd_p.collect().foreach(print)
    rdd_m.collect().foreach(print)
    context.stop()
  }
从底层代码上看 ,makeRDD 也是调用的 parallelize
def makeRDD[T: ClassTag](
 seq: Seq[T],
 numSlices: Int = defaultParallelism): RDD[T] = withScope {
 parallelize(seq, numSlices)
}

2. 从外部存储(文件)中创建

外部的存储文件可以是 本地文件系统、hadoop 支持的数据集(HDFS、HBase)

    def main(args: Array[String]): Unit = {

        // TODO 准备环境
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(sparkConf)

        // TODO 创建RDD
        // 从文件中创建RDD,将文件中的数据作为处理的数据源
        // path路径默认以当前环境的根路径为基准。可以写绝对路径,也可以写相对路径
        //sc.textFile("D:\\mineworkspace\\idea\\classes\\atguigu-classes\\datas\\1.txt")
        //val rdd: RDD[String] = sc.textFile("datas/1.txt")
        // path路径可以是文件的具体路径,也可以目录名称
        //val rdd = sc.textFile("datas")
        // path路径还可以使用通配符 *
        //val rdd = sc.textFile("datas/1*.txt")
        // path还可以是分布式存储系统路径:HDFS
        val rdd = sc.textFile("hdfs://linux1:8020/test.txt")
        rdd.collect().foreach(println)

        // textFile : 以行为单位来读取数据,读取的数据都是字符串
        // wholeTextFiles : 以文件为单位读取数据
        //    读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
        // val rdd = sc.wholeTextFiles("datas")

        rdd.collect().foreach(println)


        // TODO 关闭环境
        sc.stop()
    }

3. 从其他 RDD 中创建

主要是通过 RDD 运算完成后,产生新的RDD

4. 直接创建

直接使用 new的方式构造 RDD ,一般是 Spark 框架自身使用

5. 并行度与分析

默认情况下,Spark 可以将一个任务划分为多个任务后,发送到 Executor 及地点进行计算,能够并行计算的任务数量我们称之为 并行度。这个数量我们可以在构建 RDD 的时候指定,技术这里说的是并行执行任务的数量,而不是切分任务的数量。

val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark") 
val sparkContext = new SparkContext(sparkConf)

val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4), 4)
val fileRDD: RDD[String] = sparkContext.textFile("input", 2)

fileRDD.collect().foreach(println) 
sparkContext.stop()
在读取内存数据的时候,数据可以按照并行度的设定进行数据分区操作,源码如下
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
 (0 until numSlices).iterator.map { i =>
 val start = ((i * length) / numSlices).toInt
 val end = (((i + 1) * length) / numSlices).toInt
 (start, end)
 }
}
 在读取文件数据时,数据时按照 Hadoop 文件读取规则进行切片分区,而切片规则和数据读取规则有些差异
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    long totalSize = 0; // compute total size
    for (FileStatus file: files) { // check we have valid files
        if (file.isDirectory()) {
            throw new IOException("Not a file: "+ file.getPath());
        }
        totalSize += file.getLen();
    }
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
                                        FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    ...

        for (FileStatus file: files) {

            ...

                if (isSplitable(fs, path)) {
                    long blockSize = file.getBlockSize();
                    long splitSize = computeSplitSize(goalSize, minSize, blockSize);
                    ...
                    }
            protected long computeSplitSize(long goalSize, long minSize,
                                            long blockSize) {
                return Math.max(minSize, Math.min(goalSize, blockSize));
 }

6. RDD 转换算子

根据数据处理方式 不同,将转换算子分为 Value 类型、双Value 类型、key-Value类型

1. Value 类型

1. map
  • 函数 : def mapU: ClassTag: RDD[U]
  • 说明 :将待处理的数据逐条进行映射转换,可以使值的转换,也可以是类型的转换
  • 案例 : ```scala val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4)) val dataRDD1: RDD[Int] = dataRDD.map( num => { num * 2 } ) val dataRDD2: RDD[String] = dataRDD1.map( num => { “” + num } )
<a name="HBZSq"></a>
###### 2. mapPartitions

   - 函数 : def mapPartitions[U: ClassTag](

 f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false ): RDD[U]

   - 说明 : 将待处理的数据一分区为单位发送到计算节点进行处理,这里的处理可以使进行任意的处理
   - 案例 : 
```scala
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("operation")
    val sc: SparkContext = new SparkContext(config)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)


    val rddmax: RDD[Int] = rdd.mapPartitions(inter => {
      Array(inter.max).iterator
    })
    rddmax.collect().foreach(println)


    sc.stop()
  }

map 和 mapPartitions 的区别?

  • 数据处理的角度:
    • map 算子 是将分区内的数据一条一条的执行,类似于串行操作
    • MapPartitions 是将分区内所有的数据都加载到内存中 统一处理,它的处理单位是分区
  • 从功能的角度:
    • Map 算子主要进行数据的转换和改变,但是不会减少或者增多数据
    • MapPartitions 算子需要一个迭代器,返回也是个迭代器,两者之间数据个数可以变化
  • 性能的角度:

    • Map 类似于串行操作,性能较低
    • MapPartitions 类似于批处理,性能较高,但是会长时间占用内存,导致内存溢出
      3. mapPartitionsWithIndex :
  • 函数 : def mapPartitionsWithIndex[U: ClassTag](

    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]

  • 说明 : 将待处理的数据以分区为单位发送到计算节点进行处理,在处理的同时可以获取当前分区索引

  • 案例 : ```scala def main(args: Array[String]): Unit = { val config: SparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“operation”) val sc: SparkContext = new SparkContext(config)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 4) rdd.mapPartitionsWithIndex( (index, datas) => { datas.map(x => {

     (index, x)
    

    }) } ).collect().foreach(println)

    // (0,1) (1,2) (1,3) (2,4) (3,5) (3,6)

    sc.stop() }

// 只要第二个分数的数据 val rdd = sc.makeRDD(List(1,2,3,4), 2) // 【1,2】,【3,4】 val mpiRDD = rdd.mapPartitionsWithIndex( (index, iter) => { if ( index == 1 ) { iter } else { Nil.iterator } } )

<a name="O80OA"></a>
###### 4. flatMap

   - 函数 :  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
   - 说明 :   将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
   - 案例 : 
```scala
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("operation")
    val sc: SparkContext = new SparkContext(config)
    //    val rdd: RDD[String] = sc.makeRDD(List("hello world", "hello saprk", "hello allen"))
    //    rdd.flatMap(x => x.split(" ")).collect().foreach(println)
    val data = List(List(1, 2), 3, List(4, 5))
    val rdd = sc.makeRDD(data)
    rdd.flatMap(x => x match {
      case x: List[Int] => x.map(x => 2 * x)
      case x: Int => List[Int](x)
    }).collect().foreach(println)
    // 2 4 3 8 10
    sc.stop()
  }

5. glom
  • 函数 : def glom(): RDD[Array[T]]
  • 说明 :将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
  • 案例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("operation")
    val sc: SparkContext = new SparkContext(config)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val glomRdd:RDD[Array[Int]] = rdd.glom()   // [1,2] [3.4]
    val partitionMax = glomRdd.map(arrayInt => {
     arrayInt.max
    })
    println(partitionMax.collect.sum)   // 6
    sc.stop()
    

    6. groupBy
  • 函数 : def groupByK(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

  • 说明 : 对每条数据重新进行分区,相同key 的放到同一个分区,分区总数不变
  • 案例 : 将 List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。

    def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("operation")
    val sc: SparkContext = new SparkContext(config)
    val rdd: RDD[String] = sc.makeRDD(List("hello", "spark", "scala", "hadoop"), 2)
    
    val group: RDD[(Char, Iterable[String])] = rdd.groupBy(x => {
     x.charAt(0)
    })
    group.collect().foreach(println)
    //    group.saveAsTextFile("output")
    sc.stop()
    }
    

    7. filter
  • 函数 : def filter(f: T => Boolean): RDD[T]

  • 说明 : 根据规则对数据进行过滤(可能会出现数据倾斜)
  • 案例 : 过滤奇数,留下偶数

    def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("operation")
    val sc: SparkContext = new SparkContext(config)
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    rdd.filter(_%2==0).collect().foreach(println)
    // 2 4 
    sc.stop()
    }
    

    8. sample
  • 函数 : def sample(

    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T]

  • 说明 : 从指定规则中抽取数据

  • 案例 :

    val dataRDD = sparkContext.makeRDD(List(
    1,2,3,4
    ),1)
    // 抽取数据不放回(伯努利算法)
    // 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
    // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不
    要
    // 第一个参数:抽取的数据是否放回,false:不放回
    // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
    // 第三个参数:随机数种子  如果不传递第三个参数,那么使用的是当前系统时间
    val dataRDD1 = dataRDD.sample(false, 0.5)
    // 抽取数据放回(泊松算法)
    // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
    // 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
    // 第三个参数:随机数种子 如果不传递第三个参数,那么使用的是当前系统时间
    val dataRDD2 = dataRDD.sample(true, 2)   // 1 1 2 2 3 3 4 4
    

    9. distinct
  • 函数 : def distinct()(implicit ord: Ordering[T] = null): RDD[T]

    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

  • 说明 :数据去重

  • 案例 :

    def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("operation")
    val sc: SparkContext = new SparkContext(config)
    val value = sc.makeRDD(Array(1, 2, 3, 1, 2, 3))
    println(value.distinct().collect().mkString(","))   // 1 2 3
    println(value.distinct(2).collect().mkString(","))   // 2 1  3
    sc.stop()
    }
    

    10. coalesce
  • 函数 : def coalesce(numPartitions: Int, shuffle: Boolean = false,

    partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    (implicit ord: Ordering[T] = null)
    : RDD[T]

  • 说明 : 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

  • 案例 :

    def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("operation")
    val sc: SparkContext = new SparkContext(config)
    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 4)
    rdd.saveAsTextFile("outpath0")  // 4 个 文件 
    rdd.coalesce(2).saveAsTextFile("outpath1")   // 两个文件
    sc.stop()
    }
    

    11. reparation
  • 函数 :def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

  • 说明 :内部执行的也是 coalesce,但是无论是扩大分区还是缩小分区都会执行 shuffle
  • 案例 :

    def main(args: Array[String]): Unit = {
    
       val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
       val sc = new SparkContext(sparkConf)
    
       val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2)
    
       // coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
       // 所以如果想要实现扩大分区的效果,需要使用shuffle操作
       // spark提供了一个简化的操作
       // 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
       // 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
       //val newRDD: RDD[Int] = rdd.coalesce(3, true)
       val newRDD: RDD[Int] = rdd.repartition(3)
       newRDD.saveAsTextFile("output")
       sc.stop()
    
    }
    

    12. sortBy
  • 函数 : def sortByK => K,

    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

  • 说明 : 对数据进行排序。排序之前 进行 f 函数操作,对操作的结果进行排序,默认升序(true),排序前后分区数不变,但是存在 shuffle 过程

  • 案例:

    def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("operation")
    val sc: SparkContext = new SparkContext(config)
    //    val rdd: RDD[Int] = sc.makeRDD(Array(4, 1, 5, 2, 6, 3))
    //    rdd.sortBy(x =>x).collect().foreach(println)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("4", 4), ("1", 1), ("5", 5), ("2", 2), ("6", 6), ("3", 3)))
    rdd.sortBy(x => x._1, false).collect().foreach(println)     
    // (6,6) (5,5) (4,4) (3,3) (2,2) (1,1)
    sc.stop()
    }
    

    2. 双 Value 类型

    1. intersection
  • 函数 : def intersection(other: RDD[T]): RDD[T]

  • 说明 : 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD (数据类型保持一致)
  • 案例 :

    val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
    val dataRDD = dataRDD1.intersection(dataRDD2)   // 3 4
    

    2. union
  • 函数 : def union(other: RDD[T]): RDD[T]

  • 说明 : 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD(数据类型保持一致)
  • 案例 :

    val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
    val dataRDD = dataRDD1.union(dataRDD2)  // 【1,2,3,4,3,4,5,6】
    

    3. subtract
  • 函数 : def subtract(other: RDD[T]): RDD[T]

  • 说明 :以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集(数据类型保持一致)
  • 案例 :

    val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
    val dataRDD = dataRDD1.subtract(dataRDD2)   // 【1,2】
    

    4. zip
  • 函数 : def zipU: ClassTag: RDD[(T, U)]

  • 说明 : 将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
  • 案例 :

    3. Key-Value 类型

    1. partitionBy
  • 函数 : def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  • 说明 : 将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
  • 案例 :

       val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
       val sc = new SparkContext(sparkConf)
       // TODO 算子 - (Key - Value类型)
       val rdd = sc.makeRDD(List(1,2,3,4),2)
       val mapRDD:RDD[(Int, Int)] = rdd.map((_,1))
       // RDD => PairRDDFunctions
       // 隐式转换(二次编译)
    
       // partitionBy根据指定的分区规则对数据进行重分区
       val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
       newRDD.partitionBy(new HashPartitioner(2))
       newRDD.saveAsTextFile("output")
       sc.stop()
    

    2.reduceByKey
  • 函数 : def reduceByKey(func: (V, V) => V): RDD[(K, V)]

    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

  • 说明 : 可以将数据按照相同的 Key 对 Value 进行聚合

  • 案例 :

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
      val rdd = sc.makeRDD(List(
           ("a", 1), ("a", 2), ("a", 3), ("b", 4)
       ))
       // reduceByKey : 相同的key的数据进行value数据的聚合操作
       // scala语言中一般的聚合操作都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合
       // 【1,2,3】
       // 【3,3】
       // 【6】
       // reduceByKey中如果key的数据只有一个,是不会参与运算的。
       val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey( (x:Int, y:Int) => {
           println(s"x = ${x}, y = ${y}")
           x + y
       } )
       reduceRDD.collect().foreach(println)
       sc.stop()
    

    3. groupByKey
  • 函数 : def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

  • 说明 :将数据源的数据根据 key 对 value 进行分组
  • 案例 :

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    
    // TODO 算子 - (Key - Value类型)
    
    val rdd = sc.makeRDD(List(
     ("a", 1), ("a", 2), ("a", 1), ("b", 4)
    ))
    
    // groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
    //              元组中的第一个元素就是key,
    //              元组中的第二个元素就是相同key的value的集合
    val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    groupRDD.collect().foreach(println)  
    // (a,CompactBuffer(1, 2, 3))  (b,CompactBuffer(4))
    
    val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1) 
    groupRDD1.collect().foreach(println)   
    // (a,CompactBuffer((a,1), (a,2), (a,3))) (b,CompactBuffer((b,4))
    
    sc.stop()
    }
    

    groupByKey 和 reduceByKey 区别:

  1. 从 shuffle 角度上看

两者都存在 shuffle 操作,但是 reduceByKey 可以在shuffle 之前对相同的key进行 预聚合功能,减少落盘的数据量,而 groupBYKey 只进行分组操作,数据量不变

  1. 从功能角度上看

reduceByKey 包含分组和聚合功能,groupBYKey 只能分组,不能聚合

4. aggregateByKey
  • 函数 : def aggregateByKeyU: ClassTag(seqOp: (U, V) => U,

    combOp: (U, U) => U): RDD[(K, U)]

  • 说明 : 将数据根据不同的规则进行分区内计算和分区间计算

  • 案例 : 取出每个分区内相同 key 的最大值然后分区间相加

    // aggregateByKey 算子是函数柯里化,存在两个参数列表
    // 1. 第一个参数列表中的参数表示初始值
    // 2. 第二个参数列表中含有两个参数
    // 2.1 第一个参数表示分区内的计算规则
    // 2.2 第二个参数表示分区间的计算规则
    val rdd =
    sc.makeRDD(List(
    ("a",1),("a",2),("c",3),
    ("b",4),("c",5),("c",6)
    ),2)
    // 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
    // => (a,10)(b,10)(c,20)
    // 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
    // 分区内去max 值 ,分区间 相加 
    val resultRDD =
    rdd.aggregateByKey(10)(
    (x, y) => math.max(x,y),    
    (x, y) => x + y
    )
    resultRDD.collect().foreach(println)
    

    5. foldByKey
  • 函数 : def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

  • 说明 : 当 aggregateByKey 的分区间计算规则和分区内计算规则相同时,可以简化为 foldByKey
  • 案例 :

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    
    val rdd = sc.makeRDD(List(
     ("a", 1), ("a", 2), ("b", 3),
     ("b", 4), ("b", 5), ("a", 6)
    ),2)
    
    //rdd.aggregateByKey(0)(_+_, _+_).collect.foreach(println)
    
    // 如果聚合计算时,分区内和分区间计算规则相同,spark提供了简化的方法
    rdd.foldByKey(0)(_+_).collect.foreach(println)   // (b,12) (a,9)
    sc.stop()
    

    6. combineByKey
  • 函数 : def combineByKey[C](

    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]

  • 说明 :最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。

  • 案例 :计算平均值

    
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    
    // TODO 算子 - (Key - Value类型)
    
    val rdd = sc.makeRDD(List(
     ("a", 1), ("a", 2), ("b", 3),
     ("b", 4), ("b", 5), ("a", 6)
    ), 2)
    
    // combineByKey : 方法需要三个参数
    // 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
    // 第二个参数表示:分区内的计算规则
    // 第三个参数表示:分区间的计算规则
    val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
     v => (v, 1),
     (t: (Int, Int), v) => {
       (t._1 + v, t._2 + 1)
     },
     (t1: (Int, Int), t2: (Int, Int)) => {
       (t1._1 + t2._1, t1._2 + t2._2)
     }
    )
    
    val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
     case (num, cnt) => {
       num / cnt
     }
    }
    resultRDD.collect().foreach(println)
    
    sc.stop()
    
  • reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?

    • reduceByKey : 相同key 的进行运算,第一个数据不计算。分区内和分区间计算规则相同
    • foldByKey : 相同的key进行运算,第一个 数据和初始值进行计算,分区内和分区间计算规则相同
    • aggregateByKey : 相同的key进行运算,第一个 数据和初始值进行计算,分区内和分区间计算规则可以不相同
    • combineByKey : 计算时,如果待计算的数据结构不满足需要,进行映射后 再次进行操作,分区内操作和分区间操作可以不相同 ```scala val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“Operator”) val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(

         ("a", 1), ("a", 2), ("b", 3),
         ("b", 4), ("b", 5), ("a", 6)
      

      ),2)

      /* reduceByKey:

          combineByKeyWithClassTag[V](
              (v: V) => v, // 第一个值不会参与计算
              func, // 分区内计算规则
              func, // 分区间计算规则
              )
      

      foldByKey:

         combineByKeyWithClassTag[V](
             (v: V) => cleanedFunc(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
             cleanedFunc,  // 分区内计算规则
             cleanedFunc,  // 分区间计算规则
             )
      

      aggregateByKey :

         combineByKeyWithClassTag[U](
             (v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
             cleanedSeqOp, // 分区内计算规则
             combOp,       // 分区间计算规则
             )
      

      combineByKey :

         combineByKeyWithClassTag(
             createCombiner,  // 相同key的第一条数据进行的处理函数
             mergeValue,      // 表示分区内数据的处理函数
             mergeCombiners,  // 表示分区间数据的处理函数
             )
      
      */
      
    rdd.reduceByKey(_+_) // wordcount
    rdd.aggregateByKey(0)(_+_, _+_) // wordcount
    rdd.foldByKey(0)(_+_) // wordcount
    rdd.combineByKey(v=>v,(x:Int,y)=>x+y,(x:Int,y:Int)=>x+y) // wordcount

    sc.stop()

}
<a name="a3w1A"></a>
###### 7. sortByKey

   - 函数 : def sortByKey(ascending: Boolean = true, 

numPartitions: Int = self.partitions.length) : RDD[(K, V)]

   - 说明 : 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序
   - 案例 : 
```scala
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)

8. join
  • 函数 : def joinW]): RDD[(K, (V, W))]
  • 说明 : 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD
  • 案例 :

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    
    // TODO 算子 - (Key - Value类型)
    
    val rdd1 = sc.makeRDD(List(
     ("a", 1), ("a", 2), ("c", 3),("d",4)
    ))
    
    val rdd2 = sc.makeRDD(List(
     ("a", 5), ("c", 6),("a", 4)
    ))
    
    // join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
    //        如果两个数据源中key没有匹配上,那么数据不会出现在结果中
    //        如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。
    val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    
    joinRDD.collect().foreach(println)
    // (a,(1,5) (a,(1,4)) (a,(2,5)) (a,(2,4)) (c,(3,6))
    
    sc.stop()
    

    9. leftOuterJoin / rightOuterJoin
  • 函数 : def leftOuterJoinW]): RDD[(K, (V, Option[W]))]

  • 说明 : 类似于 SQL 语句的左(右)外连接
  • 案例 : ```scala

    val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“Operator”) val sc = new SparkContext(sparkConf)

    // TODO 算子 - (Key - Value类型)

    val rdd1 = sc.makeRDD(List(

       ("a", 1), ("b", 2)//, ("c", 3)
    

    ))

    val rdd2 = sc.makeRDD(List(

       ("a", 4), ("b", 5),("c", 6)
    

    )) //val leftJoinRDD = rdd1.leftOuterJoin(rdd2) val rightJoinRDD = rdd1.rightOuterJoin(rdd2)

    //leftJoinRDD.collect().foreach(println) rightJoinRDD.collect().foreach(println)

    sc.stop()
<a name="Abffp"></a>
###### 10.cogroup

   - 函数 :def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
   - 说明 :在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD
   - 使用 :
```scala
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)

7. 行动算子

1. reduce
  • 函数 : def reduce(f: (T, T) => T): T
  • 说明 :聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
  • 案例:

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 聚合数据
    val reduceResult: Int = rdd.reduce(_+_)
    

    2. collect
  • 函数 : def collect(): Array[T]

  • 说明 :在驱动程序中,以数组 Array 的形式返回数据集的所有元素
  • 案例:

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 收集数据到 Driver
    rdd.collect().foreach(println)
    

    3. count
  • 函数 :def count(): Long

  • 说明 :返回 RDD 中元素的个数
  • 案例 :

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的个数
    val countResult: Long = rdd.count()
    

    4. first
  • 函数 : def first(): T

  • 说明 :返回 RDD 中的第一个元素
  • 案例:

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的个数
    val firstResult: Int = rdd.first()
    println(firstResult)
    

    5. take
  • 函数 : def take(num: Int): Array[T]

  • 说明 :返回 RDD 前 n 个元素组成的数组
  • 案例:

    vval rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 返回 RDD 中元素的个数
    val takeResult: Array[Int] = rdd.take(2)
    println(takeResult.mkString(","))
    

    6. takeOrdered
  • 函数 : def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

  • 说明 :返回该 RDD 排序后的前 n 个元素组成的数组
  • 案例:

    val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
    // 返回 RDD 中元素的个数
    val result: Array[Int] = rdd.takeOrdered(2)
    

    7. aggregate
  • 函数 : def aggregateU: ClassTag(seqOp: (U, T) => U, combOp: (U, U) => U): U

  • 说明 :分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
  • 案例:

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
    // 将该 RDD 所有元素相加得到结果
    //val result: Int = rdd.aggregate(0)(_ + _, _ + _)
    val result: Int = rdd.aggregate(10)(_ + _, _ + _)
    

    8. fold
  • 函数 :def fold(zeroValue: T)(op: (T, T) => T): T

  • 说明 :折叠操作,aggregate 的简化版操作
  • 案例:

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val foldResult: Int = rdd.fold(0)(_+_)
    

    9. countByKey
  • 函数 : def countByKey(): Map[K, Long]

  • 说明 :统计每种 key 的个数
  • 案例: ```scala val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, “a”), (1, “a”), (1, “a”), (2, “b”), (3, “c”), (3, “c”))) // 统计每种 key 的个数 val result: collection.Map[Int, Long] = rdd.countByKey()
<a name="zkjeP"></a>
##### 10. save 相关算子

   - 函数 :     def saveAsTextFile(path: String): Unit

def saveAsObjectFile(path: String): Unit<br />def saveAsSequenceFile(<br /> path: String,<br /> codec: Option[Class[_ <: CompressionCodec]] = None): Unit

   - 说明 :将数据保存到不同格式的文件中
   - 案例: 
```scala
// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")

11. foreach
  • 函数 : def foreach(f: T => Unit): Unit = withScope {

    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }

  • 说明 :分布式遍历 RDD 中的每一个元素,调用指定函数

  • 案例:

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 收集后打印
    rdd.map(num=>num).collect().foreach(println)
    println("****************")
    // 分布式打印
    rdd.foreach(println)
    

    8. 序列化

    1. 闭包检查

    从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变

    2. 序列化方法和属性

    ```scala object serializable02_function { def main(args: Array[String]): Unit = { //1.创建 SparkConf 并设置 App 名称 val conf: SparkConf = new SparkConf().setAppName(“SparkCoreTest”).setMaster(“local[*]”)

    //2.创建 SparkContext,该对象是提交 Spark App 的入口 val sc: SparkContext = new SparkContext(conf)

    //3.创建一个 RDD val rdd: RDD[String] = sc.makeRDD(Array(“hello world”, “hello spark”, “hive”, “atguigu”))

    //3.1 创建一个 Search 对象 val search = new Search(“hello”)

    //3.2 函数传递,打印:ERROR Task not serializable search.getMatch1(rdd).collect().foreach(println)

    //3.3 属性传递,打印:ERROR Task not serializable search.getMatch2(rdd).collect().foreach(println)

    //4.关闭连接 sc.stop() } } class Search(query:String) extends Serializable { def isMatch(s: String): Boolean = { s.contains(query) }

    // 函数序列化案例 def getMatch1 (rdd: RDD[String]): RDD[String] = { //rdd.filter(this.isMatch) rdd.filter(isMatch) }

    // 属性序列化案例 def getMatch2(rdd: RDD[String]): RDD[String] = { //rdd.filter(x => x.contains(this.query)) rdd.filter(x => x.contains(query)) //val q = query //rdd.filter(x => x.contains(q)) } }

<a name="KvLcJ"></a>
##### 3. Kryo 序列化框架
Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化
```scala
object serializable_Kryo {
 def main(args: Array[String]): Unit = {

   val conf: SparkConf = new SparkConf()
         .setAppName("SerDemo")
         .setMaster("local[*]")
       // 替换默认的序列化机制
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       // 注册需要使用 kryo 序列化的自定义类
         .registerKryoClasses(Array(classOf[Searcher]))

   val sc = new SparkContext(conf)

   val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)

   val searcher = new Searcher("hello")

   val result: RDD[String] = searcher.getMatchedRDD1(rdd)

   result.collect.foreach(println)
 }
}

case class Searcher(val query: String) {
 def isMatch(s: String) = {
   s.contains(query)
 }

 def getMatchedRDD1(rdd: RDD[String]) = {
   rdd.filter(isMatch) 
 }

 def getMatchedRDD2(rdd: RDD[String]) = {
   val q = query
   rdd.filter(_.contains(q))
 }
}

9. RDD 依赖关系

1. RDD 血缘关系
 RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
    val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparConf)

    val lines: RDD[String] = sc.textFile("datas/word.txt")
      // (1) datas/word.txt MapPartitionsRDD[1] at textFile at demo15_sortBy.scala:148 []
    // |  datas/word.txt HadoopRDD[0] at textFile at demo15_sortBy.scala:148 []
    println(lines.toDebugString)
    println("*************************")

    val words: RDD[String] = lines.flatMap(_.split(" "))
    // (1) MapPartitionsRDD[2] at flatMap at demo15_sortBy.scala:151 []
    // |  datas/word.txt MapPartitionsRDD[1] at textFile at demo15_sortBy.scala:148 []
    // |  datas/word.txt HadoopRDD[0] at textFile at demo15_sortBy.scala:148 []
    println(words.toDebugString)
    println("*************************")

    val wordToOne = words.map(word=>(word,1))
    // (1) MapPartitionsRDD[3] at map at demo15_sortBy.scala:154 []
    // |  MapPartitionsRDD[2] at flatMap at demo15_sortBy.scala:151 []
    // |  datas/word.txt MapPartitionsRDD[1] at textFile at demo15_sortBy.scala:148 []
    // |  datas/word.txt HadoopRDD[0] at textFile at demo15_sortBy.scala:148 []
    println(wordToOne.toDebugString)
    println("*************************")

    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)
    // (1) ShuffledRDD[4] at reduceByKey at demo15_sortBy.scala:157 []
    // +-(1) MapPartitionsRDD[3] at map at demo15_sortBy.scala:154 []
    //    |  MapPartitionsRDD[2] at flatMap at demo15_sortBy.scala:151 []
    //    |  datas/word.txt MapPartitionsRDD[1] at textFile at demo15_sortBy.scala:148 []
    //    |  datas/word.txt HadoopRDD[0] at textFile at demo15_sortBy.scala:148 []
    //*************************
    println(wordToSum.toDebugString)
    println("*************************")

    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)

    sc.stop()

2. RDD 依赖关系

这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系

    val sparConf = new SparkConf().setMaster("local").setAppName("Dep")
    val sc = new SparkContext(sparConf)

    val lines: RDD[String] = sc.textFile("datas/word.txt")
    // List(org.apache.spark.OneToOneDependency@7e74a380)
    println(lines.dependencies)
    println("*************************")

    val words: RDD[String] = lines.flatMap(_.split(" "))
    // List(org.apache.spark.OneToOneDependency@18d910b3
    println(words.dependencies)
    println("*************************")

    val wordToOne = words.map(word=>(word,1))
    // List(org.apache.spark.OneToOneDependency@a9f023e)
    println(wordToOne.dependencies)
    println("*************************")

    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)
    // List(org.apache.spark.ShuffleDependency@6cbe7d4d)
    println(wordToSum.dependencies)
    println("*************************")

    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)

    sc.stop()

3. 窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)

4. 宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
 @transient private val _rdd: RDD[_ <: Product2[K, V]],
 val partitioner: Partitioner,
 val serializer: Serializer = SparkEnv.get.serializer,
 val keyOrdering: Option[Ordering[K]] = None,
 val aggregator: Option[Aggregator[K, V, C]] = None,
 val mapSideCombine: Boolean = false)
 extends Dependency[Product2[K, V]]

5. 任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext即生成一个Application;
  • Job:一个Action算子就会生成一个Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task 每一层都是 1对 n 的关系。

10. RDD 持久化

1. RDD cache 缓存

RDD 通过 Cache / Persist 方法 将前面的计算结果进行缓存,默认情况下 会缓存到 JVM 的堆内存中,但是调用时,并不是直接缓存,而是触发后面的 action算子后,该 Rdd 会被缓存到计算节点的内存中。

  val sparConf = new SparkConf().setMaster("local").setAppName("Persist")
        val sc = new SparkContext(sparConf)

        val list = List("Hello Scala", "Hello Spark")

        val rdd = sc.makeRDD(list)

        val flatRDD = rdd.flatMap(_.split(" "))

        val mapRDD = flatRDD.map(word=>{
            (word,1)
        })
        // cache默认持久化的操作,只能将数据保存到内存中,如果想要保存到磁盘文件,需要更改存储级别
        //mapRDD.cache()

        // 持久化操作必须在行动算子执行时完成的。
        mapRDD.persist(StorageLevel.DISK_ONLY)

        val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
        reduceRDD.collect().foreach(println)
        println("**************************************")
        val groupRDD = mapRDD.groupByKey()
        groupRDD.collect().foreach(println)


        sc.stop()

持久化等级 StorageLevel.
image.png
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。

2. RDD CheckPoint 检查点

所谓的检查点其实就是通过将 RDD 中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点
之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input/1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
 word => {
 (word, System.currentTimeMillis())
 }
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)

3. 缓存和检查点区别
  • 缓存只是将数据保存起来,不会切断血缘依赖;checkPoint 检查点会切断血缘依赖
  • Cache 缓存数据通常存在磁盘、内存等地方,可靠性底。checkPoint 数据通常存放在 HDFS 中
  • 建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。

    11. RDD 分区器

    Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认
    分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分
    区,进而决定了 Reduce 的个数。

  • 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None

  • 每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

    1. Hash 分区:

    对于给定的 key,计算其 hashCode,并除以分区个数取余

    2. Range 分区:

    将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

    12 .RDD 文件的读取与保存

    Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

  • 文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件;

  • 文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。

    1. text 文件
    // 读取输入文件
    val inputRDD: RDD[String] = sc.textFile("input/1.txt")
    // 保存数据
    inputRDD.saveAsTextFile("output")
    

    2. sequence 文件

    SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。在 SparkContext 中,可以调用 sequenceFilekeyClass, valueClass

    // 保存数据为 SequenceFile
    dataRDD.saveAsSequenceFile("output")
    // 读取 SequenceFile 文件
    sc.sequenceFile[Int,Int]("output").collect().foreach(println)
    

    3. object 对象文件

    对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过 objectFileT: ClassTag函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

    // 保存数据
    dataRDD.saveAsObjectFile("output")
    // 读取数据
    sc.objectFile[Int]("output").collect().foreach(println)
    
          val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
          val sc = new SparkContext(sparConf)
    
          val rdd = sc.makeRDD(
              List(
                  ("a", 1),
                  ("b", 2),
                  ("c", 3)
              )
          )
    
          rdd.saveAsTextFile("output1")
          rdd.saveAsObjectFile("output2")
          rdd.saveAsSequenceFile("output3")
    
          sc.stop()
    
          val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
          val sc = new SparkContext(sparConf)
    
          val rdd = sc.textFile("output1")
          println(rdd.collect().mkString(","))
    
          val rdd1 = sc.objectFile[(String, Int)]("output2")
          println(rdd1.collect().mkString(","))
    
          val rdd2 = sc.sequenceFile[String, Int]("output3")
          println(rdd2.collect().mkString(","))
    
          sc.stop()
    

    2. 累加器

    1. 实现原理

    累加器可以把 Executor 端 变量的信息聚合到 Driver 端。用户在Driver 端定义的累加变量,在 Executor 端的每个 Task 都会得到一份这个变量的副本,每个Task 更新副本的值后,都会传给 Driver 端进行 merge。

    2. 系统累加器

      val sc = new SparkContext(sparConf)
    
      val rdd = sc.makeRDD(List(1,2,3,4))
    
      // reduce : 分区内计算,分区间计算
          //sc.doubleAccumulator
          //sc.collectionAccumulator
      var sum = sc.longAccumulator("sum")
      rdd.foreach(
        num => {
          sum.add(num)
        }
      )
    
      // 获取累加器的值
      // 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
      // 多加:转换算子中调用累加器,多次调用执行算子
      // 一般情况下,累加器会放置在行动算子进行操作
      println("sum = " + sum.value)
    
      sc.stop()
    

    3.自定义累加器

     /*
        自定义数据累加器:WordCount
    
        1. 继承AccumulatorV2, 定义泛型
           IN : 累加器输入的数据类型 String
           OUT : 累加器返回的数据类型 mutable.Map[String, Long]
    
        2. 重写方法(6)
       */
      class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    
          private var wcMap = mutable.Map[String, Long]()
    
          // 判断是否初始状态
          override def isZero: Boolean = {
              wcMap.isEmpty
          }
    
          override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
              new MyAccumulator()
          }
    
          override def reset(): Unit = {
              wcMap.clear()
          }
    
          // 获取累加器需要计算的值
          override def add(word: String): Unit = {
              val newCnt = wcMap.getOrElse(word, 0L) + 1
              wcMap.update(word, newCnt)
          }
    
          // Driver合并多个累加器
          override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    
              val map1 = this.wcMap
              val map2 = other.value
    
              map2.foreach{
                  case ( word, count ) => {
                      val newCount = map1.getOrElse(word, 0L) + count
                      map1.update(word, newCount)
                  }
              }
          }
    
          // 累加器结果
          override def value: mutable.Map[String, Long] = {
              wcMap
          }
      }
    
    
          val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
          val sc = new SparkContext(sparConf)
    
          val rdd = sc.makeRDD(List("hello", "spark", "hello"))
    
          // 累加器 : WordCount
          // 创建累加器对象
          val wcAcc = new MyAccumulator()
          // 向Spark进行注册
          sc.register(wcAcc, "wordCountAcc")
    
          rdd.foreach(
              word => {
                  // 数据的累加(使用累加器)
                  wcAcc.add(word)
              }
          )
    
          // 获取累加器累加的结果
          println(wcAcc.value)
    
          sc.stop()
    

    3. 广播变量

    1. 实现原理

    广播变量可以高效分发较大的对象。它向所有节点发送一个较大的只读值,以供一个或者多个 Spark 操作使用, 比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。

    2. 实现

    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
          val sc = new SparkContext(sparConf)
    
          val rdd1 = sc.makeRDD(List(
              ("a", 1),("b", 2),("c", 3)
          ))
          val map = mutable.Map(("a", 4),("b", 5),("c", 6))
    
          // 封装广播变量
          val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
    
          rdd1.map {
              case (w, c) => {
                  // 方法广播变量
                  val l: Int = bc.value.getOrElse(w, 0)
                  (w, (c, l))
              }
          }.collect().foreach(println)
    
          sc.stop()