Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
- RDD:弹性分布式数据集
- 累加器:分布式共享只写变量
-
1 RDD
1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
1.2 特点
弹性
- 存储的弹性:内存与磁盘的自动切换
- 容错的弹性:数据丢失可以自动恢复
- 计算的弹性:计算出错重试机制
- 分片的弹性:可根据需要重新分片
- 分布式:数据存储在大数据集群不同节点上
- 数据集:RDD 封装了计算逻辑,并不保存数据
- 数据抽象:RDD 是一个抽象类,需要子类具体实现
- 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
- 可分区、并行计算
类比Java IO和RDD
- IO流,体现装饰者模式

- RDD

1.3 核心属性

- 分区列表
RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
- 分区计算函数
Spark在计算时,是使用分区函数对每一个分区进行计算
- RDD之间的依赖关系
RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系
- 分区器(可选)
当数据为KV类型数据时,可以通过设定分区器自定义数据的分区
- 首选位置(可选)
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
1.4 执行原理
从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
RDD 是 Spark 框架中用于数据处理的核心模型,主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算。在 Yarn 环境中,RDD的工作原理:
- 启动Yarn集群环境

- Spark通过申请资源创建调度节点和计算节点

- Spark框架根据需求将计算逻辑分区划分成不同的任务

- 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
1.5 基础编程
1.5.1 RDD 并行度与分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量称之为并行度。这个数量可以在构建 RDD 时指定。这里的并行执行的任务数量,并不是指的切分任务的数量。
object Spark01_RDD_Memory_Par {def main(args: Array[String]): Unit = {// TODO 准备环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数// 也可以用sparkConf.set("spark.default.parallelism","5")val sc = new SparkContext(sparkConf)// TODO 创建RDD// RDD的并行度 & 分区// makeRDD方法可以传递第二个参数,表示分区的数量// 第二个参数可以不传递,默认值defaultParallelism(默认并行度)// scheduler.conf.getInt("spark.default.parallelism", totalCores)// spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism// 如果获取不到,使用totalCores属性,这个属性取值为当前运行环境的最大可用核数val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// 将处理的数据保存成分区文件rdd.saveAsTextFile("output")// TODO 关闭环境sc.stop()}}
1.5.2 RDD创建
1)从集合(内存)中创建RDD
从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD
object Spark01_RDD_Memory {def main(args: Array[String]): Unit = {// TODO 准备环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数val sc = new SparkContext(sparkConf)// TODO 创建RDD// 从内存中创建RDD,将内存中集合的数据作为处理的数据源val seq = Seq[Int](1, 2, 3, 4)// parallelize:并行// val rdd: RDD[Int] = sc.parallelize(seq)// makeRDD方法在底层实现时就是调用了rdd对象的parallelize方法/*/** Distribute a local Scala collection to form an RDD.** This method is identical to `parallelize`.* @param seq Scala collection to distribute* @param numSlices number of partitions to divide the collection into* @return RDD representing distributed collection*/def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {parallelize(seq, numSlices)}*/val rdd: RDD[Int] = sc.makeRDD(seq)rdd.collect().foreach(println)// TODO 关闭环境sc.stop()}}
分区数据的分配
object Spark01_RDD_Memory_Par1 {def main(args: Array[String]): Unit = {// TODO 准备环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数val sc = new SparkContext(sparkConf)// TODO 创建RDD// RDD的并行度 & 分区// 分3个分区,此时会以[1],[2],[3,4]分// val rdd = sc.makeRDD(List(1, 2, 3, 4), 3)// 分3个分区,此时会以[1],[2,3],[4,5]分val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3)// 将处理的数据保存成分区文件rdd.saveAsTextFile("output")// TODO 关闭环境sc.stop()}}
2)从外部存储(文件)创建RDD
由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。
object Spark02_RDD_File {def main(args: Array[String]): Unit = {// TODO 准备环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数val sc = new SparkContext(sparkConf)// TODO 创建RDD// 从文件中创建RDD,将文件中的数据作为处理的数据源// path路径默认以当前环境的根路径为基准,可以写绝对路径、相对路径// sc.textFile("C:\\Users\\ace\\Desktop\\study_spark\\datas\\1.txt")// val rdd: RDD[String] = sc.textFile("datas/1.txt")// 也可以写目录名称,对所有文件统计// val rdd = sc.textFile("datas")// path路径还可以使用通配符// val rdd = sc.textFile("datas/1*.txt")// path还可以是分布式存储系统路径:HDFSval rdd = sc.textFile("hdfs://hadoop102:8020/test.txt")rdd.collect().foreach(println)// TODO 关闭环境sc.stop()}}
object Spark02_RDD_File1 {def main(args: Array[String]): Unit = {// TODO 准备环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数val sc = new SparkContext(sparkConf)// TODO 创建RDD// 从文件中创建RDD,将文件中的数据作为处理的数据源// textFile:以行为单位读取数据,读取的数据都是字符串// wholeTextFiles:以文件为单位读取数据// 读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容val rdd = sc.wholeTextFiles("datas")rdd.collect().foreach(println)// TODO 关闭环境sc.stop()}}

分区的设定
object Spark02_RDD_File_Par {def main(args: Array[String]): Unit = {// TODO 准备环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数val sc = new SparkContext(sparkConf)// TODO 创建RDD// textFile将文件作为数据处理的数据源,默认也可用设定分区// minPartitions:最小分区数量// math.min(defaultParallelism, 2)// val rdd = sc.textFile("datas/1.txt")// 如果不想使用默认的分区数,可用通过第二个参数指定分区数// Spark读取文件,底层使用Hadoop的读取方式// 分区数量计算方式:// totalSize = 26// long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);// goalSize = 26 / 3 = 8...2// 超过了1.1倍,新的分区,一共4个val rdd = sc.textFile("datas/1.txt", 3)rdd.saveAsTextFile("output")// TODO 关闭环境sc.stop()}}
分区数据的分配
11.txt中,数据为
123
object Spark02_RDD_File_Par {def main(args: Array[String]): Unit = {// TODO 准备环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数val sc = new SparkContext(sparkConf)// TODO 创建RDD// 分区数量计算方式:// totalSize = 7// long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);// goalSize = 7 / 3 = 2...1// 超过了1.1倍,新的分区,一共3个val rdd = sc.textFile("datas/11.txt", 2)rdd.saveAsTextFile("output")// TODO 关闭环境sc.stop()}}

object Spark02_RDD_File_Par1 {def main(args: Array[String]): Unit = {// TODO 准备环境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数val sc = new SparkContext(sparkConf)// TODO 创建RDD// 分区数量计算方式:// totalSize = 7// long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);// goalSize = 7 / 3 = 2...1// 超过了1.1倍,新的分区,一共3个// TODO 数据分区的分配// 1. 数据以行为单位进行读取// spark读取文件,采用的是hadoop的方式读取,一行一行读,和字节数没有关系// 2. 数据读取时以偏移量为单位,偏移量不会被重复读取/*前两行后面都跟了回车换行,用@占位文件内容=>偏移量1@@ =>0122@@ =>3453@@ =>6*/// 3. 数据分区的偏移量范围的计算// 分区 => 偏移量 =>分区中数据// 0 => [0,3] => 12// 1 => [3,6] => 3// 2 => [6,7] => 空// 三个分区// [12],[3],[]val rdd = sc.textFile("datas/11.txt", 2)rdd.saveAsTextFile("output")// TODO 关闭环境sc.stop()}}
- 如果数据源为多个文件,计算分区时以文件为单位进行分区
感觉不太对但是debug半天好像确实是这样,迷惑。。。

1.5.3 RDD算子
1.5.3.1 RDD转换算子
RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型
1. Value类型
(1)map
- 函数签名
def mapU: ClassTag: RDD[U]
- 函数说明
将处理的数据逐条进行映射转换
object Spark01_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子-mapval rdd = sc.makeRDD(List(1, 2, 3, 4))// 1,2,3,4 => 2,4,6,8// 转换函数def mapFunction(num: Int): Int = {num * 2}// val mapRDD: RDD[Int] = rdd.map(mapFunction)// val mapRDD: RDD[Int] = rdd.map((num: Int) => {num * 2})val mapRDD: RDD[Int] = rdd.map(_ * 2)mapRDD.collect().foreach(println)sc.stop()}}
例:从服务器日志数据apache.log中获取用户请求URL资源路径
object Spark01_RDD_Operator_Transform_Test {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子-mapval rdd = sc.textFile("datas/apache.log")// 长的字符串 => 短的字符串val mapRDD: RDD[String] = rdd.map(line => {val datas = line.split(" ")datas(6)})mapRDD.collect().foreach(println)sc.stop()}}

例:并行
object Spark01_RDD_Operator_Transform_Par {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子-mapval rdd = sc.makeRDD(List(1, 2, 3, 4), 1)val mapRDD = rdd.map(num => {println(">>>>>>>" + num)num})val mapRDD1 = mapRDD.map(num => {println("#####" + num)num})mapRDD1.collect()sc.stop()}}
- 分区为1

rdd的计算一个分区内的数据是一个一个执行,只有只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
=> 分区内数据的执行是有序的
- 分区为2,第7行 val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

不同分区数据计算是无序的
(2)mapPartitions
- 函数签名
def mapPartitionsU: ClassTag: RDD[U]
- 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据
object Spark02_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子-mapval rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// mapPartitions:可以以分区为单位进行数据转换操作// 会将整个分区的数据加载到内存进行引用// 处理完的数据不会被释放掉,存在对象的引用// 在内存较小,数据量较大的场合下,容易出现内存溢出val mapRDD: RDD[Int] = rdd.mapPartitions(iter => {println(">>>>>>>")iter.map(_ * 2)})mapRDD.collect().foreach(println)sc.stop()}}

例:获取每个数据分区的最大值
object Spark02_RDD_Operator_Transform_Test {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子-mapval rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// [1,2],[3,4] => [2],[4]val mapRDD = rdd.mapPartitions(iter => {List(iter.max).iterator})mapRDD.collect().foreach(println)sc.stop()}}

➢ 数据处理角度
Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。
➢ 功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据。
➢ 性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
(3)mapPartitionsWithIndex函数签名
def mapPartitionsWithIndexU: ClassTag => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
- 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
// 获取第二个数据分区的数据object Spark03_RDD_Operator_Transform {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子-mapval rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// [1,2],[3,4]val mapRDD = rdd.mapPartitionsWithIndex((index, iter) => {if(index == 1) {iter} else {Nil.iterator}})mapRDD.collect().foreach(println)sc.stop()}}

object Spark03_RDD_Operator_Transform1 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// TODO 算子-mapval rdd = sc.makeRDD(List(1, 2, 3, 4))val mapRDD = rdd.mapPartitionsWithIndex((index, iter) => {iter.map(num => {(index, num)})})mapRDD.collect().foreach(println)sc.stop()}}

(4)flatMap
- 函数签名
def flatMapU: ClassTag: RDD[U]
- 函数说明
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
object Spark04_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-flatMap
val rdd: RDD[List[Int]] = sc.makeRDD(List(
List(1, 2), List(3, 4)
))
val flatRDD: RDD[Int] = rdd.flatMap(
list => {
list
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}

object Spark04_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-flatMap
val rdd: RDD[String] = sc.makeRDD(List(
"Hello Scala", "Hello Spark"
))
val flatRDD: RDD[String] = rdd.flatMap(
s => {
s.split(" ")
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}

object Spark04_RDD_Operator_Transform2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-flatMap
val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))
val flatRDD = rdd.flatMap(
data => {
data match {
case list: List[_] => list
case dat => List(dat)
}
}
)
flatRDD.collect().foreach(println)
sc.stop()
}
}

(5)glom
- 函数签名
def glom(): RDD[Array[T]]
- 函数说明
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
object Spark05_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-glom
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// List => Int
// Int => Array
val glomRDD: RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach(data => println(data.mkString(",")))
sc.stop()
}
}

例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
object Spark05_RDD_Operator_Transform_Test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子-glom val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2) // [1,2],[3,4] // [2],[4]=>[6] val glomRDD: RDD[Array[Int]] = rdd.glom() val maxRDD: RDD[Int] = glomRDD.map( array => { array.max } ) println(maxRDD.collect().sum) sc.stop() } }
(6)groupBy函数签名
def groupByK(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
- 函数说明
将数据根据指定的规则进行分组,分区默认不变,但数据会被打乱重新组合,称为shuffle
极限情况下,数据可能被分在同一分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
object Spark06_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-groupBy
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
// groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
// 相同的key值的数据会放置在一个组中
def groupFunction(num: Int): Int = {
num % 2
}
val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
sc.stop()
}
}

例:将 List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组
object Spark06_RDD_Operator_Transform1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子-groupBy val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2) // 分组和分区没有必然的关系 val groupRDD = rdd.groupBy(_.charAt(0)) groupRDD.collect().foreach(println) sc.stop() } }
例:从服务器日志数据 apache.log 中获取每个时间段访问量
object Spark06_RDD_Operator_Transform_Test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子-groupBy val rdd = sc.textFile("datas/apache.log") val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map( line => { val datas = line.split(" ") val time = datas(3) // time.substring() val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") val date: Date = sdf.parse(time) val sdf1 = new SimpleDateFormat("HH") val hour = sdf1.format(date) (hour, 1) } ).groupBy(_._1) timeRDD.map { case (hour, iter) => { (hour, iter.size) } }.collect().foreach(println) sc.stop() } }
(7)filter函数签名
def filter(f: T => Boolean): RDD[T]
- 函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
object Spark07_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-filter
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)
filterRDD.collect().foreach(println)
sc.stop()
}
}

例:从日志数据中获取2015年5月17日的请求路径
object Spark07_RDD_Operator_Transform_Test { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator") val sc = new SparkContext(sparkConf) // TODO 算子-filter val rdd = sc.textFile("datas/apache.log") rdd.filter( line => { val datas = line.split(" ") val time = datas(3) time.startsWith("17/05/2015") } ).collect().foreach(println) sc.stop() } }
(8)sample函数签名
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
- 函数说明
根据指定的规则从数据集中抽取数据
object Spark08_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-sample
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// sample算子需要传递3个参数
// 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
// 2. 第二个参数表示,数据源中每条数据被抽取的概率
// 3. 第三个参数表示,抽取数据时随机算法的种子
println(rdd.sample(
false,
0.4,
1
).collect().mkString(","))
sc.stop()
}
}

object Spark08_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-sample
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// sample算子需要传递3个参数
// 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
// 2. 第二个参数表示,数据源中每条数据被抽取的概率
// 3. 第三个参数表示,抽取数据时随机算法的种子
// 如果不传递第三个参数,使用的是当前系统时间
println(rdd.sample(
false,
0.4,
// 1
).collect().mkString(","))
sc.stop()
}
}
(9)distinct
- 函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
- 函数说明
将数据集中重复的数据去重
object Spark09_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-filter
val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
// distinct源码
// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
// x => (x, null):(1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
// reduceByKey:(1, null)(1, null) => (null, null)
// (x, _) => x:(null, null) => null
// (1, null)
// map(_._1):1
val rdd1: RDD[Int] = rdd.distinct()
rdd1.collect().foreach(println)
sc.stop()
}
}

(10)coalesce
- 函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
- 函数说明
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。当 Spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本。
object Spark10_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-filter
val rdd = sc.makeRDD(List(1, 2, 3, 4), 4)
val newRDD: RDD[Int] = rdd.coalesce(2)
newRDD.saveAsTextFile("output")
sc.stop()
}
}

