第一章 Spark概述
Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Hadoop 1.x版本的问题:
- NameNode是单点的,制约了整个Hadoop集群的发展
- NameNode是单点的,受到了硬件的制约
- MR计算速度慢,因为早期设计时,主要针对于数据的一次性计算,将数据从存储介质中读取出来,然后进行计算,最后将计算的结果保存到介质中;不适合迭代计算和数据挖掘
- 早期版本的计算框架和资源调度框架耦合在一起,无法分离
Hadoop 2.x版本(Yarn版本):
- NameNode是高可用的
- 增加了独立的资源调度框架(Yarn)
- 将计算框架和资源调度框架解耦合,可以分离
Hadoop 3.x核心架构没变
Spark:
- Spark出现的时机:Hadoop1.x~2.x
- 基于MR框架所产生新的计算框架,优化了其中的计算过程,将数据处理结果保存到内存中,让执行效率得到了极大的提升
- 计算模型非常丰富:map,filter,flatMap,groupBy,sortBy
- 基于Scala语言开发的,支持函数式编程,天生就适合迭代式计算和数据挖掘计算
工作中一般用得多的是Spark On Yarn,Spark负责计算,Yarn负责调度
Spark和Hadoop的根本差异是多个作业之间的数据通信问题 : Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘
Spark只有在shuffle的时候将数据写入磁盘,而Hadoop中多个MR作业之间的数据交互都要依赖于磁盘交互
但是Spark是基于内存的,所以在实际的生产环境中,由于内存的限制,可能会由于内存资源不够导致Job执行失败,此时,MapReduce其实是一个更好的选择,所以Spark并不能完全替代MR
- Spark Core
Spark Core中提供了Spark最基础与最核心的功能,Spark其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib都是在Spark Core的基础上进行扩展的
- Spark SQL
Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。
- Spark Streaming
Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。
- Spark MLlib
MLlib是Spark提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。
- Spark GraphX
第二章 Spark快速上手
大数据的很多框架都是基于Java语言开发的,所以很多框架中的核心的进程都是通过 JVM 运行的
在执行自己的作业程序时,同时也会启动其他的进程,这些东西合在一起称之为运行环境
大数据框架核心都是通过main方法执行的
Spark提供了特殊的方法,可以对分组聚合的操作进行简化: reduceByKey
reduceByKey
:数据处理中,如果又相同的key
,那么会对同一个key
的value
进行reduce
操作reduceByKey
对处理的数据要求都是KV键值对
WordCount示例:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount3 {
def main(args: Array[String]): Unit = {
// TODO Spark - WordCount
// 1. 建立Spark引擎的连接(环境)
// local => 1个Thread
// local[*] => 使用当前机器中最大的核数线程
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc: SparkContext = new SparkContext(conf)
// 2. 数据的统计分析
// 读取文件数据源,获取原始数据
val lines: RDD[String] = sc.textFile("D:\\idea-workplace\\Spark\\data\\wordCount.txt")
// 将原始数据进行切分,形成一个个的单词
val words: RDD[String] = lines.flatMap(line => line.split(" "))
val wordToOne: RDD[(String, Int)] = words.map(
word => (word, 1)
)
// Spark提供了特殊的方法,可以对分组聚合的操作进行简化: reduceByKey
// reduceByKey:数据处理中,如果又相同的key,那么会对同一个key的value进行reduce操作
// reduceByKey 对处理的数据要求都是KV键值对
val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)
// 将统计分析的结果打印在控制台上
wordCount.collect().foreach(println)
// 3. 关闭连接
sc.stop()
}
}
第三章 Spark运行环境
Local模式
所谓的Local模式,就是不需要其他任何节点资源就可以在本地执行Spark代码的环境,一般用于教学,调试,演示等,之前在IDEA中运行代码的环境我们称之为开发环境,不太一样。
提交自己代码到服务器运行之前需要打包,用Maven打包时没有提前执行一遍的话,打完的包里可能没有.class文件,服务器上运行会报找不到class错误。(也可以手动打包)
各种操作见附件文档
Standalone模式
只使用Spark自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式。Spark的Standalone模式体现了经典的master-slave模式。
各种操作见附件文档
Yarn模式
独立部署(Standalone)模式由Spark自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。所以接下来我们来学习在强大的Yarn环境下Spark是如何工作的(其实是因为在国内工作中,Yarn使用的非常多)。
各种操作见附件文档
Windows模式
在同学们自己学习时,每次都需要启动虚拟机,启动集群,这是一个比较繁琐的过程,并且会占大量的系统资源,导致系统执行变慢,不仅仅影响学习效果,也影响学习进度,Spark提供了可以在windows系统下启动本地集群的方式,这样,在不使用虚拟机的情况下,也能学习Spark的基本使用。
各种操作见附件文档
第四章 Spark运行架构
运行架构
Spark核心时一个计算引擎,采用了标准的 master-slave 的结构
如下图所示,它展示了一个 Spark执行时的基本结构。图形中的Driver表示master,负责管理整个集群中的作业任务调度。图形中的Executor 则是 slave,负责实际执行任务。
图中Driver表示master,负责管理整个集群中的作业任务调度;图中的Executor则是slave,负责实际执行任务。
核心组件
Driver
实际上,我们无法准确地描述Driver的定义,因为在整个的编程过程中没有看到任何有关Driver的字眼。所以简单理解,所谓的Driver就是驱使整个应用运行起来的程序,也称之为Driver类。一般将创建Spark环境上下文对象的程序,称之为Driver程序
Driver在Spark作业执行时主要负责:
- 将用户程序转化(封装)为作业(job)
- 在Executor之间调度任务(task)
- 跟踪Executor的执行情况
- 通过UI展示查询运行情况
Executor
Spark Executor是集群中工作节点(Worker)中的一个JVM进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。
Executor有两个核心功能:
- 负责运行组成Spark应用的任务,并将结果返回给驱动器(Driver)进程
- 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
其他组件
Master & Worker + ApplicationMasterMaster & Worker
Spark集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能,所以环境中还有其他两个核心组件:Master和Worker,这里的Master是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于Yarn环境中的RM(ResourceManager), 而Worker呢,也是进程,一个Worker运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于Yarn环境中NM(NodeManager)。ApplicationMaster
Hadoop用户向YARN集群提交应用程序时,提交程序中应该包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行用户自己的程序任务job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。
说的简单点就是,ResourceManager(资源)和Driver(计算)之间的解耦合靠的就是ApplicationMaster。核心概念
Executor & Core
Spark Executor是集群中运行在工作节点(Worker)中的一个JVM进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点Executor的内存大小和使用的虚拟CPU核(vcore)数量。
应用程序相关启动参数如下:
名称 | 说明 |
---|---|
—num-executors | 配置Executor的数量 |
—executor-memory | 配置每个Executor的内存大小 |
—executor-cores | 配置每个Executor的虚拟CPU core数量 |
并行度(Parallelism)
在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行,记住,这里是并行,而不是并发。这里我们将整个集群并行执行任务的数量称之为并行度。那么一个作业到底并行度是多少呢?这个取决于框架的默认配置。应用程序也可以在运行过程中动态修改。
有向无环图(DAG)
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。
大数据计算引擎框架我们根据使用方式的不同一般会分为四类,其中第一类就是Hadoop所承载的MapReduce,它将计算分为两个阶段,分别为 Map阶段 和 Reduce阶段。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。 由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算。
这里所谓的有向无环图,并不是真正意义的图形,而是由Spark程序直接映射成的数据流的高级抽象模型。简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观,更便于理解,可以用于表示程序的拓扑结构。
提交流程
所谓的提交流程,其实就是我们开发人员根据需求写的应用程序通过Spark客户端提交给Spark运行环境执行计算的流程。在不同的部署环境中,这个提交过程基本相同,但是又有细微的区别,我们这里不进行详细的比较,但是因为国内工作中,将Spark引用部署到Yarn环境中会更多一些,所以本课程中的提交流程是基于Yarn环境的。(Spark On Yarn)
Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。两种模式主要区别在于:Driver程序的运行节点位置。
Yarn Client模式
Client模式将用于监控和调度的Driver模块在客户端执行,而不是在Yarn中,所以一般用于测试。启动项--deploy-mode client
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
提交到执行流程:
- Driver在任务提交的本地机器上运行
- Driver启动后会和ResourceManager通讯申请启动ApplicationMaster
- ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor内存
- ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程
- Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage(这两个名词的解释见本章节后半部分),每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。
Yarn Cluster模式
Cluster模式将用于监控和调度的Driver模块启动在Yarn集群资源中执行。一般应用于实际生产环境。启动项
--deploy-mode cluster
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
提交到执行流程:
在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,
- 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
- Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程
- Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数
之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage(这两个名词的解释见本章节后半部分),每个stage生成对应的TaskSet,之后将task分发到各个Executor上执行。
宽依赖 & 窄依赖
Spark中RDD的高效与DAG图有着莫大的关系,在DAG调度中需要对计算过程划分stage,而划分依据就是RDD之间的依赖关系。针对不同的转换函数,RDD之间的依赖关系分类窄依赖(narrow dependency)和宽依赖(wide dependency, 也称 shuffle dependency)
窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)
- 宽依赖是指父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区(O(n),与数据规模有关)
相比于宽依赖,窄**依赖对优化很有利** ,主要基于以下两点:
宽依赖往往对应着shuffle操作,需要在运行过程中将同一个父RDD的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父RDD的分区只会传入到一个子RDD分区中,通常可以在一个节点内完成转换。
当RDD分区丢失时(某个节点故障),spark会对数据进行重算。
对于窄依赖,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重算和子RDD分区对应的父RDD分区即可,所以这个重算对数据的利用率是100%的;
对于宽依赖,重算的父RDD分区对应多个子RDD分区,这样实际上父RDD 中只有一部分的数据是被用于恢复这个丢失的子RDD分区的,另一部分对应子RDD的其它未丢失分区,这就造成了多余的计算;更一般的,宽依赖中子RDD分区通常来自多个父RDD分区,极端情况下,所有的父RDD分区都要进行重新计算。
如下图所示,b1分区丢失,则需要重新计算a1,a2和a3,这就产生了冗余计算(a1,a2,a3中对应b2的数据)。
窄依赖的函数有:map
, filter
, union
, join
(父RDD是hash-partitioned ), mapPartitions
, mapValues
宽依赖的函数有:groupByKey
, join
(父RDD不是hash-partitioned ), partitionBy
Stage
一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage。Stage的划分,简单的说是以shuffle和result这两种类型来划分。
在Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。
比如rdd.parallize(1 to 10).foreach(println)
这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个。
如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println)
, 这个job因为有reduce,所以有一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。如果job中有多次shuffle,那么每个shuffle之前都是一个stage。
根据RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。
在spark中,Task的类型分为2种:ShuffleMapTask和ResultTask;简单来说,DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的。
而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中。
第五章 Spark核心编程
Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
- RDD : 弹性分布式数据集
- 累加器:分布式共享只写变量
- 广播变量:分布式共享只读变量
RDD
什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集(**RDD不存数据**),是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
存在数据分区的概念,处理的数据会根据规则放置在不同的分区中,然后发给不同的节点进行计算
这里的分区称之为Partition
- 弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片。
- 分布式:数据存储在大数据集群不同节点上
- 数据集:RDD封装了计算逻辑,并不保存数据
- 数据抽象:RDD是一个抽象类,需要子类具体实现
abstract class RDD[T: ClassTag]
- 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
- 可分区、并行计算
RDD的实现原理和 IO 的实现原理是一样的(一层包一层)
关系:都是延迟加载数据,都是一层层地包装功能,等到触发执行命令再一起执行(IO中是readLine(),RDD中是collect())
区别:IO流回临时的存储数据(Buffer缓冲区),RDD不会保存数据(比如:在flatMap处理完数据后,textFile中对应的数据就没了,依此类推)
5大核心属性
* Internally, each RDD is characterized by five __main properties:
- A list of partitions(分区列表)
- A function for computing each split(每个切片[分区]都会有一个计算函数,每个分区的计算逻辑是相同的)
- A list of dependencies on other RDDs(依赖其他RDD的列表[依赖关系表])
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) (当数据为K-V类型数据时,可以通过设定分区器自定义数据的分区,例如:RDD采用哈希分区)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (可选,一个计算每个切片的首选位置的列表,例如:HDFS文件的块位置)
分区列表:
RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]
分区计算函数:
Spark在计算时,是使用分区函数对每一个分区进行计算(每个分区的计算逻辑是相同的)
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
RDD之间的依赖关系:
RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps
分区器(可选):
当数据为K-V类型数据时,可以通过设定分区器自定义数据的分区(见本章下面RDD转换算子中的partitionBy)
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
首选位置(可选):
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
所谓的首选位置指的是数据的位置和计算的位置的关系
数据和计算如果不在同一个节点上,那么就意味着计算时需要拉取数据,性能会降低
这里的位置存在一个本地化级别的概念:
- 进程本地化:数据和计算在同一个进程中
- 节点本地化:数据和计算在同一个节点中
- 机架本地化:数据和计算在同一个机架上
- 任意:数据和计算随意
计算的时候,移动数据不如移动计算(大数据中数据量比较大)
**
执行原理
从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
RDD是Spark框架中用于数据处理的核心模型,接下来我们看看,在Yarn环境中,RDD的工作原理:
- 启动Yarn集群环境
- Spark通过申请资源创建调度节点和计算节点
- Spark框架根据需求将计算逻辑根据分区划分成不同的任务
- 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
从以上流程可以看出RDD在整个流程中主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算。
基础编程
RDD创建
- 从集合(内存)中创建RDD
makeRDD
底层其实调用的就是parallelize
方法,所以两个方法没有任何区别,推荐使用makeRDD
(好记)makeRDD
方法有两个参数:- 第一个参数表示数据集
- 第二个参数表示切片(分区)的数量,如果不传递这个参数,会使用默认值,默认值与当前环境中的最大核数有关
切片默认值源码实现(local模式):
scheduler.conf.getInt("spark.default.parallelism", totalCores)
从SparkConf配置对象中读取配置参数,如果获取不到,那么会取当前环境中的最大核数来使用
切片默认值源码实现(集群模式):
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
从SparkConf配置对象中读取配置参数,如果获取不到,那么会取当前环境中的最大核数和2中较大的值来使用
决定分区数量的优先级:makeRDD方法第二个参数传递的分区参数 => conf中配置的参数 => 默认值
object Instance_Memory {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Instance")
// 设置默认分区数量
// conf.set("spark.default.parallelism", "4")
val sc: SparkContext = new SparkContext(conf)
// TODO RDD - 创建(从无到有) - 从内存中
val seq: Seq[Int] = Seq(1, 2, 3, 4)
// parallelize: 并行
val rdd: RDD[Int] = sc.parallelize(seq)
/*
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
makeRDD底层其实调用的就是parallelize方法,所以两个方法没有任何区别,推荐使用makeRDD(好记)
makeRDD方法有两个参数:
第一个参数表示数据集
第二个参数表示切片(分区)的数量,如果不传递这个参数,会使用默认值,默认值与当前环境中的最大核数有关
切片默认值源码实现(local模式):
scheduler.conf.getInt("spark.default.parallelism", totalCores)
从SparkConf配置对象中读取配置参数,如果获取不到,那么会取当前环境中的最大核数来使用
切片默认值源码实现(集群模式):
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
从SparkConf配置对象中读取配置参数,如果获取不到,那么会取当前环境中的最大核数和2中较大的值来使用
决定分区数量的优先级:makeRDD方法第二个参数传递的分区参数 => conf中配置的参数 => 默认值
*/
val rdd1: RDD[Int] = sc.makeRDD(seq)
// 打印分区的数量
println(rdd1.partitions.length)
// 将数据的处理结果保存为分区文件,每个分区保存成一份文件
rdd1.saveAsTextFile("output")
rdd.collect().foreach(println)
rdd1.collect().foreach(println)
sc.stop()
}
}
- 从外部存储(文件)创建RDD
sc.textFile("data/word.txt")
读取文件时,以行为单位进行读取
textFile
方法提供了2个参数:
- 第一个参数为文件(目录)的路径
第二个参数为最小分区数量(但是不一定是这个数量,但一定大于等于这个数量),可以不传,使用默认值
默认值:math.min(defaultParallelism, 2)<br /> **决定分区数量的优先级:textFile方法传参 => 配置参数 => 默认值**<br /> <br /> **Spark****读取文件****采用的就是Hadoop的读取方式进行切片分区,而切片规则和数据读取的规则有些差异**<br /> 底层实现:<br /> totalSize:总的文件的字节数<br /> goalSize:每个分区应该方多少字节 `goalSize = totalSize / (numSplits == 0 ? 1 : numSplits)`<br />然后计算有多少个分区,然后计算剩下多少字节,再看剩余字节数量是否大于goalSize的10%,决定是否再加一个分区存放剩余字节<br />**eg**: totalSize = 33字节 分区数量 = 2 => goalSize = 16字节 33 - (16* 2) = 1字节 1/16 < 10% 所以不添加新分区,将剩余字节并入最后的分区,所以分为2个分区,第一个分区存放16字节,第二个分区存放17字节
object Instance_Disk { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Instance") val sc: SparkContext = new SparkContext(conf) // TODO RDD - 实例化 - 磁盘文件 /* textFile方法提供了2个参数: 第一个参数为文件(目录)的路径 第二个参数为最小分区数量(不一定是这个数量,但一定大于等于这个数量,因为可能会不能平分,会有剩余),可以不传,使用默认值 默认值:math.min(defaultParallelism, 2) 决定分区数量的优先级:textFile方法传参 => 配置参数 => 默认值 1. Spark读取文件采用的就是Hadoop的读取方式 2. 底层实现: totalSize:总的文件的字节数 minPartitions:预期分区数量 goalSize:每个分区应该方多少字节 goalSize = totalSize / (numSplits == 0 ? 1 : numSplits) */ // 读取文件时,以行为单位进行读取 val rdd1: RDD[String] = sc.textFile("data/word.txt") // 文件路径可以是具体的文件路径,也可以是目录的路径 val rdd2: RDD[String] = sc.textFile("someDirectory") // 路径还可以用通配符 val rdd3: RDD[String] = sc.textFile("someDirectory/word*.txt") // 如果想要获取数据所在的文件相关属性,需要使用wholeTextFiles方法 // wholeTextFiles方法返回的为K-V格式数据,Key表示文件的路径,Value表示文件的内容 val rdd4: RDD[(String, String)] = sc.wholeTextFiles("someDirectory/word*.txt") rdd1.collect().foreach(println) sc.stop() } }
- 从其他RDD创建
主要是通过一个RDD运算完后,再产生新的RDD - 直接创建RDD(new)
使用new的方式直接构造RDD,一般由Spark框架自身使用RDD并行度与分区
默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
文件数据如何分区存放:
- 分区计算和数据存放的规则是两回事
计算分区时,需要按照字节数进行计算的,但是存放数据不能按照字节数存放(可能会把一些字符截断) - Spark分区数据存放时采用Hadoop的处理方式
Hadoop读取数据时不是按照字节读取的,是按行读取的 - Hadoop读取数据时不是按照数据的位置,而是按照数据的偏移量读取的(偏移量从0开始)
- Hadoop读取数据时,偏移量不会重复读取
RDD转换算子
算子:从认知心理学的角度来理解,解决一个问题,其实就是改变问题的状态:
问题(初始)=>operator(算子)=>问题(审批)=>operator(算子)=>问题(解决)
集合中的方法就称为方法;RDD中的方法称之为算子(List.flatMap & RDD.flatMap)
集合中的方法是单点操作;RDD中的方法(算子)是分布式的
转换:A(RDD) -> B(RDD)
RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型
所谓算子,其实就是RDD的方法;将有一个RDD转换成另外一个RDD的方法,就称之为转换算子
**
map算子将数据集中的每一条数据按照指定的规则进行转换,这里的转换可能是数值,也可能是类型
map算子需要传递一个参数,这个参数是函数类型:Int => U
默认情况下,旧的RDD转换成新的RDD,分区数量不变
RDD分区之间是并行计算的,但是RDD分区内**是串行计算的
计算时,分区数量不变,数据所在的分区也不变
扁平化:一个整体拆分成一个个的个体,所有的个体都要使用(flatMap)
clone()方法是浅复制:只把外部的复制了,内部的不会复制
map & mapPartitions
map: 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
mapPartitions: 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据
map算子和mapPartitions算子的区别:
- 数据处理角度
map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作(分区之间是无序的,第一个取到的不一定的是0号分区,Spark中分区号从0开始)。 - 功能的角度
map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。mapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据 性能的角度
map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions将分区数据当成整体来进行处理,会把整体数据都放进内存中,就需要内存中数据有限制,否则会出现溢出;mapPartitions操作处理的数据不会马上释放,需要等待整个分区的数据全部处理完,才会释放,所以mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。改为使用map操作。[x] 小功能:从服务器日志数据apache.log(见附件)中获取用户请求URL资源路径(map)
val rdd: RDD[String] = sc.textFile("data/apache.log") // eg: 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png val res: RDD[String] = rdd.map(_.split(" ")(6))
[x] 小功能:获取每个数据分区的最大值(mapPartitions)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 3, 4, 5, 6), 4) rdd.mapPartitions(part => List(part.max).iterator)
mapPartitionsWithIndex
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引
- 小功能:获取第二个数据分区的数据
val rdd = sc.makeRDD(List(1, 2, 3, 4, 8, 9, 5, 6), 4) val res = rdd.mapPartitionsWithIndex( (index, part) => { if (index == 1) { part } else { // Nil.iterator返回一个空的List的迭代器 Nil.iterator } } )
Nil
是一个空的List
,定义为List[Nothing]
,根据List的定义List[+A]
,所以Nil
是所有List[T]
的子类。
flatMap
将处理的数据进行扁平化(将一个整体拆分成一个一个的个体来使用的方式)后再进行映射处理,所以算子也称之为扁平映射
- 小功能:将List(List(1,2),3,List(4,5))进行扁平化操作
Scala在函数中用模式匹配,外层函数必须使用中括号{}val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5)), 1) // Scala在函数中用模式匹配,外层函数必须使用中括号{} val res: RDD[Any] = rdd.flatMap { case list: List[_] => list case num: Int => List(num) }
glom
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
- 小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),2) val maxRDD: RDD[Int] = rdd.glom().map(_.max) // collect()将 RDD => Array println(maxRDD.collect().sum)
collect()
将 RDD => Array
groupBy
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中makeRDD(List, 2) // 分区
RDD.groupby(func()) // 分组
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
Q: 这两个分区、分组数量不一致怎么办?
A: 一个组的数据只能在一个分区中
groupBy分组后可能会导致不同分区的数据不均衡
默认情况下,分组前和分组后的分区数量不变
(RDD.map处理完的数据,原来数据在哪个分区,处理完还在同一个分区)
groupBy会将同一个分区的数据打乱,和其他分区的数据重新组合在一起,将这样的现象称之为 shuffle
groupBy分组会在执行过程中进行等待,直到所有的数据分组完成后,才能继续往后执行
groupBy执行过程中的等待操作是通过磁盘完成的(数据太多,放内存会炸),所以shuffle操作一定要落盘
所有的shuffle操作都有改变分区的能力(通过方法传参改变分区数量,例如:groupBy)如果一个算子能改变分区,它90%是有shuffle的**
[x] 小功能:将List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。
val rdd = sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 1) rdd.groupBy(_.substring(0, 1))
[x] 小功能:从服务器日志数据apache.log中获取每个时间段访问量。(其实就是WordCount)
val rdd: RDD[String] = sc.textFile("data/apache.log") // eg: 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png rdd.map(_.split(":")(1)).groupBy(_.substring(0, 2)).map(data=>(data._1, data._2.size))
[x] 小功能:WordCount
val rdd: RDD[String] = sc.textFile("data/apache.log") // eg: 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png rdd.map( line => { val hour = line.split(":")(1) (hour, 1) } ).groupBy(_._1).mapValues(_.size).collect().foreach(println)
filter
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
filter算子会根据 指定的规则对数据集种的每一条数据进行判断
如果想要将数据保留,那么结果返回true,如果数据不要,返回false
Scala集合中的filter考虑的是单点操作
SparkRDD的filter考虑分区数据的均衡
0(1000) 1(1000) 2(1000) => 0(100) 1(500) 2(1) => 导致数据倾斜
SparkRDD的filter没有shuffle,分区数量不变,但是分区内的数据可能不均衡,可能会出现数据倾斜
- 小功能:从服务器日志数据apache.log中获取2015年5月17日的请求路径
val rdd: RDD[String] = sc.textFile("data/apache.log") // eg: 83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png rdd.filter(_.split(" ")(3).startsWith("17/05/2015")).map(_.split(" ")(6)) .collect().foreach(println)
sample
根据指定的规则从数据集中抽取数据
sample方法参数:
- 第一个参数表示数据抽取后是否放回
- 第二个 参数依赖于第一个参数,如果第一个参数为false(不放回),该参数表示每个数据被抽取的几率,取值为0-1之间,而不表示有多少比例的数据被抽取出来;如果第一个参数为true(放回),该参数表示每个数据预期被抽取的次数,取值大于0
- 第三个参数:seed 随机数种子(用来做随机算法的第一个数)
随机数不随机:随机算法相同,且随机数种子相同时,后面每次随机算法的结果(得到的随机数)都相同
一定程度上可以减小发生数据倾斜的可能性:sample多次抽取部分数据后可以发现有数据倾斜
val dataRDD = sparkContext.makeRDD(List(1, 2, 3, 4), 1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)
distinct
将数据集中重复的数据去重
思考一个问题:如果不用该算子,你有什么办法实现数据去重?
即使用distinct源码的实现:map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)
数据处理过程:1, 1 => (1, null), (1, null) => (1, (null, null)) => (1, null) => 1
coalesce
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本
分区合并的策略:
不考虑数据是否均衡,只考虑合并更快
coalesce参数:
- 第一个参数表示改变分区的数量
- 第二个参数表示数据是否进行shuffle,默认为false
如果不使用shuffle的场合下,分区数量不能修改为比之前大的值(不报错,能运行,但是没效果),因为数据不会打乱重新组合
repartition
增加分区,并进行shuffle重新分区
该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true(coalesce(numPartitions, shuffle = true)
)
无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
sortBy
该操作用于排序数据。在排序之前,可以将数据通过函数进行处理,之后按照函数处理的结果进行排序,默认为升序排列。默认排序后新产生的RDD的分区数与原RDD的分区数一致(也可以修改分区数量)。中间存在shuffle的过程。
intersection & union & subtract
交集 & 并集 & 差集
这三个算子返回的都是新的RDD
思考一个问题:如果两个RDD数据类型不一致怎么办?
不同类型的两个数据集无法实现交集,并集,差集的操作,可以实现拉链操作**
zip
将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素
- 不同类型的两个数据集无法实现交集,并集,差集的操作,可以实现拉链操作
- Spark中拉链:每个分区元素数量不同不可以做拉链;分区数量不相同也不可以做拉链
- Scala中拉链:两边长度不一样,取短边的长度做拉链
partitionBy
将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner
这个算子强调的是数据分区的变化,而不是分区数量的变化(coalesce, repartition)
List list = new ArrayList();
ArrayList list2 = new ArrayList();
list.clone(); // 没有clone方法
list2.clone(); // 可以调用clone方法
List是一个接口,ArrayList是一个类,当想使用类的特有的方法,需要使用类去声明,而不能用通用的接口声明
RDD一定没有 partitionBy方法,K-V类型的方法都通过隐式转换调用这个方法(reduceByKey也一样)但要求数据类型必须为K-V类型
RangePartitioner(范围分区器)用于做排序,特定场合用(sortBy)
思考一个问题:如果重分区的分区器和当前RDD的分区器一样怎么办?
如果重分区的分区器和之前的分区器相同,则不会进行重分区,会直接返回原来的RDD,源码中重写了equals方法
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
// Scala中 == 就调用的 equals 方法
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}
reduceByKey
可以将数据按照相同的Key对Value进行聚合(分组+聚合)
- 小功能:WordCount
val rdd= sc.textFile("D:\\idea-workplace\\Spark\\data\\apache.log") // WordCount rdd.map(line => (line.split(":")(1), 1)).reduceByKey(_+_) .collect().foreach(println)
groupByKey
将数据源的数据根据key对value进行分组
思考一个问题:reduceByKey和groupByKey的区别?
从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能(mapSideCombine = true
),这样会减少落盘的数据量,而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。
从功能的角度:reduceByKey其实包含分组和聚合的功能。groupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
**
- 小功能:WordCount
val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3))) rdd.groupByKey().mapValues(_.sum).collect().foreach(println)
aggregateByKey
将数据根据不同的规则进行分区内计算和分区间计算(这里分区内和分区间的操作都是针对Value的操作,和Key无关)
该算子用于相同key对value进行聚合(分区内计算和分区间计算的逻辑可以是不一样的)
而reduceByKey算子分区内计算和分区间计算的逻辑是一样的
该算子有函数柯里化操作,存在多个参数列表
第一个参数列表有一个参数,这个参数表示计算初始值
(Spark或Scala中 z || zero 表示 零值 或 初始值)
第二个参数列表有两个参数
• 第一个参数表示分区内计算规则(相同key对value进行操作)
• 第二个参数表示分区间计算规则(相同key对value进行操作)
如果aggregateByKey分区内计算逻辑和分区间计算逻辑相同的场合,可以采用另外一个算子代替:foldByKey(初始值)(计算逻辑)
(有初始值的概念,不能用reduceByKey是因为reduceByKey不能设置初始值)
初始值可以理解为集合内数据与集合外数据一起做操作,类似Scala集合的fold方法
- 小功能:取出每个分区内相同key的最大值然后分区间相加 ```scala // 取出每个分区内相同key的最大值然后分区间相加 val rdd = sc.makeRDD(List( (“a”,1),(“a”,2),(“c”,3), (“b”,4),(“c”,5),(“c”,6) ),2)
rdd.aggregateByKey(0)( (x, y) => math.max(x, y), (x, y) => x + y ).collect().foreach(println)
---
<a name="V7VdN"></a>
##### foldByKey
当分区内计算规则和分区间计算规则相同时,`aggregateByKey`就可以简化为`foldByKey`
---
<a name="LtVeU"></a>
##### combineByKey
最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
- [x] **小功能:**将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值
```scala
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
reduceByKey
: 相同key的第一个数据不进行任何计算,分区内和分区间计算规则相同foldByKey
: 相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同aggregateByKey
:相同key的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同combineByKey
: 当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构,分区内和分区间计算规则可以不相同。
这里x不能直接用下划线表示,因为x是由第一个参数转换来的,不能直接推断出来
sortByKey
在一个(K,V)的RDD上调用,K必须实现Ordered接口(特质),返回一个按照key进行排序的RDD
该算子根据key进行排序,默认升序
对key排序,不是对value排序
使用范围分区器 RangePartitioner
[x] 小功能:设置key为自定义类User
object Test { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Test") val sc: SparkContext = new SparkContext(conf) val rdd = sc.makeRDD( List( (new User(30), 1), (new User(10), 2), (new User(80), 3), (new User(20), 4) ) ) rdd.sortByKey(false).collect().foreach(println) sc.stop() } class User extends Ordered[User] with Serializable { var user_age: Int = _ def this(age: Int) { this() user_age = age } override def compare(that: User): Int = { this.user_age - that.user_age } override def toString: String = "user_age: " + this.user_age } }
join & leftOuterJoin & rightOuterJoin
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD
两个数据集可以连接的条件是相同的key的数据进行连接
只要数据中有相同的key,都会进行连接,不考虑数量的问题 => 会导致笛卡尔积
所以,join可能会产生大量的数据,如果业务中存在shuffle,那么性能非常差,所以不推荐使用,能不用就不用,能代替就代替
leftOuterJoin: 类似于SQL语句的左外连接
rdd1.leftOuterJoin(rdd2) // rdd1是主表,rdd2是从表
rightOuterJoin: 类似于SQL语句的右外连接
rdd1.rightOuterJoin(rdd2) // rdd2是主表,rdd1是从表
主表中所有的都会出现,从表中只有连接上的才会出现
**
cogroup
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable
cogroup => group + connect
案例实操
统计出每一个省份每个广告被点击数量排行的Top3
agent.log(见附件):时间戳,省份,城市,用户,广告,中间字段使用空格分隔
(最后排序取前三时,将分布式数据进行单点排序,可能内存不够,有可能程序跑不通)
object Test {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Test")
val sc: SparkContext = new SparkContext(conf)
// 1. 获取数据 - agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔
val rdd: RDD[String] = sc.textFile("data/agent.log")
// 2. 时间戳 省份 城市 用户 广告 => (省份-广告, 1)
val tupleRdd = rdd.map(
line => {
val prov: String = line.split(" ")(1)
val ad: String = line.split(" ")(4)
(prov + "-" + ad, 1)
}
)
// 3. (省份-广告, 1) => (省份-广告, sum)
val tupleSumRDD = tupleRdd.reduceByKey(_ + _)
// 4. (省份-广告, sum) => (省份, (广告, sum))
val splitSumRDD = tupleSumRDD.map{
case (prov_ad, sum) => {
val prov: String = prov_ad.split("-")(0)
val ad: String = prov_ad.split("-")(1)
(prov, (ad, sum))
}
}
// 5. (省份, (广告, sum)) => (省份, List[(广告1, sum1), (广告2, sum2), (广告3, sum3), (广告4, sum4)])
// val groupSumRDD = splitSumRDD.groupByKey()
// 6. (省份, List[(广告1, sum1), (广告2, sum2), (广告3, sum3), (广告4, sum4)]) => (省份, List[top3])
// 将分布式数据进行单点排序,可能内存不够,有可能程序跑不通
// val top3 = groupSumRDD.mapValues(
// iter => {
// iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
// }
//)
// 优化过后的
val top3 = splitSumRDD.aggregateByKey(ArrayBuffer[(String, Int)]())(
(buffer, t) => {
buffer.append(t)
buffer.sortBy(_._2)(Ordering.Int.reverse).take(3)
},
(buff1, buff2) => {
buff1.appendAll(buff2)
buff1.sortBy(_._2)(Ordering.Int.reverse).take(3)
}
)
// 7. 打印结果
top3.collect().foreach(println)
sc.stop()
}
}
RDD行动算子
行动:Action,用于让RDD的计算开始执行
所谓的行动算子,其实就是用于触发RDD运行的方法
reduce
聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
collect
在驱动程序(Driver)中,以数组Array的形式返回数据集的所有元素
collect().foreach(println) // 这里的print是 Scala Array的,单点操作,串行打印,数据与源数据顺序相同
rdd.foreach(println) // 这里的print是RDD的,分布式,并行打印,数据与源数据顺序可能不同
附件
相关文档:
数据来源: