第一章 Spark概述

Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

Hadoop 1.x版本的问题:

  1. NameNode是单点的,制约了整个Hadoop集群的发展
  2. NameNode是单点的,受到了硬件的制约
  3. MR计算速度慢,因为早期设计时,主要针对于数据的一次性计算,将数据从存储介质中读取出来,然后进行计算,最后将计算的结果保存到介质中;不适合迭代计算和数据挖掘
  4. 早期版本的计算框架和资源调度框架耦合在一起,无法分离

Hadoop 2.x版本(Yarn版本):

  1. NameNode是高可用的
  2. 增加了独立的资源调度框架(Yarn)
  3. 将计算框架和资源调度框架解耦合,可以分离

Hadoop 3.x核心架构没变

Spark:

  1. Spark出现的时机:Hadoop1.x~2.x
  2. 基于MR框架所产生新的计算框架,优化了其中的计算过程,将数据处理结果保存到内存中,让执行效率得到了极大的提升
  3. 计算模型非常丰富:map,filter,flatMap,groupBy,sortBy
  4. 基于Scala语言开发的,支持函数式编程,天生就适合迭代式计算和数据挖掘计算

工作中一般用得多的是Spark On Yarn,Spark负责计算,Yarn负责调度
Hadoop & Spark.png

Spark和Hadoop的根本差异是多个作业之间的数据通信问题 : Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘
Spark只有在shuffle的时候将数据写入磁盘,而Hadoop中多个MR作业之间的数据交互都要依赖于磁盘交互
但是Spark是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致Job执行失败,此时,MapReduce其实是一个更好的选择,所以Spark并不能完全替代MR
image.png

  • Spark Core

Spark Core中提供了Spark最基础与最核心的功能,Spark其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib都是在Spark Core的基础上进行扩展的

  • Spark SQL

Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。

  • Spark Streaming

Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。

  • Spark MLlib

MLlib是Spark提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。

  • Spark GraphX

GraphX是Spark面向图计算提供的框架与算法库。

第二章 Spark快速上手

大数据的很多框架都是基于Java语言开发的,所以很多框架中的核心的进程都是通过 JVM 运行的
在执行自己的作业程序时,同时也会启动其他的进程,这些东西合在一起称之为运行环境
大数据框架核心都是通过main方法执行的

Spark提供了特殊的方法,可以对分组聚合的操作进行简化: reduceByKey
reduceByKey:数据处理中,如果又相同的key,那么会对同一个keyvalue进行reduce操作
reduceByKey 对处理的数据要求都是KV键值对

WordCount示例:

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object WordCount3 {
  4. def main(args: Array[String]): Unit = {
  5. // TODO Spark - WordCount
  6. // 1. 建立Spark引擎的连接(环境)
  7. // local => 1个Thread
  8. // local[*] => 使用当前机器中最大的核数线程
  9. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  10. val sc: SparkContext = new SparkContext(conf)
  11. // 2. 数据的统计分析
  12. // 读取文件数据源,获取原始数据
  13. val lines: RDD[String] = sc.textFile("D:\\idea-workplace\\Spark\\data\\wordCount.txt")
  14. // 将原始数据进行切分,形成一个个的单词
  15. val words: RDD[String] = lines.flatMap(line => line.split(" "))
  16. val wordToOne: RDD[(String, Int)] = words.map(
  17. word => (word, 1)
  18. )
  19. // Spark提供了特殊的方法,可以对分组聚合的操作进行简化: reduceByKey
  20. // reduceByKey:数据处理中,如果又相同的key,那么会对同一个key的value进行reduce操作
  21. // reduceByKey 对处理的数据要求都是KV键值对
  22. val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)
  23. // 将统计分析的结果打印在控制台上
  24. wordCount.collect().foreach(println)
  25. // 3. 关闭连接
  26. sc.stop()
  27. }
  28. }

第三章 Spark运行环境

Local模式

所谓的Local模式,就是不需要其他任何节点资源就可以在本地执行Spark代码的环境,一般用于教学,调试,演示等,之前在IDEA中运行代码的环境我们称之为开发环境,不太一样

提交自己代码到服务器运行之前需要打包,用Maven打包时没有提前执行一遍的话,打完的包里可能没有.class文件,服务器上运行会报找不到class错误。(也可以手动打包)
各种操作见附件文档

Standalone模式

只使用Spark自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式。Spark的Standalone模式体现了经典的master-slave模式
各种操作见附件文档

Yarn模式

独立部署(Standalone)模式由Spark自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的Yarn环境下Spark是如何工作的(其实是因为在国内工作中,Yarn使用的非常多)。
各种操作见附件文档

Windows模式

在同学们自己学习时,每次都需要启动虚拟机,启动集群,这是一个比较繁琐的过程,并且会占大量的系统资源,导致系统执行变慢,不仅仅影响学习效果,也影响学习进度,Spark提供了可以在windows系统下启动本地集群的方式,这样,在不使用虚拟机的情况下,也能学习Spark的基本使用。
各种操作见附件文档

第四章 Spark运行架构

运行架构

Spark核心时一个计算引擎,采用了标准的 master-slave 的结构

如下图所示,它展示了一个 Spark执行时的基本结构。图形中的Driver表示master,负责管理整个集群中的作业任务调度。图形中的Executor 则是 slave,负责实际执行任务。
11111.png
图中Driver表示master,负责管理整个集群中的作业任务调度;图中的Executor则是slave,负责实际执行任务。

核心组件

Driver + Executor

Driver

实际上,我们无法准确地描述Driver的定义,因为在整个的编程过程中没有看到任何有关Driver的字眼。所以简单理解,所谓的Driver就是驱使整个应用运行起来的程序,也称之为Driver类。一般将创建Spark环境上下文对象的程序,称之为Driver程序

Driver在Spark作业执行时主要负责:

  • 将用户程序转化(封装)为作业(job)
  • 在Executor之间调度任务(task)
  • 跟踪Executor的执行情况
  • 通过UI展示查询运行情况

    Executor

    Spark Executor是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。

Executor有两个核心功能:

  • 负责运行组成Spark应用的任务,并将结果返回给驱动器(Driver)进程
  • 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

    其他组件

    Master & Worker + ApplicationMaster

    Master & Worker

    Spark集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能,所以环境中还有其他两个核心组件:Master和Worker,这里的Master是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于Yarn环境中的RM(ResourceManager), 而Worker呢,也是进程,一个Worker运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于Yarn环境中NM(NodeManager)。

    ApplicationMaster

    Hadoop用户向YARN集群提交应用程序时,提交程序中应该包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行用户自己的程序任务job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。
    说的简单点就是,ResourceManager(资源)和Driver(计算)之间的解耦合靠的就是ApplicationMaster。

    核心概念

    Executor & Core

    Spark Executor是集群中运行在工作节点(Worker)中的一个JVM进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点Executor的内存大小和使用的虚拟CPU核(vcore)数量。

应用程序相关启动参数如下:

名称 说明
—num-executors 配置Executor的数量
—executor-memory 配置每个Executor的内存大小
—executor-cores 配置每个Executor的虚拟CPU core数量

并行度(Parallelism)

在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行,记住,这里是并行,而不是并发。这里我们将整个集群并行执行任务的数量称之为并行度。那么一个作业到底并行度是多少呢?这个取决于框架的默认配置。应用程序也可以在运行过程中动态修改。

有向无环图(DAG)

11111.png

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。

大数据计算引擎框架我们根据使用方式的不同一般会分为四类,其中第一类就是Hadoop所承载的MapReduce,它将计算分为两个阶段,分别为 Map阶段 和 Reduce阶段。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。 由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算。

这里所谓的有向无环图,并不是真正意义的图形,而是由Spark程序直接映射成的数据流的高级抽象模型。简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观,更便于理解,可以用于表示程序的拓扑结构。

提交流程

所谓的提交流程,其实就是我们开发人员根据需求写的应用程序通过Spark客户端提交给Spark运行环境执行计算的流程。在不同的部署环境中,这个提交过程基本相同,但是又有细微的区别,我们这里不进行详细的比较,但是因为国内工作中,将Spark引用部署到Yarn环境中会更多一些,所以本课程中的提交流程是基于Yarn环境的。(Spark On Yarn)
11111.png
Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster两种模式主要区别在于:Driver程序的运行节点位置。

Yarn Client模式

Client模式将用于监控和调度的Driver模块在客户端执行,而不是在Yarn中,所以一般用于测试。启动项--deploy-mode client

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

提交到执行流程:

  • Driver在任务提交的本地机器上运行
  • Driver启动后会和ResourceManager通讯申请启动ApplicationMaster
  • ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor内存
  • ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程
  • Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
  • 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage(这两个名词的解释见本章节后半部分),每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。

    Yarn Cluster模式

    Cluster模式将用于监控和调度的Driver模块启动在Yarn集群资源中执行一般应用于实际生产环境。启动项--deploy-mode cluster

    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    10
    

    提交到执行流程:

  • 在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,

  • 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver
  • Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程
  • Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
  • 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage(这两个名词的解释见本章节后半部分),每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。

    宽依赖 & 窄依赖

    Spark中RDD的高效与DAG图有着莫大的关系,在DAG调度中需要对计算过程划分stage,而划分依据就是RDD之间的依赖关系。针对不同的转换函数,RDD之间的依赖关系分类窄依赖(narrow dependency)和宽依赖(wide dependency, 也称 shuffle dependency)

  • 窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)

  • 宽依赖是指父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区(O(n),与数据规模有关)

SparkCore - 图6
相比于宽依赖,窄**依赖对优化很有利** ,主要基于以下两点:

宽依赖往往对应着shuffle操作,需要在运行过程中将同一个父RDD的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父RDD的分区只会传入到一个子RDD分区中,通常可以在一个节点内完成转换。

当RDD分区丢失时(某个节点故障),spark会对数据进行重算。

对于窄依赖,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重算和子RDD分区对应的父RDD分区即可,所以这个重算对数据的利用率是100%的

对于宽依赖,重算的父RDD分区对应多个子RDD分区,这样实际上父RDD 中只有一部分的数据是被用于恢复这个丢失的子RDD分区的,另一部分对应子RDD的其它未丢失分区,这就造成了多余的计算;更一般的,宽依赖中子RDD分区通常来自多个父RDD分区,极端情况下,所有的父RDD分区都要进行重新计算。

如下图所示,b1分区丢失,则需要重新计算a1,a2和a3,这就产生了冗余计算(a1,a2,a3中对应b2的数据)。
SparkCore - 图7
窄依赖的函数有:map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues
宽依赖的函数有:groupByKey, join(父RDD不是hash-partitioned ), partitionBy

Stage

一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage。Stage的划分,简单的说是以shuffle和result这两种类型来划分

在Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage

比如rdd.parallize(1 to 10).foreach(println)这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个

如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 这个job因为有reduce,所以有一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。如果job中有多次shuffle,那么每个shuffle之前都是一个stage

根据RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算

因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中

在spark中,Task的类型分为2种:ShuffleMapTaskResultTask;简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的

其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中。

第五章 Spark核心编程

Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

  • RDD : 弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量:分布式共享只读变量

    RDD

    什么是RDD

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集(**RDD不存数据**),是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

存在数据分区的概念,处理的数据会根据规则放置在不同的分区中,然后发给不同的节点进行计算
这里的分区称之为Partition

  • 弹性
    • 存储的弹性:内存与磁盘的自动切换;
    • 容错的弹性:数据丢失可以自动恢复;
    • 计算的弹性:计算出错重试机制;
    • 分片的弹性:可根据需要重新分片。
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD封装了计算逻辑,并不保存数据
  • 数据抽象:RDD是一个抽象类,需要子类具体实现 abstract class RDD[T: ClassTag]
  • 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
  • 可分区、并行计算

RDD的实现原理和 IO 的实现原理是一样的(一层包一层)
关系:都是延迟加载数据,都是一层层地包装功能,等到触发执行命令再一起执行(IO中是readLine(),RDD中是collect())
区别:IO流回临时的存储数据(Buffer缓冲区),RDD不会保存数据(比如:在flatMap处理完数据后,textFile中对应的数据就没了,依此类推)
IO.png

5大核心属性

* Internally, each RDD is characterized by five __main properties:

- A list of partitions(分区列表)
- A function for computing each split(每个切片[分区]都会有一个计算函数,每个分区的计算逻辑是相同的)
- A list of dependencies on other RDDs(依赖其他RDD的列表[依赖关系表])
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
(当数据为K-V类型数据时,可以通过设定分区器自定义数据的分区,例如:RDD采用哈希分区)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
(可选,一个计算每个切片的首选位置的列表,例如:HDFS文件的块位置)

分区列表
RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]

分区计算函数:
Spark在计算时,是使用分区函数对每一个分区进行计算(每个分区的计算逻辑是相同的

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

RDD之间的依赖关系:
RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

分区器(可选):
当数据为K-V类型数据时,可以通过设定分区器自定义数据的分区(见本章下面RDD转换算子中的partitionBy)

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

首选位置(可选):
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

所谓的首选位置指的是数据的位置和计算的位置的关系
数据和计算如果不在同一个节点上,那么就意味着计算时需要拉取数据,性能会降低

这里的位置存在一个本地化级别的概念:

  • 进程本地化:数据和计算在同一个进程中
  • 节点本地化:数据和计算在同一个节点中
  • 机架本地化:数据和计算在同一个机架上
  • 任意:数据和计算随意

计算的时候,移动数据不如移动计算(大数据中数据量比较大)
**

执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。

RDD是Spark框架中用于数据处理的核心模型,接下来我们看看,在Yarn环境中,RDD的工作原理:

  1. 启动Yarn集群环境
  2. Spark通过申请资源创建调度节点和计算节点
  3. Spark框架根据需求将计算逻辑根据分区划分成不同的任务
  4. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

从以上流程可以看出RDD在整个流程中主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算

基础编程

RDD创建

  1. 从集合(内存)中创建RDD
    makeRDD底层其实调用的就是parallelize方法,所以两个方法没有任何区别,推荐使用makeRDD(好记)
    makeRDD方法有两个参数:
    1. 第一个参数表示数据集
    2. 第二个参数表示切片(分区)的数量,如果不传递这个参数,会使用默认值,默认值与当前环境中的最大核数有关

切片默认值源码实现(local模式):
scheduler.conf.getInt("spark.default.parallelism", totalCores)
从SparkConf配置对象中读取配置参数,如果获取不到,那么会取当前环境中的最大核数来使用
切片默认值源码实现(集群模式):
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
从SparkConf配置对象中读取配置参数,如果获取不到,那么会取当前环境中的最大核数和2中较大的值来使用

决定分区数量的优先级:makeRDD方法第二个参数传递的分区参数 => conf中配置的参数 => 默认值

object Instance_Memory {

    def main(args: Array[String]): Unit = {

        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Instance")
          // 设置默认分区数量
        // conf.set("spark.default.parallelism", "4")
        val sc: SparkContext = new SparkContext(conf)

        // TODO RDD - 创建(从无到有) - 从内存中
        val seq: Seq[Int] = Seq(1, 2, 3, 4)

        // parallelize: 并行
        val rdd: RDD[Int] = sc.parallelize(seq)
        /*
            def makeRDD[T: ClassTag](
                seq: Seq[T],
                numSlices: Int = defaultParallelism): RDD[T] = withScope {
                parallelize(seq, numSlices)
            }
            makeRDD底层其实调用的就是parallelize方法,所以两个方法没有任何区别,推荐使用makeRDD(好记)

            makeRDD方法有两个参数:
            第一个参数表示数据集
            第二个参数表示切片(分区)的数量,如果不传递这个参数,会使用默认值,默认值与当前环境中的最大核数有关

            切片默认值源码实现(local模式):
                 scheduler.conf.getInt("spark.default.parallelism", totalCores)
                 从SparkConf配置对象中读取配置参数,如果获取不到,那么会取当前环境中的最大核数来使用
            切片默认值源码实现(集群模式):
                 conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
                 从SparkConf配置对象中读取配置参数,如果获取不到,那么会取当前环境中的最大核数和2中较大的值来使用

            决定分区数量的优先级:makeRDD方法第二个参数传递的分区参数 => conf中配置的参数 => 默认值
         */
        val rdd1: RDD[Int] = sc.makeRDD(seq)

        // 打印分区的数量
        println(rdd1.partitions.length)
        // 将数据的处理结果保存为分区文件,每个分区保存成一份文件
        rdd1.saveAsTextFile("output")

        rdd.collect().foreach(println)
        rdd1.collect().foreach(println)

        sc.stop()
    }
}
  1. 从外部存储(文件)创建RDD
    sc.textFile("data/word.txt")读取文件时,以行为单位进行读取

textFile方法提供了2个参数:

  1. 第一个参数为文件(目录)的路径
  2. 第二个参数为最小分区数量(但是不一定是这个数量,但一定大于等于这个数量),可以不传,使用默认值

      默认值:math.min(defaultParallelism, 2)<br />           **决定分区数量的优先级:textFile方法传参 => 配置参数 => 默认值**<br />           <br />      **Spark****读取文件****采用的就是Hadoop的读取方式进行切片分区,而切片规则和数据读取的规则有些差异**<br />       底层实现:<br />           totalSize:总的文件的字节数<br />           goalSize:每个分区应该方多少字节 `goalSize = totalSize / (numSplits == 0 ? 1 : numSplits)`<br />然后计算有多少个分区,然后计算剩下多少字节,再看剩余字节数量是否大于goalSize的10%,决定是否再加一个分区存放剩余字节<br />**eg**: totalSize = 33字节  分区数量 = 2  =>  goalSize = 16字节  33 - (16* 2)  = 1字节  1/16 < 10% 所以不添加新分区,将剩余字节并入最后的分区,所以分为2个分区,第一个分区存放16字节,第二个分区存放17字节
    
    object Instance_Disk {
    
    def main(args: Array[String]): Unit = {
    
      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Instance")
      val sc: SparkContext = new SparkContext(conf)
    
      // TODO RDD - 实例化 - 磁盘文件
    
        /*
          textFile方法提供了2个参数:
          第一个参数为文件(目录)的路径
          第二个参数为最小分区数量(不一定是这个数量,但一定大于等于这个数量,因为可能会不能平分,会有剩余),可以不传,使用默认值
    
          默认值:math.min(defaultParallelism, 2)
    
          决定分区数量的优先级:textFile方法传参 => 配置参数 => 默认值
    
          1. Spark读取文件采用的就是Hadoop的读取方式
          2. 底层实现:
              totalSize:总的文件的字节数
              minPartitions:预期分区数量
              goalSize:每个分区应该方多少字节 goalSize = totalSize / (numSplits == 0 ? 1 : numSplits)
       */
    
      // 读取文件时,以行为单位进行读取
      val rdd1: RDD[String] = sc.textFile("data/word.txt")
      // 文件路径可以是具体的文件路径,也可以是目录的路径
      val rdd2: RDD[String] = sc.textFile("someDirectory")
      // 路径还可以用通配符
      val rdd3: RDD[String] = sc.textFile("someDirectory/word*.txt")
    
      // 如果想要获取数据所在的文件相关属性,需要使用wholeTextFiles方法
      // wholeTextFiles方法返回的为K-V格式数据,Key表示文件的路径,Value表示文件的内容
      val rdd4: RDD[(String, String)] = sc.wholeTextFiles("someDirectory/word*.txt")
    
      rdd1.collect().foreach(println)
    
      sc.stop()
    }
    }
    
  1. 从其他RDD创建
    主要是通过一个RDD运算完后,再产生新的RDD
  2. 直接创建RDD(new)
    使用new的方式直接构造RDD,一般由Spark框架自身使用

    RDD并行度与分区

    默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。

文件数据如何分区存放:

  1. 分区计算和数据存放的规则是两回事
    计算分区时,需要按照字节数进行计算的,但是存放数据不能按照字节数存放(可能会把一些字符截断)
  2. Spark分区数据存放时采用Hadoop的处理方式
    Hadoop读取数据时不是按照字节读取的,是按行读取的
  3. Hadoop读取数据时不是按照数据的位置,而是按照数据的偏移量读取的(偏移量从0开始)
  4. Hadoop读取数据时,偏移量不会重复读取

    RDD转换算子

    算子:从认知心理学的角度来理解,解决一个问题,其实就是改变问题的状态:
    问题(初始)=>operator(算子)=>问题(审批)=>operator(算子)=>问题(解决)

集合中的方法就称为方法;RDD中的方法称之为算子(List.flatMap & RDD.flatMap)
集合中的方法是单点操作;RDD中的方法(算子)是分布式的

转换:A(RDD) -> B(RDD)

RDD根据数据处理方式的不同将算子整体上分为Value类型双Value类型Key-Value类型

所谓算子,其实就是RDD的方法;将有一个RDD转换成另外一个RDD的方法,就称之为转换算子
**
map算子将数据集中的每一条数据按照指定的规则进行转换,这里的转换可能是数值,也可能是类型
map算子需要传递一个参数,这个参数是函数类型:Int => U

默认情况下,旧的RDD转换成新的RDD,分区数量不变

RDD分区之间并行计算的,但是RDD分区内**是串行计算的

计算时,分区数量不变,数据所在的分区也不变

扁平化:一个整体拆分成一个个的个体,所有的个体都要使用(flatMap)

clone()方法是浅复制:只把外部的复制了,内部的不会复制


map & mapPartitions

map: 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
mapPartitions: 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据

map算子和mapPartitions算子的区别

  • 数据处理角度
    map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作(分区之间是无序的,第一个取到的不一定的是0号分区,Spark中分区号从0开始)。
  • 功能的角度
    map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。mapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
  • 性能的角度
    map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions将分区数据当成整体来进行处理,会把整体数据都放进内存中,就需要内存中数据有限制,否则会出现溢出;mapPartitions操作处理的数据不会马上释放,需要等待整个分区的数据全部处理完,才会释放,所以mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。改为使用map操作。

  • [x] 小功能:从服务器日志数据apache.log(见附件)中获取用户请求URL资源路径(map)

    val rdd: RDD[String] = sc.textFile("data/apache.log")
    // eg: 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
    val res: RDD[String] = rdd.map(_.split(" ")(6))
    
  • [x] 小功能:获取每个数据分区的最大值(mapPartitions)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 3, 4, 5, 6), 4)
    rdd.mapPartitions(part => List(part.max).iterator)
    

mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引

  • 小功能:获取第二个数据分区的数据
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 8, 9, 5, 6), 4)
    val res = rdd.mapPartitionsWithIndex(
      (index, part) => {
          if (index == 1) {
                part
          } else {
                // Nil.iterator返回一个空的List的迭代器
                Nil.iterator
          }
      }
    )
    
    Nil是一个空的List,定义为List[Nothing],根据List的定义List[+A],所以Nil是所有List[T]的子类。

flatMap

将处理的数据进行扁平化(将一个整体拆分成一个一个的个体来使用的方式)后再进行映射处理,所以算子也称之为扁平映射

  • 小功能:将List(List(1,2),3,List(4,5))进行扁平化操作
    val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5)), 1)
    // Scala在函数中用模式匹配,外层函数必须使用中括号{}
    val res: RDD[Any] = rdd.flatMap {
      case list: List[_] => list
      case num: Int => List(num)
    }
    
    Scala在函数中用模式匹配,外层函数必须使用中括号{}

glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

  • 小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2)
    val maxRDD: RDD[Int] = rdd.glom().map(_.max)
    // collect()将 RDD => Array
    println(maxRDD.collect().sum)
    
    collect()将 RDD => Array

groupBy

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中
makeRDD(List, 2) // 分区
RDD.groupby(func()) // 分组
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