object Spark10_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-filter
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD: RDD[Int] = rdd.coalesce(2)
newRDD.saveAsTextFile("output")
sc.stop()
}
}



val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
// coalesce方法默认情况下不会将分区的数据打乱重写组合
// 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
// 如果想要让数据均衡,可以进行shuffle处理
val newRDD: RDD[Int] = rdd.coalesce(2)


(11)repartition
- 函数签名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
object Spark11_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-filter
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
// coalesce算子可以扩大分区,但是如果不进行shuffle操作,不起作用
// 如果要扩大分区,要使用shuffle操作
// 缩减分区:coalesce,如果想要数据均衡,采用shuffle
// 扩大分区:
// 1. val newRDD: RDD[Int] = rdd.coalesce(3,true) // [3,5],[1,6],[2,4]
// 2. repartition 底层:
/*
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
*/
val newRDD: RDD[Int] = rdd.repartition(3)
newRDD.saveAsTextFile("output")
sc.stop()
}
}
(12)sortBy
- 函数签名
def sortByK => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
- 函数说明
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。
object Spark12_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-sortBy
val rdd = sc.makeRDD(List(6, 4, 3, 2, 5, 1), 2)
val newRDD: RDD[Int] = rdd.sortBy(num => num)
newRDD.saveAsTextFile("output") // [1,2,3],[4,5,6]
sc.stop()
}
}
object Spark12_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-sortBy
val rdd = sc.makeRDD(List(("1",1),("11",2),("2",3)), 2)
val newRDD: RDD[(String, Int)] = rdd.sortBy(t => t._1)
newRDD.collect().foreach(println)
sc.stop()
}
}

object Spark12_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-sortBy
val rdd = sc.makeRDD(List(("1",1),("11",2),("2",3)), 2)
// sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序
// 第二个参数可以改变排序的方式
// sortBy默认情况下,不会改变分区,但中间存在shuffle操作
val newRDD: RDD[(String, Int)] = rdd.sortBy(t => t._1,false)
newRDD.collect().foreach(println)
sc.stop()
}
}
2. 双Value类型
object Spark13_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-双Value类型
// 交集,并集和差集要求两个数据源数据类型保持一致
// 拉链操作两个数据源的类型可以不一致
val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
val rdd7 = sc.makeRDD(List("1", "2", "3", "4"))
// 交集 [3,4]
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
println(rdd3.collect().mkString(","))
// 并集 [1,2,3,4,3,4,5,6]
val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect().mkString(","))
// 差集 [1,2]
val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd4.collect().mkString(","))
// 拉链 (1,3),(2,4),(3,5),(4,6)
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
val rdd8: RDD[(Int, String)] = rdd1.zip(rdd7)
println(rdd8.collect().mkString(",")) // (1,1),(2,2),(3,3),(4,4)
sc.stop()
}
}
object Spark13_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-双Value类型
val rdd1 = sc.makeRDD(List(1, 2, 3, 4), 2)
val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 4)
// 两个数据源要求分区数量要保持一致
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
sc.stop()
}
}

object Spark13_RDD_Operator_Transform1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-双Value类型
val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 2)
// 两个数据源要求分区中数据数量保持一致
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect().mkString(","))
sc.stop()
}
}
3. Key-Value类型
(1)partitionBy
- 函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
- 函数说明
将数据按照指定Partitioner重新进行分区,默认分区器是HashPartitioner
object Spark14_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD = rdd.map((_, 1))
// RDD=>PairRDDFunctions 隐式转换(二次编译)
/*
伴生对象中有隐式
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
*/
// partitionBy根据指定的分区规则对数据进行重分区
mapRDD.partitionBy(new HashPartitioner(2))
.saveAsTextFile("output")
sc.stop()
}
}

(2)reduceByKey
- 函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
object Spark15_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("b", 4)
))
// reduceByKey相同的key的数据进行value数据的聚合操作
// [1,2,3]
// reduceByKey中如果key的数据只有一个,不会参与运算
val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {
println(s"x=${x}, y=${y}")
x + y
})
reduceRDD.collect().foreach(println)
sc.stop()
}
}

(3)groupByKey
- 函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
object Spark16_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("b", 4)
))
// groupByKey:将数据源中的数据,相同key的数据分在一组中,形成一个对偶元组
// 元组中的第一个元素就是key
// 元组中的第二个元素是相同key的value集合
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
groupRDD.collect().foreach(println)
val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
sc.stop()
}
}
- reduceByKey和groupByKey的区别


- 从Shuffle的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高
- 从功能的角度:reduceByKey包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合,只能使用 groupByKey
(4)aggregateByKey
- 函数签名
def aggregateByKeyU: ClassTag(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
object Spark17_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("a", 4)
), 2)
// => (a,[1,2]),(a,[3,4])
// => (a,2),(a,4)
// => (a,6)
// aggregateByKey存在函数柯里化,有两个参数列表
// 第一个参数列表,需要传递一个参数,表示为初始值,主要用于第一个key的时候,和value进行分区内计算
// 第二个参数列表
// 第一个参数表示分区内计算规则
// 第二个参数表示分区间计算规则
// math.min(x, y)
// math.max(x, y)
rdd.aggregateByKey(0)(
(x, y) => math.max(x, y),
(x, y) => x + y
).collect.foreach(println)
sc.stop()
}
}

object Spark17_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)
// => (a,[1,2]),(a,[3,4])
// => (a,2),(a,4)
// => (a,6)
// aggregateByKey存在函数柯里化,有两个参数列表
// 第一个参数列表,需要传递一个参数,表示为初始值,主要用于第一个key的时候,和value进行分区内计算
// 第二个参数列表
// 第一个参数表示分区内计算规则
// 第二个参数表示分区间计算规则
// math.min(x, y)
// math.max(x, y)
rdd.aggregateByKey(5)(
(x, y) => math.max(x, y),
(x, y) => x + y
).collect.foreach(println)
sc.stop()
}
}


object Spark17_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)
// aggregateByKey最终的返回数据结果应该和初始值的类型保持一致
// 获取相同key数据的平均值 => (a,3),(b,4)
val newRDD: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(
(t, v) => {
(t._1 + v, t._2 + 1)
},
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
case (num, cnt) => {
num / cnt
}
}
resultRDD.collect().foreach(println)
sc.stop()
}
}