Q: 这两个分区、分组数量不一致怎么办?
A: 一个组的数据只能在一个分区中
groupBy分组后可能会导致不同分区的数据不均衡
默认情况下,分组前和分组后的分区数量不变
(RDD.map处理完的数据,原来数据在哪个分区,处理完还在同一个分区)
groupBy会将同一个分区的数据打乱,和其他分区的数据重新组合在一起,将这样的现象称之为 shuffle
groupBy分组会在执行过程中进行等待,直到所有的数据分组完成后,才能继续往后执行
groupBy执行过程中的等待操作是通过磁盘完成的(数据太多,放内存会炸),所以shuffle操作一定要落盘
image.png
所有的shuffle操作都有改变分区的能力(通过方法传参改变分区数量,例如:groupBy)如果一个算子能改变分区,它90%是有shuffle的**

  • [x] 小功能:将List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。

    val rdd = sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 1)
    rdd.groupBy(_.substring(0, 1))
    
  • [x] 小功能:从服务器日志数据apache.log中获取每个时间段访问量。(其实就是WordCount)

    val rdd: RDD[String] = sc.textFile("data/apache.log")
    // eg: 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
    rdd.map(_.split(":")(1)).groupBy(_.substring(0, 2)).map(data=>(data._1, data._2.size))
    
  • [x] 小功能:WordCount

    val rdd: RDD[String] = sc.textFile("data/apache.log")
    // eg: 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
    rdd.map(
      line => {
          val hour = line.split(":")(1)
          (hour, 1)
      }
    ).groupBy(_._1).mapValues(_.size).collect().foreach(println)
    

filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

filter算子会根据 指定的规则对数据集种的每一条数据进行判断
如果想要将数据保留,那么结果返回true,如果数据不要,返回false

Scala集合中的filter考虑的是单点操作
SparkRDD的filter考虑分区数据的均衡
0(1000) 1(1000) 2(1000) => 0(100) 1(500) 2(1) => 导致数据倾斜

SparkRDD的filter没有shuffle,分区数量不变,但是分区内的数据可能不均衡,可能会出现数据倾斜

  • 小功能:从服务器日志数据apache.log中获取2015年5月17日的请求路径
    val rdd: RDD[String] = sc.textFile("data/apache.log")
    // eg: 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
    rdd.filter(_.split(" ")(3).startsWith("17/05/2015")).map(_.split(" ")(6))
      .collect().foreach(println)
    

sample

根据指定的规则从数据集中抽取数据

sample方法参数:

  1. 第一个参数表示数据抽取后是否放回
  2. 第二个 参数依赖于第一个参数,如果第一个参数为false(不放回),该参数表示每个数据被抽取的几率,取值为0-1之间,而不表示有多少比例的数据被抽取出来;如果第一个参数为true(放回),该参数表示每个数据预期被抽取的次数,取值大于0
  3. 第三个参数:seed 随机数种子(用来做随机算法的第一个数)

随机数不随机:随机算法相同,且随机数种子相同时,后面每次随机算法的结果(得到的随机数)都相同

一定程度上可以减小发生数据倾斜的可能性:sample多次抽取部分数据后可以发现有数据倾斜

val dataRDD = sparkContext.makeRDD(List(1, 2, 3, 4), 1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)

distinct

将数据集中重复的数据去重

思考一个问题:如果不用该算子,你有什么办法实现数据去重?
即使用distinct源码的实现:map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)
数据处理过程:1, 1 => (1, null), (1, null) => (1, (null, null)) => (1, null) => 1


coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本

分区合并的策略:
不考虑数据是否均衡,只考虑合并更快

coalesce参数:

  1. 第一个参数表示改变分区的数量
  2. 第二个参数表示数据是否进行shuffle,默认为false

如果不使用shuffle的场合下,分区数量不能修改为比之前大的值(不报错,能运行,但是没效果),因为数据不会打乱重新组合


repartition

增加分区,并进行shuffle重新分区

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true(coalesce(numPartitions, shuffle = true)

无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。


sortBy

该操作用于排序数据。在排序之前,可以将数据通过函数进行处理,之后按照函数处理的结果进行排序,默认为升序排列默认排序后新产生的RDD的分区数与原RDD的分区数一致(也可以修改分区数量。中间存在shuffle的过程。


intersection & union & subtract

交集 & 并集 & 差集
这三个算子返回的都是新的RDD

思考一个问题:如果两个RDD数据类型不一致怎么办?
不同类型的两个数据集无法实现交集,并集,差集的操作,可以实现拉链操作**


zip

将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素

  • 不同类型的两个数据集无法实现交集,并集,差集的操作,可以实现拉链操作
  • Spark中拉链:每个分区元素数量不同不可以做拉链;分区数量不相同也不可以做拉链
  • Scala中拉链:两边长度不一样,取短边的长度做拉链

partitionBy

将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

这个算子强调的是数据分区的变化,而不是分区数量的变化(coalesce, repartition)

List list = new ArrayList();
ArrayList list2 = new ArrayList();
list.clone(); // 没有clone方法
list2.clone(); // 可以调用clone方法
List是一个接口,ArrayList是一个类,当想使用类的特有的方法,需要使用类去声明,而不能用通用的接口声明

RDD一定没有 partitionBy方法,K-V类型的方法都通过隐式转换调用这个方法(reduceByKey也一样)但要求数据类型必须为K-V类型

RangePartitioner(范围分区器)用于做排序,特定场合用(sortBy)

思考一个问题:如果重分区的分区器和当前RDD的分区器一样怎么办?
如果重分区的分区器和之前的分区器相同,则不会进行重分区,会直接返回原来的RDD,源码中重写了equals方法

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
    }
      // Scala中 == 就调用的 equals 方法
    if (self.partitioner == Some(partitioner)) {
          self
    } else {
          new ShuffledRDD[K, V, V](self, partitioner)
    }
}

reduceByKey

可以将数据按照相同的Key对Value进行聚合(分组+聚合

  • 小功能:WordCount
    val rdd= sc.textFile("D:\\idea-workplace\\Spark\\data\\apache.log")
    // WordCount
    rdd.map(line => (line.split(":")(1), 1)).reduceByKey(_+_)
      .collect().foreach(println)
    

groupByKey

将数据源的数据根据key对value进行分组

思考一个问题:reduceByKey和groupByKey的区别?
从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能(mapSideCombine = true),这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。
从功能的角度:reduceByKey其实包含分组和聚合的功能。groupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
**

  • 小功能:WordCount
    val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3)))
    rdd.groupByKey().mapValues(_.sum).collect().foreach(println)
    

aggregateByKey

将数据根据不同的规则进行分区内计算和分区间计算(这里分区内和分区间的操作都是针对Value的操作,和Key无关

该算子用于相同key对value进行聚合(分区内计算和分区间计算的逻辑可以是不一样的)
而reduceByKey算子分区内计算和分区间计算的逻辑是一样的

该算子有函数柯里化操作,存在多个参数列表

第一个参数列表有一个参数,这个参数表示计算初始值
(Spark或Scala中 z || zero 表示 零值 或 初始值)

第二个参数列表有两个参数
• 第一个参数表示分区内计算规则(相同key对value进行操作)
• 第二个参数表示分区间计算规则(相同key对value进行操作)

如果aggregateByKey分区内计算逻辑和分区间计算逻辑相同的场合,可以采用另外一个算子代替:foldByKey(初始值)(计算逻辑)(有初始值的概念,不能用reduceByKey是因为reduceByKey不能设置初始值)

初始值可以理解为集合内数据与集合外数据一起做操作,类似Scala集合的fold方法

  • 小功能:取出每个分区内相同key的最大值然后分区间相加 ```scala // 取出每个分区内相同key的最大值然后分区间相加 val rdd = sc.makeRDD(List( (“a”,1),(“a”,2),(“c”,3), (“b”,4),(“c”,5),(“c”,6) ),2)

rdd.aggregateByKey(0)( (x, y) => math.max(x, y), (x, y) => x + y ).collect().foreach(println)


---

<a name="V7VdN"></a>
##### foldByKey

当分区内计算规则和分区间计算规则相同时,`aggregateByKey`就可以简化为`foldByKey`

---

<a name="LtVeU"></a>
##### combineByKey

最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

- [x] **小功能:**将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值
```scala
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)

val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
    (_, 1),
    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?

  • reduceByKey: 相同key的第一个数据不进行任何计算,分区内和分区间计算规则相同
  • foldByKey: 相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
  • aggregateByKey:相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
  • combineByKey: 当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构,分区内和分区间计算规则可以不相同。