(5)foldByKey
- 函数签名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
object Spark17_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)
// 如果聚合计算时,分区内和分区间计算规则相同,aggregateByKey 就可以简化为 foldByKey
rdd.foldByKey(0)(_ + _).collect.foreach(println)
sc.stop()
}
}

(6)combineByKey
- 函数签名
def combineByKeyC => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
- 函数说明
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
object Spark19_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)
// combineByKey:方法需要三个参数
// 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
// 第二个参数表示:分区内的计算关系
// 第三个参数表示:分区间的计算关系
val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
v => (v, 1),
(t: (Int, Int), v) => {
(t._1 + v, t._2 + 1)
},
(t1: (Int, Int), t2: (Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
case (num, cnt) => {
num / cnt
}
}
resultRDD.collect().foreach(println)
sc.stop()
}
}
reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
- reduceByKey:相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
- foldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
- aggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
- combineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同
(7)join
- 函数签名
def joinW]): RDD[(K, (V, W))]
- 函数说明
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD
object Spark21_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5), ("c", 6)
))
val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
sc.stop()
}
}

object Spark21_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd1 = sc.makeRDD(List(
("a", 1), ("a", 2), ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("c", 5), ("a", 6)
))
// join:两个不同数据源的数据,相同的key的value会连接在一起,形成元组
/*
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5), ("c", 6)
))
(a,(1,4))
(b,(2,5))
(c,(3,6))
*/
// 如果两个数据源中key没有匹配上,数据不会出现在结果中
/*
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("d", 5), ("c", 6)
))
(a,(1,4))
(c,(3,6))
*/
// 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔积
/*
val rdd1 = sc.makeRDD(List(
("a", 1), ("a", 2), ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("c", 5), ("a", 6)
))
(a,(1,4))
(a,(1,6))
(a,(2,4))
(a,(2,6))
(c,(3,5))
*/
val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
sc.stop()
}
}
(8)leftOuterJoin
- 函数签名
def leftOuterJoinW]): RDD[(K, (V, Option[W]))]
object Spark22_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5)
))
val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
val rightJoinRDD = rdd1.rightOuterJoin(rdd2)
leftJoinRDD.collect().foreach(println)
println("=====")
rightJoinRDD.collect().foreach(println)
sc.stop()
}
}

(9)cogroup
- 函数签名
def cogroupW]): RDD[(K, (Iterable[V], Iterable[W]))]
- 函数说明
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable
object Spark23_RDD_Operator_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 算子-Key-Value类型
val rdd1 = sc.makeRDD(List(
("a", 1), ("b", 2)
))
val rdd2 = sc.makeRDD(List(
("a", 4), ("b", 5), ("c", 6), ("c", 7)
))
// cogroup: connect + group,分组连接
val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
sc.stop()
}
}
例
数据:agent.log,时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
需求:统计出每一个省份每个广告被点击数量排行的 Top3
object Spark24_RDD_Req {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// 1. 获取原始数据:时间戳,省份,城市,用户,广告
val dataRDD = sc.textFile("datas/agent.log")
// 2. 将原始数据进行结构的转换
// 时间戳,省份,城市,用户,广告=>((省份,广告),1)
val mapRDD = dataRDD.map(
line => {
val datas = line.split(" ")
((datas(1), datas(4)), 1)
}
)
// 3. 将转换结构后的数据进行分组聚合
// ((省份,广告),1)=> ((省份,广告),sum)
val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)
// 4. 将聚合的结果进行结构转换
// ((省份,广告),sum)=> (省份,(广告,sum))
val newMapRDD: RDD[(String, (String, Int))] = reduceRDD.map {
case ((prv, ad), sum) => {
(prv, (ad, sum))
}
}
// 5. 将转换结构后的数据根据省份进行分组
// (省份,【(广告A,sumA),(广告B,sumB)】)
val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()
// 6. 将分组后的数据组内排序(降序),取前3名
val resultRDD = groupRDD.mapValues(
iter => {
// Iterable不能排序,转成List
// 默认升序,改成降序
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
}
)
// 7. 采集数据打印在控制台
resultRDD.collect().foreach(println)
sc.stop()
}
}
1.5.3.2 RDD行动算子
(1)collect
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// TODO - 行动算子
// 触发作业执行的方法
// 底层调用的是环境对象的runJob方法 val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
// 底层代码创建ActiveJob,并提交执行
// collect:方法会将不同分区的数据按照分区顺序采集到Driver端内存,形成数组
val ints: Array[Int] = rdd.collect()
println(ints.mkString(","))
sc.stop()
}
}
(2)reduce
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// TODO - 行动算子
val i: Int = rdd.reduce(_ + _)
println(i) // 10
sc.stop()
}
}
(3)count
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// TODO - 行动算子
// count:数据源中数据的个数
val l = rdd.count()
println(l)
sc.stop()
}
}
(4)first
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// TODO - 行动算子
// first:获取数据源中数据的第一个
val first = rdd.first()
println(first)
sc.stop()
}
}
(5)take
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// TODO - 行动算子
// take:获取N个数据
val ints: Array[Int] = rdd.take(3)
println(ints.mkString(","))
sc.stop()
}
}
(6)takeOrdered
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(4, 2, 3, 1))
// TODO - 行动算子
// takeOrdered:数据排序后,取N个数据
val ints: Array[Int] = rdd.takeOrdered(3)
println(ints.mkString(",")) // 1,2,3
sc.stop()
}
}
(7)aggregate
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// TODO - 行动算子
// aggregateByKey:初始值只会参与分区内计算
// aggregate:初始值会参与分区内计算,并且参与分区间计算
val result = rdd.aggregate(10)(_ + _, _ + _)
println(result) // 40 = 10 + (1 + 2 + 10) + (3 + 4 + 10)
sc.stop()
}
}
(8)fold
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// TODO - 行动算子
// aggregate简化版操作
val result = rdd.fold(10)(_ + _)
println(result) // 40
sc.stop()
}
}
(9)countByKey
object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// TODO - 行动算子
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
println(intToLong)
sc.stop()
}
}

object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 1, 1, 4), 2)
// TODO - 行动算子
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
println(intToLong)
sc.stop()
}
}

object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3)
))
// TODO - 行动算子
val stringToLong: collection.Map[String, Long] = rdd.countByKey()
println(stringToLong) // Map(a -> 3)
sc.stop()
}
}

(10)save相关
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("a", 3)
))
// TODO - 行动算子
// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件 必须是k-v类型
rdd.saveAsSequenceFile("output2")
sc.stop()
}
}
(11)foreach
object Spark06_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
// foreach是Driver端内存集合的循环遍历方法
rdd.collect().foreach(println)
println("***********")
// foreach是Executor端内存数据打印
rdd.foreach(println)
// 算子:Operator(操作)区分不同的处理效果,将RDD的方法称为算子
// RDD的方法和Scala集合对象的方法不一样
// 集合对象的方法都是在同一个节点的内存中完成的
// RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
// RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor
sc.stop()
}
}
1.5.3.3 WordCount
object Spark03_WordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparkConf)
wordcount9(sc)
sc.stop()
}
// groupBy
def wordcount1(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
// groupByKey
def wordcount2(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
// reduceByKey
def wordcount3(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
}
// aggregateByKey
def wordcount4(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_ + _, _ + _)
}
// foldByKey
def wordcount5(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)
}
// combineByKey
def wordcount6(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
v => v,
(x: Int, y) => x + y,
(x: Int, y: Int) => x + y
)
}
// countByKey
def wordcount7(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val wordOne = words.map((_, 1))
val stringToLong: collection.Map[String, Long] = wordOne.countByKey()
}
// countByValue
def wordcount8(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val stringToLong: collection.Map[String, Long] = words.countByValue()
}
// reduce,aggregate,fold
def wordcount9(sc: SparkContext): Unit = {
val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
val words = rdd.flatMap(_.split(" "))
val mapWord = words.map(
word => {
mutable.Map[String, Long]((word, 1))
}
)
val wordCount = mapWord.reduce(
(map1, map2) => {
map2.foreach{
case (word, count) => {
val newCount = map1.getOrElse(word, 0L) + count
map1.update(word, newCount)
}
}
map1
}
)
}
}
1.5.4 RDD序列化
object Spark07_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
// RDD算子中传递的函数是会包含闭包操作,就会进行检测=>闭包检测
rdd.foreach(
num => {
println("age = " + (user.age + num))
}
)
sc.stop()
}
// 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
case class User(){
// 或class User extends Serializable {
var age: Int = 30
}
}
- 闭包检查
从计算的角度,算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
object Spark01_RDD_Serial {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive"))
val search = new Search("h")
// search.getMatch1(rdd).collect().foreach(println)
search.getMatch2(rdd).collect().foreach(println)
sc.stop()
}
// 查询对象
// Scala类的构造参数其实是类的属性,构造参数需要进行闭包检测,等同于类进行闭包检测
class Search(query: String) extends Serializable {
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函数序列化案例
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 属性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
// extends Serializable或者样例类
// 或者
val s = query // 这个在Driver执行
rdd.filter(x => x.contains(s)) // 这个是算子在Executor执行
}
}
}
- Kryo 序列化框架
Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。
注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。
object serializable_Kryo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu",
"atguigu", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
} }
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query)
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}
1.5.5 RDD依赖关系
- RDD血缘关系
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。


object Spark01_RDD_Dep {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("WC")
val sc = new SparkContext(sparkConf)
val lines: RDD[String] = sc.textFile("datas/word.txt")
println(lines.toDebugString)
val words: RDD[String] = lines.flatMap(_.split(" "))
println(words.toDebugString)
val wordToOne = words.map(word => (word, 1))
println(wordToOne.toDebugString)
val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println(wordToSum.toDebugString)
val array: Array[(String, Int)] = wordToSum.collect()
array.foreach(println)
sc.stop()
}
}

- RDD依赖关系
两个相邻 RDD 之间的关系
object Spark02_RDD_Dep {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("WC")
val sc = new SparkContext(sparkConf)
val lines: RDD[String] = sc.textFile("datas/word.txt")
println(lines.dependencies)
val words: RDD[String] = lines.flatMap(_.split(" "))
println(words.dependencies)
val wordToOne = words.map(word => (word, 1))
println(wordToOne.dependencies)
val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println(wordToSum.dependencies)
val array: Array[(String, Int)] = wordToSum.collect()
array.foreach(println)
sc.stop()
}
}

- RDD窄依赖
窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用
- RDD宽依赖
宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle。
- RDD阶段划分
shuffle(要打乱重新组合),所以要等待,划分成了不同的阶段。
源码:




- RDD任务划分
RDD 任务切分中间分为:Application、Job、Stage 和 Task
- Application:初始化一个 SparkContext 即生成一个 Application
- Job:一个 Action 算子就会生成一个 Job
- Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1
- Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系
任务的数量 = 当前阶段中最后一个RDD的分区数量
1.5.6 RDD持久化
RDD不存储数据,如果一个RDD需要重复使用,需要从头再次执行 RDD对象可以重用,但数据无法重用
- RDD Cache缓存
RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该RDD 将会被缓存在计算节点的内存中,并供后面重用。
RDD对象的持久化操作不一定为了重用,在数据执行较长或比较重要也会使用持久化操作。
object Spark02_RDD_Persist {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map((_, 1))
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("********")
// cache默认持久化的操作将数据保存到内存中,如果要保存到磁盘,更改存储级别
// mapRDD.cache()
mapRDD.persist(StorageLevel.DISK_ONLY)
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}

- RDD CheckPoint检查点
通过将 RDD 中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
object Spark02_RDD_Persist {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
sc.setCheckpointDir("./cp")
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map((_, 1))
// checkpoint需要落盘,需要指定检查点保存路径
// 检查点路径保存的文件,当作业执行完毕后,不会被删除
// 一般保存路径都是在分布式存储系统中
mapRDD.checkpoint()
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceRDD.collect().foreach(println)
println("********")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
sc.stop()
}
}
- 区别
- cache:将数据临时存储在内存中进行数据重用;会在血缘关系中添加新的依赖,一旦出现问题,可以重头读取数据
- persist:将数据临时存储在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但数据安全;如果作业执行完毕,临时保存的数据文件就会丢失
checkpoint:将数据长久地保存在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但数据安全;为了保证数据安全,一般情况下会独立执行作业;为了提高效率,一般和cache联合使用;执行过程中,会切断血缘关系,重新建立新的血缘关系,等同于改变数据源
mapRDD.cache() mapRDD.checkpoint()1.5.7 RDD分区器
自定义分区器:
object Spark01_RDD_Part { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("WC") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List( ("nba", "xxxxxxxx"), ("cba", "xxxxxxxx"), ("wnba", "xxxxxxxx"), ("nba", "xxxxxxxx") )) val partRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner) partRDD.saveAsTextFile("output") sc.stop() } /** * 自定义分区器 * 1. 继承Partitioner * 2. 重写方法 */ class MyPartitioner extends Partitioner { // 分区数量 override def numPartitions: Int = 3 // 根据数据的key值返回数据所在的分区索引,从0开始 override def getPartition(key: Any): Int = { key match { case "nba" => 0 case "wnba" => 1 case _ => 2 } } } }1.5.8 RDD文件读取与保存
保存
object Spark01_RDD_IO_Save { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("WC") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD( List( ("a", 1), ("b", 2), ("c", 3) ) ) rdd.saveAsTextFile("output1") rdd.saveAsObjectFile("output2") rdd.saveAsSequenceFile("output3") sc.stop() } }读取
object Spark02_RDD_IO_Load { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("WC") val sc = new SparkContext(sparkConf) val rdd = sc.textFile("output1") println(rdd.collect().mkString(",")) val rdd1 = sc.objectFile[(String, Int)]("output2") println(rdd1.collect().mkString(",")) val rdd2 = sc.sequenceFile[String, Int]("output3") println(rdd2.collect().mkString(",")) sc.stop() } }2 累加器
object Spark01_Acc { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1, 2, 3, 4)) // reduce:分区内计算,分区间计算 // val i: Int = rdd.reduce(_ + _) // println(i) var sum = 0 rdd.foreach( num => { sum += num } ) println("sum = " + sum) // 输出0,在Executor中改变了sum,但是没有返回Driver sc.stop() } }2.1 实现原理
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。
2.2 基础编程
2.2.1 系统累加器
object Spark02_Acc { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1, 2, 3, 4)) // 获取系统累加器 // Spark默认提供了简单数据聚合的累加器 val sumAcc = sc.longAccumulator("sum") // 参数是给它起的名 // sc.doubleAccumulator // sc.collectionAccumulator // 集合类型,默认是List rdd.foreach( num => { // 使用累加器 sumAcc.add(num) } ) // 获取累加器的值 println(sumAcc.value) sc.stop() } }
object Spark03_Acc { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1, 2, 3, 4)) // 获取系统累加器 // Spark默认提供了简单数据聚合的累加器 val sumAcc = sc.longAccumulator("sum") // 参数是给它起的名 // sc.doubleAccumulator // sc.collectionAccumulator // 集合类型,默认是List val mapRDD = rdd.map( num => { // 使用累加器 sumAcc.add(num) num } ) // 获取累加器的值 // 少加:转换算子中调用累加器,如果没有行动算子,不会执行 println(sumAcc.value) // 输出0 // 多加:转换算子中调用累加器,多次调用 // 所以累加器一般放在行动算子中 mapRDD.collect() mapRDD.collect() println(sumAcc.value) // 20 sc.stop() } }2.2.2 自定义累加器
object Spark04_Acc_WordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List("hello", "spark", "hello")) // 累加器: WordCount // 创建累加器对象 val wcAcc = new MyAccumulator // 向Spark进行注册 sc.register(wcAcc, "wordCountAcc") rdd.foreach( word => { // 数据的累加 wcAcc.add(word) } ) // 获取累加器累加的结果 println(wcAcc.value) sc.stop() } /* 自定义数据累加器:WordCount 1. 继承AccumulatorV2,定义泛型 IN:累加器输入的数据类型 OUT:累加器返回的数据类型 2. 重写方法 */ class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] { private var wcMap = mutable.Map[String, Long]() // 判断是否为初始状态 override def isZero: Boolean = { wcMap.isEmpty } override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = { new MyAccumulator() } override def reset(): Unit = { wcMap.clear() } // 获取累加器需要计算的值 override def add(word: String): Unit = { val newCnt = wcMap.getOrElse(word, 0L) + 1 wcMap.update(word, newCnt) } // Driver合并多个累加器 override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { val map1 = this.wcMap val map2 = other.value map2.foreach{ case (word, count) => {} val newCnt = map1.getOrElse(word, 0L) + count map1.update(word, newCnt) } } // 累加器结果 override def value: mutable.Map[String, Long] = { wcMap } } }3 广播变量
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。

object Spark06_Bc { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("Acc") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1, 2, 3, 4)) val rdd1 = sc.makeRDD(List( ("a", 1), ("b", 2), ("c", 3) )) val map = mutable.Map(("a", 4), ("b", 5), ("c", 6)) // 封装广播变量 val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map) rdd1.map { case (w, c) => { // 访问广播变量 val l = bc.value.getOrElse(w, 0L) (w, (c, l)) } }.collect().foreach(println) sc.stop() } }4 案例
电商网站的用户行为数据,主要包含用户的 4 种行为:搜索,点击,下单,支付。
数据文件中每行数据采用下划线分隔数据
- 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
- 如果搜索关键字为 null,表示数据不是搜索数据
- 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据
- 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示
- 支付行为和下单行为类似
字段:
//用户访问动作表
case class UserVisitAction(
date: String, //用户点击行为的日期
user_id: Long, //用户的 ID
session_id: String, //Session 的 ID
page_id: Long, //某个页面的 ID
action_time: String, //动作的时间点
search_keyword: String, //用户搜索的关键词
click_category_id: Long, //某一个商品品类的 ID
click_product_id: Long, //某一个商品的 ID
order_category_ids: String, //一次订单中所有品类的 ID 集合
order_product_ids: String, //一次订单中所有商品的 ID 集合
pay_category_ids: String, //一次支付中所有品类的 ID 集合
pay_product_ids: String, //一次支付中所有商品的 ID 集合
city_id: Long //城市 id
)
4.1 需求1
先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
4.1.1 实现一
object Spark01_Req1_HotCatagoryTop10Analysis {
def main(args: Array[String]): Unit = {
// TODO TOP10热门品类
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
val sc = new SparkContext(sparkConf)
// 1. 读取原始日志数据
val actionRDD = sc.textFile("datas/user_visit_action.txt")
// 2. 统计品类的点击数量:(品类ID,点击数量)
val clickActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(6) != "-1"
}
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val datas = action.split("_")
(datas(6), 1)
}
).reduceByKey(_ + _)
// 3. 统计品类的下单数量:(品类ID,下单数量)
val orderActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(8) != "null"
}
)
// 扁平化操作
val OrderCountRDD = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// 4. 统计品类的支付数量:(品类ID,支付数量)
val payActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(10) != "null"
}
)
val payCountRDD = payActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// 5. 将品类排序,并取前10名
// 点击数量排序,下单数量排序,支付数量排序
// 元组排序:先比较第一个,再比较第二个,再比较第三个,以此类推
// (品类ID,(点击数量,下单数量,支付数量))
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(OrderCountRDD, payCountRDD)
val analysisRDD = cogroupRDD.mapValues{
case (clickIter, orderIter, payIter) => {
var clickCnt = 0
val iter1 = clickIter.iterator // iter里面基本只有一个值,存数量,next表示取里面那个值
if (iter1.hasNext) {
clickCnt = iter1.next()
}
var orderCnt = 0
val iter2 = orderIter.iterator
if (iter2.hasNext) {
orderCnt = iter2.next()
}
var payCnt = 0
val iter3 = payIter.iterator
if (iter3.hasNext) {
payCnt = iter3.next()
}
(clickCnt, orderCnt, payCnt)
}
}
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
// 6. 将结果采集到控制台打印
resultRDD.foreach(println)
sc.stop()
}
}
4.1.2 实现二
object Spark02_Req1_HotCatagoryTop10Analysis {
def main(args: Array[String]): Unit = {
// TODO TOP10热门品类
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
val sc = new SparkContext(sparkConf)
// 1. 读取原始日志数据
val actionRDD = sc.textFile("datas/user_visit_action.txt")
actionRDD.cache()
// 2. 统计品类的点击数量:(品类ID,点击数量)
val clickActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(6) != "-1"
}
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val datas = action.split("_")
(datas(6), 1)
}
).reduceByKey(_ + _)
// 3. 统计品类的下单数量:(品类ID,下单数量)
val orderActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(8) != "null"
}
)
// 扁平化操作
val orderCountRDD = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// 4. 统计品类的支付数量:(品类ID,支付数量)
val payActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(10) != "null"
}
)
val payCountRDD = payActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// (品类ID,点击数量) => (品类ID,(点击数量,0,0))
// (品类ID,下单数量) => (品类ID,(0,下单数量,0))
// => (品类ID,(点击数量,下单数量,0)
// (品类ID,支付数量) => (品类ID,(0,0,支付数量))
// => (品类ID,(点击数量,下单数量,支付数量))
// 5. 将品类排序,并取前10名
// 点击数量排序,下单数量排序,支付数量排序
// 元组排序:先比较第一个,再比较第二个,再比较第三个,以此类推
// (品类ID,(点击数量,下单数量,支付数量))
val rdd1 = clickCountRDD.map {
case (cid, cnt) => {
(cid, (cnt, 0, 0))
}
}
val rdd2 = orderCountRDD.map {
case (cid, cnt) => {
(cid, (0, cnt, 0))
}
}
val rdd3 = payCountRDD.map {
case (cid, cnt) => {
(cid, (0, 0, cnt))
}
}
// 将三个数据源合并在一起,同一进行聚合计算
val sourceRDD: RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)
val analysisRDD = sourceRDD.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
// 6. 将结果采集到控制台打印
resultRDD.foreach(println)
sc.stop()
}
}
4.1.3 实现三
object Spark03_Req1_HotCatagoryTop10Analysis {
def main(args: Array[String]): Unit = {
// TODO TOP10热门品类
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
val sc = new SparkContext(sparkConf)
// 1. 读取原始日志数据
val actionRDD = sc.textFile("datas/user_visit_action.txt")
// reduceByKey 聚合算子,Spark会提供优化,缓存
// 但这里是不同数据源的reduceByKey
// 2. 将数据转换结构
// 点击:(品类ID,(1,0,0))
// 下单:(品类ID,(0,1,0))
// 支付:(品类ID,(0,0,1))
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
action => {
val datas = action.split("_")
if (datas(6) != "-1") {
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
val ids = datas(8).split(",")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
// 支付的场合
val ids = datas(10).split(",")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
// 3. 将相同的品类ID的数据进行分组聚合
// (品类ID,(点击数量,下单数量,支付数量))
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
// 4. 将统计结果根据数量进行降序处理,取前10名
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
// 5. 将结果采集到控制台打印
resultRDD.foreach(println)
sc.stop()
}
}
4.1.4 实现四
object Spark04_Req1_HotCatagoryTop10Analysis {
def main(args: Array[String]): Unit = {
// TODO TOP10热门品类
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
val sc = new SparkContext(sparkConf)
// 1. 读取原始日志数据
val actionRDD = sc.textFile("datas/user_visit_action.txt")
val acc = new HotCategoryAccumulator
sc.register(acc, "hotCategory")
// 2. 将数据转换结构
// 点击:(品类ID,(1,0,0))
// 下单:(品类ID,(0,1,0))
// 支付:(品类ID,(0,0,1))
actionRDD.foreach(
action => {
val datas = action.split("_")
if (datas(6) != "-1") {
acc.add((datas(6), "click"))
} else if (datas(8) != "null") {
val ids = datas(8).split(",")
ids.foreach(
id => {
acc.add((id, "order"))
}
)
} else if (datas(10) != null) {
val ids = datas(10).split(",")
ids.foreach(
id => {
acc.add((id, "pay"))
}
)
}
}
)
val accVal: mutable.Map[String, HotCategory] = acc.value
val categories: mutable.Iterable[HotCategory] = accVal.map(_._2)
val sort = categories.toList.sortWith(
(left, right) => {
if (left.clickCnt > right.clickCnt) {
true
} else if (left.clickCnt == right.clickCnt) {
if (left.orderCnt > right.orderCnt) {
true
} else if (left.orderCnt == right.orderCnt) {
left.payCnt > right.payCnt
} else {
false
}
} else {
false
}
}
)
// 5. 将结果采集到控制台打印
sort.take(10).foreach(println)
sc.stop()
}
case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)
/**
* 自定义累加器
* 1. 继承AccumulatorV2,定义泛型
* IN: (品类ID,行为类型)
* OUT: mutable.Map[String, HotCategory]
* 2. 重写方法
*/
class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {
private val hcMap = mutable.Map[String, HotCategory]()
override def isZero: Boolean = {
hcMap.isEmpty
}
override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
new HotCategoryAccumulator()
}
override def reset(): Unit = {
hcMap.clear()
}
override def add(v: (String, String)): Unit = {
val cid = v._1
val actionType = v._2
val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
if (actionType == "click") {
category.clickCnt += 1
} else if (actionType == "order") {
category.orderCnt += 1
} else if (actionType == "pay") {
category.payCnt += 1
}
hcMap.update(cid, category)
}
override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
val map1 = this.hcMap
val map2 = other.value
map2.foreach {
case (cid, hc) => {
val category = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
category.clickCnt += hc.clickCnt
category.orderCnt += hc.orderCnt
category.payCnt += hc.payCnt
map1.update(cid, category)
}
}
}
override def value: mutable.Map[String, HotCategory] = hcMap
}
}
4.2 需求二
Top10热门品类中每个品类的Top10活跃Session统计,在需求一增加每个品类用户session的点击统计
object Spark05_Req2_HotCatagoryTop10SessionAnalysis {
def main(args: Array[String]): Unit = {
// TODO TOP10热门品类
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
val sc = new SparkContext(sparkConf)
val actionRDD = sc.textFile("datas/user_visit_action.txt")
actionRDD.cache()
val top10Ids: Array[String] = top10Category(actionRDD)
// 1. 过滤原始数据,保留点击和前10品类ID
val filterActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
if (datas(6) != "-1") {
top10Ids.contains(datas(6))
} else {
false
}
}
)
// 2. 根据品类ID和sessionid进行点击量的统计
val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(
action => {
val datas = action.split("_")
((datas(6), datas(2)), 1)
}
).reduceByKey(_ + _)
// 3. 将统计的结果进行结构的转换
// ((品类ID,sessionId),sum) => (品类ID,(sessionId,sum))
val mapRDD = reduceRDD.map {
case ((cid, sid), sum) => {
(cid, (sid, sum))
}
}
// 4. 相同品类进行分组
val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()
// 5. 将分组后的数据进行点击量的排序,取前10名
val resultRDD = groupRDD.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
}
)
resultRDD.collect().foreach(println)
sc.stop()
}
def top10Category(actionRDD: RDD[String]) = {
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
action => {
val datas = action.split("_")
if (datas(6) != "-1") {
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
val ids = datas(8).split(",")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
// 支付的场合
val ids = datas(10).split(",")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
analysisRDD.sortBy(_._2, false).take(10).map(_._1)
}
}
4.3 需求三
页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,单跳转化率就是要统计页面点击的概率。
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV) 为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B, B/A 就是 3-5 的页面单跳转化率。
object Spark06_Req3_PageFlowAnalysis {
def main(args: Array[String]): Unit = {
// TODO TOP10热门品类
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
val sc = new SparkContext(sparkConf)
val actionRDD = sc.textFile("datas/user_visit_action.txt")
val actionDataRDD = actionRDD.map(
action => {
val datum = action.split("_")
UserVisitAction(
datum(0),
datum(1).toLong,
datum(2),
datum(3).toLong,
datum(4),
datum(5),
datum(6).toLong,
datum(7).toLong,
datum(8),
datum(9),
datum(10),
datum(11),
datum(12).toLong
)
}
)
actionDataRDD.cache()
// TODO 计算分母
val pageIdToCount: Map[Long, Long] = actionDataRDD.map(
action => {
(action.page_id, 1L)
}
).reduceByKey(_ + _).collect().toMap
// TODO 计算分子
// 根据session进行分组
val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)
// 分组后,根据访问时间进行排序(升序)
val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
iter => {
val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
val flowIds: List[Long] = sortList.map(_.page_id)
// [1,2,3,4]
// [1,2],[2,3],[3,4]
// [1,2,3,4]和[2,3,4] zip拉链
val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)
pageflowIds.map(
t => {
(t, 1)
}
)
}
)
val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list => list)
val dataRDD = flatRDD.reduceByKey(_ + _)
// TODO 计算单跳转换率
// 分子除以分母
dataRDD.foreach {
case ((pageid1, pageid2), sum) => {
val lon = pageIdToCount.getOrElse(pageid1, 0L)
println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + (sum.toDouble / lon))
}
}
sc.stop()
}
}
//用户访问动作表
case class UserVisitAction(
date: String, //用户点击行为的日期
user_id: Long, //用户的 ID
session_id: String, //Session 的 ID
page_id: Long, //某个页面的 ID
action_time: String, //动作的时间点
search_keyword: String, //用户搜索的关键词
click_category_id: Long, //某一个商品品类的 ID
click_product_id: Long, //某一个商品的 ID
order_category_ids: String, //一次订单中所有品类的 ID 集合
order_product_ids: String, //一次订单中所有商品的 ID 集合
pay_category_ids: String, //一次支付中所有品类的 ID 集合
pay_product_ids: String, //一次支付中所有商品的 ID 集合
city_id: Long //城市 id
)

对指定的页面连续跳转进行统计 ```scala object Spark06_Req3_PageFlowAnalysis { def main(args: Array[String]): Unit = { // TODO TOP10热门品类 val sparkConf = new SparkConf().setMaster(“local[]”).setAppName(“HotCatagoryTop10Analysis”) // 表示当前最大可用核数 val sc = new SparkContext(sparkConf)
val actionRDD = sc.textFile(“datas/user_visit_action.txt”)
val actionDataRDD = actionRDD.map(
action => { val datum = action.split("_") UserVisitAction( datum(0), datum(1).toLong, datum(2), datum(3).toLong, datum(4), datum(5), datum(6).toLong, datum(7).toLong, datum(8), datum(9), datum(10), datum(11), datum(12).toLong ) }) actionDataRDD.cache()
// TODO 对指定的页面连续跳转进行统计 val ids = ListLong val okflowIDs: List[(Long, Long)] = ids.zip(ids.tail)
// TODO 计算分母 val pageidToCountMap: Map[Long, Long] = actionDataRDD.filter(
action => { ids.init.contains(action.page_id) }).map(
action => { (action.page_id, 1L) }).reduceByKey( + ).collect().toMap
// TODO 计算分子
// 根据session进行分组 val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)
// 分组后,根据访问时间进行排序(升序) val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
iter => { val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time) val flowIds: List[Long] = sortList.map(_.page_id) // [1,2,3,4] // [1,2],[2,3],[3,4] // [1,2,3,4]和[2,3,4] zip拉链 val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail) // 将不合法的页面跳转进行过滤 pageflowIds.filter( t => { okflowIDs.contains(t) } ).map( t => { (t, 1) } ) }) val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(._2).flatMap(list => list) val dataRDD = flatRDD.reduceByKey( + _)
// TODO 计算单跳转换率 // 分子除以分母 dataRDD.foreach {
case ((pageid1, pageid2), sum) => { val lon = pageidToCountMap.getOrElse(pageid1, 0L) println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + (sum.toDouble / lon)) }}
sc.stop() } }
//用户访问动作表 case class UserVisitAction( date: String, //用户点击行为的日期 user_id: Long, //用户的 ID session_id: String, //Session 的 ID page_id: Long, //某个页面的 ID action_time: String, //动作的时间点 search_keyword: String, //用户搜索的关键词 click_category_id: Long, //某一个商品品类的 ID click_product_id: Long, //某一个商品的 ID order_category_ids: String, //一次订单中所有品类的 ID 集合 order_product_ids: String, //一次订单中所有商品的 ID 集合 pay_category_ids: String, //一次支付中所有品类的 ID 集合 pay_product_ids: String, //一次支付中所有商品的 ID 集合 city_id: Long //城市 id )

<a name="fTH3o"></a>
# 5 架构模式
<a name="T3Zmq"></a>
## 5.1 三层架构
controller(控制层),service(服务层),dao(持久层)<br />
<a name="xQ95D"></a>
## 5.2 实现

```scala
package com.example.bigdata.spark.core.framework.application
import com.example.bigdata.spark.core.framework.common.TApplication
import com.example.bigdata.spark.core.framework.controller.WordCountController
object WordCountApplication extends App with TApplication {
// 启动应用程序
start() {
val controller = new WordCountController()
controller.dispatch()
}
}
package com.example.bigdata.spark.core.framework.common
import com.example.bigdata.spark.core.framework.util.EnvUtil
import org.apache.spark.{SparkConf, SparkContext}
trait TApplication {
def start(master: String = "local[*]", app: String = "Application")(op: => Unit): Unit = {
val sparkConf = new SparkConf().setMaster(master).setAppName(app)
val sc = new SparkContext(sparkConf)
EnvUtil.put(sc)
try {
op
} catch {
case ex => println(ex.getMessage)
}
sc.stop()
EnvUtil.clear()
}
}
package com.example.bigdata.spark.core.framework.common
trait TController {
def dispatch(): Unit
}
package com.example.bigdata.spark.core.framework.common
import com.example.bigdata.spark.core.framework.util.EnvUtil
import org.apache.spark.rdd.RDD
trait TDao {
def readFile(path: String): RDD[String] = {
EnvUtil.take().textFile(path)
}
}
package com.example.bigdata.spark.core.framework.common
trait TService {
def dataAnalysis(): Any
}
package com.example.bigdata.spark.core.framework.controller
import com.example.bigdata.spark.core.framework.common.TController
import com.example.bigdata.spark.core.framework.service.WordCountService
/**
* 控制层
*/
class WordCountController extends TController{
private val wordCountService = new WordCountService()
// 调度
def dispatch(): Unit = {
// TODO 执行业务操作
val array = wordCountService.dataAnalysis()
array.foreach(println)
}
}
package com.example.bigdata.spark.core.framework.dao
import com.example.bigdata.spark.core.framework.common.TDao
/**
* 持久层
*/
class WordCountDao extends TDao {
}
package com.example.bigdata.spark.core.framework.service
import com.example.bigdata.spark.core.framework.common.TService
import com.example.bigdata.spark.core.framework.dao.WordCountDao
import org.apache.spark.rdd.RDD
/**
* 服务层
*/
class WordCountService extends TService{
private val wordCountDao = new WordCountDao()
// 数据分析
def dataAnalysis(): Array[(String, Int)] = {
val lines = wordCountDao.readFile("datas/word.txt")
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne = words.map(word => (word, 1))
val wordToSum = wordToOne.reduceByKey(_ + _)
val array: Array[(String, Int)] = wordToSum.collect()
array
}
}
package com.example.bigdata.spark.core.framework.util
import org.apache.spark.SparkContext
object EnvUtil {
private val scLocal = new ThreadLocal[SparkContext]()
def put(sc: SparkContext): Unit = {
scLocal.set(sc)
}
def take(): SparkContext = {
scLocal.get()
}
def clear(): Unit = {
scLocal.remove()
}
}