image.png
image.png
image.png
这里x不能直接用下划线表示,因为x是由第一个参数转换来的,不能直接推断出来


sortByKey

在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的RDD

该算子根据key进行排序,默认升序

对key排序,不是对value排序

使用范围分区器 RangePartitioner

  • [x] 小功能:设置key为自定义类User

    object Test {
    
      def main(args: Array[String]): Unit = {
    
          val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Test")
          val sc: SparkContext = new SparkContext(conf)
    
          val rdd = sc.makeRDD(
              List(
                  (new User(30), 1),
                  (new User(10), 2),
                  (new User(80), 3),
                  (new User(20), 4)
              )
          )
    
          rdd.sortByKey(false).collect().foreach(println)
    
          sc.stop()
      }
    
      class User extends Ordered[User] with Serializable {
    
          var user_age: Int = _
    
          def this(age: Int) {
              this()
              user_age = age
          }
    
          override def compare(that: User): Int = {
              this.user_age - that.user_age
          }
    
          override def toString: String = "user_age: " + this.user_age
      }
    }
    

join & leftOuterJoin & rightOuterJoin

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD

两个数据集可以连接的条件是相同的key的数据进行连接
只要数据中有相同的key,都会进行连接,不考虑数量的问题 => 会导致笛卡尔积
所以,join可能会产生大量的数据,如果业务中存在shuffle,那么性能非常差,所以不推荐使用,能不用就不用,能代替就代替

leftOuterJoin: 类似于SQL语句的左外连接
rdd1.leftOuterJoin(rdd2) // rdd1是主表,rdd2是从表

rightOuterJoin: 类似于SQL语句的右外连接
rdd1.rightOuterJoin(rdd2) // rdd2是主表,rdd1是从表

主表中所有的都会出现,从表中只有连接上的才会出现
**image.png


cogroup

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

cogroup => group + connect


案例实操

统计出每一个省份每个广告被点击数量排行的Top3
agent.log(见附件):时间戳,省份,城市,用户,广告,中间字段使用空格分隔

最后排序取前三时,将分布式数据进行单点排序,可能内存不够,有可能程序跑不通

object Test {

    def main(args: Array[String]): Unit = {

        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Test")
        val sc: SparkContext = new SparkContext(conf)

        // 1. 获取数据 - agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔
        val rdd: RDD[String] = sc.textFile("data/agent.log")

        // 2. 时间戳 省份 城市 用户 广告 => (省份-广告, 1)
        val tupleRdd = rdd.map(
            line => {
                val prov: String = line.split(" ")(1)
                val ad: String = line.split(" ")(4)
                (prov + "-" + ad, 1)
            }
        )

        // 3. (省份-广告, 1) => (省份-广告, sum)
        val tupleSumRDD = tupleRdd.reduceByKey(_ + _)

        // 4. (省份-广告, sum) => (省份, (广告, sum))
        val splitSumRDD = tupleSumRDD.map{
            case (prov_ad, sum) => {
                val prov: String = prov_ad.split("-")(0)
                val ad: String = prov_ad.split("-")(1)
                (prov, (ad, sum))
            }
        }

        // 5. (省份, (广告, sum)) => (省份, List[(广告1, sum1), (广告2, sum2), (广告3, sum3), (广告4, sum4)])
        // val groupSumRDD = splitSumRDD.groupByKey()

        // 6. (省份, List[(广告1, sum1), (广告2, sum2), (广告3, sum3), (广告4, sum4)]) => (省份, List[top3])
        // 将分布式数据进行单点排序,可能内存不够,有可能程序跑不通
        // val top3 = groupSumRDD.mapValues(
        //    iter => {
        //        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
        //    }
        //)

        // 优化过后的
        val top3 = splitSumRDD.aggregateByKey(ArrayBuffer[(String, Int)]())(
            (buffer, t) => {
                buffer.append(t)
                buffer.sortBy(_._2)(Ordering.Int.reverse).take(3)
            },
            (buff1, buff2) => {
                buff1.appendAll(buff2)
                buff1.sortBy(_._2)(Ordering.Int.reverse).take(3)
            }
        )

        // 7. 打印结果
        top3.collect().foreach(println)

        sc.stop()
    }
}

RDD行动算子

行动:Action,用于让RDD的计算开始执行

所谓的行动算子,其实就是用于触发RDD运行的方法


reduce

聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据


collect

在驱动程序(Driver)中,以数组Array的形式返回数据集的所有元素

collect().foreach(println) // 这里的print是 Scala Array的,单点操作,串行打印,数据与源数据顺序相同
rdd.foreach(println) // 这里的print是RDD的,分布式,并行打印,数据与源数据顺序可能不同


未完待续。。。

附件

相关文档:

数据来源: