学习链接:https://www.bilibili.com/video/BV11A411L7CK?p=22&spm_id_from=pageDriver&vd_source=b9e4f35102d61e6d02e0a5e1bbfea480


  1. Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
  • RDD:弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量:分布式共享只读变量

    1 RDD

    1.1 什么是RDD

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
    QQ截图20220613161208.png

    1.2 特点

  • 弹性

    • 存储的弹性:内存与磁盘的自动切换
    • 容错的弹性:数据丢失可以自动恢复
    • 计算的弹性:计算出错重试机制
    • 分片的弹性:可根据需要重新分片
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD 封装了计算逻辑,并不保存数据
  • 数据抽象:RDD 是一个抽象类,需要子类具体实现
  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
  • 可分区、并行计算

    类比Java IO和RDD

    • IO流,体现装饰者模式

    QQ截图20220613160322.png

    • RDD

    QQ截图20220613160354.png

1.3 核心属性

QQ截图20220613162055.png

  1. 分区列表

RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
QQ截图20220613162445.png

  1. 分区计算函数

Spark在计算时,是使用分区函数对每一个分区进行计算
QQ截图20220613162734.png

  1. RDD之间的依赖关系

RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个RDD建立依赖关系
QQ截图20220613163103.png

  1. 分区器(可选)

当数据为KV类型数据时,可以通过设定分区器自定义数据的分区
QQ截图20220613163311.png

  1. 首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
QQ截图20220613163844.png
QQ截图20220613163908.png

1.4 执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
RDD 是 Spark 框架中用于数据处理的核心模型,主要用于将逻辑进行封装,并生成Task发送给Executor节点执行计算。在 Yarn 环境中,RDD的工作原理:

  1. 启动Yarn集群环境

图片1.png

  1. Spark通过申请资源创建调度节点和计算节点

图片2.png

  1. Spark框架根据需求将计算逻辑分区划分成不同的任务

图片3.png

  1. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

图片4.png

1.5 基础编程

1.5.1 RDD 并行度与分区

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量称之为并行度。这个数量可以在构建 RDD 时指定。这里的并行执行的任务数量,并不是指的切分任务的数量。

  1. object Spark01_RDD_Memory_Par {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 准备环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数
  5. // 也可以用
  6. sparkConf.set("spark.default.parallelism","5")
  7. val sc = new SparkContext(sparkConf)
  8. // TODO 创建RDD
  9. // RDD的并行度 & 分区
  10. // makeRDD方法可以传递第二个参数,表示分区的数量
  11. // 第二个参数可以不传递,默认值defaultParallelism(默认并行度)
  12. // scheduler.conf.getInt("spark.default.parallelism", totalCores)
  13. // spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
  14. // 如果获取不到,使用totalCores属性,这个属性取值为当前运行环境的最大可用核数
  15. val rdd = sc.makeRDD(
  16. List(1, 2, 3, 4), 2
  17. )
  18. // 将处理的数据保存成分区文件
  19. rdd.saveAsTextFile("output")
  20. // TODO 关闭环境
  21. sc.stop()
  22. }
  23. }

QQ截图20220613210307.png

1.5.2 RDD创建

1)从集合(内存)中创建RDD

从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD

  1. object Spark01_RDD_Memory {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 准备环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数
  5. val sc = new SparkContext(sparkConf)
  6. // TODO 创建RDD
  7. // 从内存中创建RDD,将内存中集合的数据作为处理的数据源
  8. val seq = Seq[Int](1, 2, 3, 4)
  9. // parallelize:并行
  10. // val rdd: RDD[Int] = sc.parallelize(seq)
  11. // makeRDD方法在底层实现时就是调用了rdd对象的parallelize方法
  12. /*
  13. /** Distribute a local Scala collection to form an RDD.
  14. *
  15. * This method is identical to `parallelize`.
  16. * @param seq Scala collection to distribute
  17. * @param numSlices number of partitions to divide the collection into
  18. * @return RDD representing distributed collection
  19. */
  20. def makeRDD[T: ClassTag](
  21. seq: Seq[T],
  22. numSlices: Int = defaultParallelism): RDD[T] = withScope {
  23. parallelize(seq, numSlices)
  24. }
  25. */
  26. val rdd: RDD[Int] = sc.makeRDD(seq)
  27. rdd.collect().foreach(println)
  28. // TODO 关闭环境
  29. sc.stop()
  30. }
  31. }

分区数据的分配

  1. object Spark01_RDD_Memory_Par1 {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 准备环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数
  5. val sc = new SparkContext(sparkConf)
  6. // TODO 创建RDD
  7. // RDD的并行度 & 分区
  8. // 分3个分区,此时会以[1],[2],[3,4]分
  9. // val rdd = sc.makeRDD(List(1, 2, 3, 4), 3)
  10. // 分3个分区,此时会以[1],[2,3],[4,5]分
  11. val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3)
  12. // 将处理的数据保存成分区文件
  13. rdd.saveAsTextFile("output")
  14. // TODO 关闭环境
  15. sc.stop()
  16. }
  17. }

QQ截图20220613214515.png

2)从外部存储(文件)创建RDD

由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。

  1. object Spark02_RDD_File {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 准备环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数
  5. val sc = new SparkContext(sparkConf)
  6. // TODO 创建RDD
  7. // 从文件中创建RDD,将文件中的数据作为处理的数据源
  8. // path路径默认以当前环境的根路径为基准,可以写绝对路径、相对路径
  9. // sc.textFile("C:\\Users\\ace\\Desktop\\study_spark\\datas\\1.txt")
  10. // val rdd: RDD[String] = sc.textFile("datas/1.txt")
  11. // 也可以写目录名称,对所有文件统计
  12. // val rdd = sc.textFile("datas")
  13. // path路径还可以使用通配符
  14. // val rdd = sc.textFile("datas/1*.txt")
  15. // path还可以是分布式存储系统路径:HDFS
  16. val rdd = sc.textFile("hdfs://hadoop102:8020/test.txt")
  17. rdd.collect().foreach(println)
  18. // TODO 关闭环境
  19. sc.stop()
  20. }
  21. }
  1. object Spark02_RDD_File1 {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 准备环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数
  5. val sc = new SparkContext(sparkConf)
  6. // TODO 创建RDD
  7. // 从文件中创建RDD,将文件中的数据作为处理的数据源
  8. // textFile:以行为单位读取数据,读取的数据都是字符串
  9. // wholeTextFiles:以文件为单位读取数据
  10. // 读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
  11. val rdd = sc.wholeTextFiles("datas")
  12. rdd.collect().foreach(println)
  13. // TODO 关闭环境
  14. sc.stop()
  15. }
  16. }

QQ截图20220613184023.png
分区的设定

  1. object Spark02_RDD_File_Par {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 准备环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数
  5. val sc = new SparkContext(sparkConf)
  6. // TODO 创建RDD
  7. // textFile将文件作为数据处理的数据源,默认也可用设定分区
  8. // minPartitions:最小分区数量
  9. // math.min(defaultParallelism, 2)
  10. // val rdd = sc.textFile("datas/1.txt")
  11. // 如果不想使用默认的分区数,可用通过第二个参数指定分区数
  12. // Spark读取文件,底层使用Hadoop的读取方式
  13. // 分区数量计算方式:
  14. // totalSize = 26
  15. // long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
  16. // goalSize = 26 / 3 = 8...2
  17. // 超过了1.1倍,新的分区,一共4个
  18. val rdd = sc.textFile("datas/1.txt", 3)
  19. rdd.saveAsTextFile("output")
  20. // TODO 关闭环境
  21. sc.stop()
  22. }
  23. }

分区数据的分配
11.txt中,数据为

  1. 1
  2. 2
  3. 3
  1. object Spark02_RDD_File_Par {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 准备环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数
  5. val sc = new SparkContext(sparkConf)
  6. // TODO 创建RDD
  7. // 分区数量计算方式:
  8. // totalSize = 7
  9. // long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
  10. // goalSize = 7 / 3 = 2...1
  11. // 超过了1.1倍,新的分区,一共3个
  12. val rdd = sc.textFile("datas/11.txt", 2)
  13. rdd.saveAsTextFile("output")
  14. // TODO 关闭环境
  15. sc.stop()
  16. }
  17. }

QQ截图20220614101250.png

  1. object Spark02_RDD_File_Par1 {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 准备环境
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") // *表示当前最大可用核数
  5. val sc = new SparkContext(sparkConf)
  6. // TODO 创建RDD
  7. // 分区数量计算方式:
  8. // totalSize = 7
  9. // long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
  10. // goalSize = 7 / 3 = 2...1
  11. // 超过了1.1倍,新的分区,一共3个
  12. // TODO 数据分区的分配
  13. // 1. 数据以行为单位进行读取
  14. // spark读取文件,采用的是hadoop的方式读取,一行一行读,和字节数没有关系
  15. // 2. 数据读取时以偏移量为单位,偏移量不会被重复读取
  16. /*
  17. 前两行后面都跟了回车换行,用@占位
  18. 文件内容=>偏移量
  19. 1@@ =>012
  20. 2@@ =>345
  21. 3@@ =>6
  22. */
  23. // 3. 数据分区的偏移量范围的计算
  24. // 分区 => 偏移量 =>分区中数据
  25. // 0 => [0,3] => 12
  26. // 1 => [3,6] => 3
  27. // 2 => [6,7] => 空
  28. // 三个分区
  29. // [12],[3],[]
  30. val rdd = sc.textFile("datas/11.txt", 2)
  31. rdd.saveAsTextFile("output")
  32. // TODO 关闭环境
  33. sc.stop()
  34. }
  35. }
  • 如果数据源为多个文件,计算分区时以文件为单位进行分区

    感觉不太对但是debug半天好像确实是这样,迷惑。。。 QQ截图20220614113207.png

1.5.3 RDD算子

QQ截图20220614120046.png

1.5.3.1 RDD转换算子

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型

1. Value类型

(1)map

  • 函数签名

def mapU: ClassTag: RDD[U]

  • 函数说明

将处理的数据逐条进行映射转换

  1. object Spark01_RDD_Operator_Transform {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  4. val sc = new SparkContext(sparkConf)
  5. // TODO 算子-map
  6. val rdd = sc.makeRDD(List(1, 2, 3, 4))
  7. // 1,2,3,4 => 2,4,6,8
  8. // 转换函数
  9. def mapFunction(num: Int): Int = {
  10. num * 2
  11. }
  12. // val mapRDD: RDD[Int] = rdd.map(mapFunction)
  13. // val mapRDD: RDD[Int] = rdd.map((num: Int) => {num * 2})
  14. val mapRDD: RDD[Int] = rdd.map(_ * 2)
  15. mapRDD.collect().foreach(println)
  16. sc.stop()
  17. }
  18. }
  • 例:从服务器日志数据apache.log中获取用户请求URL资源路径

    1. object Spark01_RDD_Operator_Transform_Test {
    2. def main(args: Array[String]): Unit = {
    3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    4. val sc = new SparkContext(sparkConf)
    5. // TODO 算子-map
    6. val rdd = sc.textFile("datas/apache.log")
    7. // 长的字符串 => 短的字符串
    8. val mapRDD: RDD[String] = rdd.map(
    9. line => {
    10. val datas = line.split(" ")
    11. datas(6)
    12. }
    13. )
    14. mapRDD.collect().foreach(println)
    15. sc.stop()
    16. }
    17. }

    20220614151558.png

  • 例:并行

    1. object Spark01_RDD_Operator_Transform_Par {
    2. def main(args: Array[String]): Unit = {
    3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    4. val sc = new SparkContext(sparkConf)
    5. // TODO 算子-map
    6. val rdd = sc.makeRDD(List(1, 2, 3, 4), 1)
    7. val mapRDD = rdd.map(
    8. num => {
    9. println(">>>>>>>" + num)
    10. num
    11. }
    12. )
    13. val mapRDD1 = mapRDD.map(
    14. num => {
    15. println("#####" + num)
    16. num
    17. }
    18. )
    19. mapRDD1.collect()
    20. sc.stop()
    21. }
    22. }
    • 分区为1

QQ截图20220614152053.png
rdd的计算一个分区内的数据是一个一个执行,只有只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据
=> 分区内数据的执行是有序的

  • 分区为2,第7行 val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

QQ截图20220614152445.png
不同分区数据计算是无序的
(2)mapPartitions

  • 函数签名

def mapPartitionsU: ClassTag: RDD[U]

  • 函数说明

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据

  1. object Spark02_RDD_Operator_Transform {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  4. val sc = new SparkContext(sparkConf)
  5. // TODO 算子-map
  6. val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
  7. // mapPartitions:可以以分区为单位进行数据转换操作
  8. // 会将整个分区的数据加载到内存进行引用
  9. // 处理完的数据不会被释放掉,存在对象的引用
  10. // 在内存较小,数据量较大的场合下,容易出现内存溢出
  11. val mapRDD: RDD[Int] = rdd.mapPartitions(
  12. iter => {
  13. println(">>>>>>>")
  14. iter.map(_ * 2)
  15. }
  16. )
  17. mapRDD.collect().foreach(println)
  18. sc.stop()
  19. }
  20. }

QQ截图20220614155228.png

  • 例:获取每个数据分区的最大值

    1. object Spark02_RDD_Operator_Transform_Test {
    2. def main(args: Array[String]): Unit = {
    3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    4. val sc = new SparkContext(sparkConf)
    5. // TODO 算子-map
    6. val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    7. // [1,2],[3,4] => [2],[4]
    8. val mapRDD = rdd.mapPartitions(
    9. iter => {
    10. List(iter.max).iterator
    11. }
    12. )
    13. mapRDD.collect().foreach(println)
    14. sc.stop()
    15. }
    16. }

    QQ截图20220614160422.png
    ➢ 数据处理角度
    Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。
    ➢ 功能的角度
    Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据。
    ➢ 性能的角度
    Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
    (3)mapPartitionsWithIndex

  • 函数签名

def mapPartitionsWithIndexU: ClassTag => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

  • 函数说明

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

  1. // 获取第二个数据分区的数据
  2. object Spark03_RDD_Operator_Transform {
  3. def main(args: Array[String]): Unit = {
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  5. val sc = new SparkContext(sparkConf)
  6. // TODO 算子-map
  7. val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
  8. // [1,2],[3,4]
  9. val mapRDD = rdd.mapPartitionsWithIndex(
  10. (index, iter) => {
  11. if(index == 1) {
  12. iter
  13. } else {
  14. Nil.iterator
  15. }
  16. }
  17. )
  18. mapRDD.collect().foreach(println)
  19. sc.stop()
  20. }
  21. }

QQ截图20220614162058.png

  1. object Spark03_RDD_Operator_Transform1 {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  4. val sc = new SparkContext(sparkConf)
  5. // TODO 算子-map
  6. val rdd = sc.makeRDD(List(1, 2, 3, 4))
  7. val mapRDD = rdd.mapPartitionsWithIndex(
  8. (index, iter) => {
  9. iter.map(
  10. num => {
  11. (index, num)
  12. }
  13. )
  14. }
  15. )
  16. mapRDD.collect().foreach(println)
  17. sc.stop()
  18. }
  19. }

QQ截图20220614163044.png
(4)flatMap

  • 函数签名

def flatMapU: ClassTag: RDD[U]

  • 函数说明

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

object Spark04_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-flatMap
    val rdd: RDD[List[Int]] = sc.makeRDD(List(
      List(1, 2), List(3, 4)
    ))
    val flatRDD: RDD[Int] = rdd.flatMap(
      list => {
        list
      }
    )
    flatRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220614164944.png

object Spark04_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-flatMap
    val rdd: RDD[String] = sc.makeRDD(List(
      "Hello Scala", "Hello Spark"
    ))
    val flatRDD: RDD[String] = rdd.flatMap(
      s => {
        s.split(" ")
      }
    )
    flatRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220614165242.png

object Spark04_RDD_Operator_Transform2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-flatMap
    val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))
    val flatRDD = rdd.flatMap(
      data => {
        data match {
          case list: List[_] => list
          case dat => List(dat)
        }
      }
    )
    flatRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220614165614.png
(5)glom

  • 函数签名

def glom(): RDD[Array[T]]

  • 函数说明

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

object Spark05_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-glom
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // List => Int
    // Int => Array
    val glomRDD: RDD[Array[Int]] = rdd.glom()
    glomRDD.collect().foreach(data => println(data.mkString(",")))
    sc.stop()
  }
}

QQ截图20220614170518.png

  • 例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

    object Spark05_RDD_Operator_Transform_Test {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // TODO 算子-glom
      val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    
      // [1,2],[3,4]
      // [2],[4]=>[6]
      val glomRDD: RDD[Array[Int]] = rdd.glom()
      val maxRDD: RDD[Int] = glomRDD.map(
        array => {
          array.max
        }
      )
      println(maxRDD.collect().sum)
      sc.stop()
    }
    }
    

    QQ截图20220614170951.png
    (6)groupBy

  • 函数签名

def groupByK(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

  • 函数说明

将数据根据指定的规则进行分组,分区默认不变,但数据会被打乱重新组合,称为shuffle
极限情况下,数据可能被分在同一分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

object Spark06_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-groupBy
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
    // 相同的key值的数据会放置在一个组中
    def groupFunction(num: Int): Int = {
      num % 2
    }

    val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
    groupRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220614181404.png

  • 例:将 List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组

    object Spark06_RDD_Operator_Transform1 {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // TODO 算子-groupBy
      val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
    
      // 分组和分区没有必然的关系
      val groupRDD = rdd.groupBy(_.charAt(0))
      groupRDD.collect().foreach(println)
      sc.stop()
    }
    }
    

    QQ截图20220614182236.png

  • 例:从服务器日志数据 apache.log 中获取每个时间段访问量

    object Spark06_RDD_Operator_Transform_Test {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // TODO 算子-groupBy
      val rdd = sc.textFile("datas/apache.log")
      val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
        line => {
          val datas = line.split(" ")
          val time = datas(3)
          // time.substring()
          val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
          val date: Date = sdf.parse(time)
          val sdf1 = new SimpleDateFormat("HH")
          val hour = sdf1.format(date)
          (hour, 1)
        }
      ).groupBy(_._1)
      timeRDD.map {
        case (hour, iter) => {
          (hour, iter.size)
        }
      }.collect().foreach(println)
      sc.stop()
    }
    }
    

    QQ截图20220614183918.png
    (7)filter

  • 函数签名

def filter(f: T => Boolean): RDD[T]

  • 函数说明

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

object Spark07_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-filter
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)
    filterRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220615102352.png

  • 例:从日志数据中获取2015年5月17日的请求路径

    object Spark07_RDD_Operator_Transform_Test {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // TODO 算子-filter
      val rdd = sc.textFile("datas/apache.log")
      rdd.filter(
        line => {
          val datas = line.split(" ")
          val time = datas(3)
          time.startsWith("17/05/2015")
        }
      ).collect().foreach(println)
    
      sc.stop()
    }
    }
    

    QQ截图20220615103547.png
    (8)sample

  • 函数签名

def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]

  • 函数说明

根据指定的规则从数据集中抽取数据

object Spark08_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-sample
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    // sample算子需要传递3个参数
    // 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
    // 2. 第二个参数表示,数据源中每条数据被抽取的概率
    // 3. 第三个参数表示,抽取数据时随机算法的种子
    println(rdd.sample(
      false,
      0.4,
      1
    ).collect().mkString(","))

    sc.stop()
  }
}

QQ截图20220615105258.png

object Spark08_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-sample
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    // sample算子需要传递3个参数
    // 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
    // 2. 第二个参数表示,数据源中每条数据被抽取的概率
    // 3. 第三个参数表示,抽取数据时随机算法的种子
    //                  如果不传递第三个参数,使用的是当前系统时间
    println(rdd.sample(
      false,
      0.4,
      // 1
    ).collect().mkString(","))

    sc.stop()
  }
}

(9)distinct

  • 函数签名

def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

  • 函数说明

将数据集中重复的数据去重

object Spark09_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-filter
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
    // distinct源码
    // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)

    // x => (x, null):(1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
    // reduceByKey:(1, null)(1, null) => (null, null)
    // (x, _) => x:(null, null) => null
    // (1, null)
    // map(_._1):1
    val rdd1: RDD[Int] = rdd.distinct()
    rdd1.collect().foreach(println)

    sc.stop()
  }
}

QQ截图20220615183424.png
(10)coalesce

  • 函数签名

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]

  • 函数说明

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。当 Spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本。

object Spark10_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-filter
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 4)
    val newRDD: RDD[Int] = rdd.coalesce(2)
    newRDD.saveAsTextFile("output")
    sc.stop()
  }
}

QQ截图20220615203641.png

object Spark10_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-filter
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
    val newRDD: RDD[Int] = rdd.coalesce(2)
    newRDD.saveAsTextFile("output")
    sc.stop()
  }
}

QQ截图20220615204843.png
QQ截图20220615204640.png
QQ截图20220615204947.png

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
// coalesce方法默认情况下不会将分区的数据打乱重写组合
// 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
// 如果想要让数据均衡,可以进行shuffle处理
val newRDD: RDD[Int] = rdd.coalesce(2)

QQ截图20220615205148.png
QQ截图20220615205215.png
(11)repartition

  • 函数签名

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

object Spark11_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-filter
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    // coalesce算子可以扩大分区,但是如果不进行shuffle操作,不起作用
    // 如果要扩大分区,要使用shuffle操作
    // 缩减分区:coalesce,如果想要数据均衡,采用shuffle
    // 扩大分区:
    // 1. val newRDD: RDD[Int] = rdd.coalesce(3,true) // [3,5],[1,6],[2,4]
    // 2. repartition 底层:
    /*
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      coalesce(numPartitions, shuffle = true)
    }
     */
    val newRDD: RDD[Int] = rdd.repartition(3)
    newRDD.saveAsTextFile("output")
    sc.stop()
  }
}

(12)sortBy

  • 函数签名

def sortByK => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

  • 函数说明

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。

object Spark12_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-sortBy
    val rdd = sc.makeRDD(List(6, 4, 3, 2, 5, 1), 2)
    val newRDD: RDD[Int] = rdd.sortBy(num => num)
    newRDD.saveAsTextFile("output") // [1,2,3],[4,5,6]
    sc.stop()
  }
}
object Spark12_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-sortBy
    val rdd = sc.makeRDD(List(("1",1),("11",2),("2",3)), 2)
    val newRDD: RDD[(String, Int)] = rdd.sortBy(t => t._1)
    newRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220615211747.png

object Spark12_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-sortBy
    val rdd = sc.makeRDD(List(("1",1),("11",2),("2",3)), 2)
    // sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序
    // 第二个参数可以改变排序的方式
    // sortBy默认情况下,不会改变分区,但中间存在shuffle操作
    val newRDD: RDD[(String, Int)] = rdd.sortBy(t => t._1,false)
    newRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220615212211.png

2. 双Value类型
object Spark13_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-双Value类型
    // 交集,并集和差集要求两个数据源数据类型保持一致
    // 拉链操作两个数据源的类型可以不一致
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
    val rdd7 = sc.makeRDD(List("1", "2", "3", "4"))

    // 交集 [3,4]
    val rdd3: RDD[Int] = rdd1.intersection(rdd2)
    println(rdd3.collect().mkString(","))

    // 并集 [1,2,3,4,3,4,5,6]
    val rdd4: RDD[Int] = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))

    // 差集 [1,2]
    val rdd5: RDD[Int] = rdd1.subtract(rdd2)
    println(rdd4.collect().mkString(","))

    // 拉链 (1,3),(2,4),(3,5),(4,6)
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))
    val rdd8: RDD[(Int, String)] = rdd1.zip(rdd7)
    println(rdd8.collect().mkString(",")) // (1,1),(2,2),(3,3),(4,4)

    sc.stop()
  }
}
object Spark13_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-双Value类型
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4), 2)
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 4)

    // 两个数据源要求分区数量要保持一致
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2) 
    println(rdd6.collect().mkString(","))

    sc.stop()
  }
}

QQ截图20220615215217.png

object Spark13_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-双Value类型
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 2)

    // 两个数据源要求分区中数据数量保持一致
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))

    sc.stop()
  }
}

QQ截图20220615215508.png

3. Key-Value类型

(1)partitionBy

  • 函数签名

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  • 函数说明

将数据按照指定Partitioner重新进行分区,默认分区器是HashPartitioner

object Spark14_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    val mapRDD = rdd.map((_, 1))
    // RDD=>PairRDDFunctions 隐式转换(二次编译)
    /*
    伴生对象中有隐式
    implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
      new PairRDDFunctions(rdd)
    }
     */
    // partitionBy根据指定的分区规则对数据进行重分区
    mapRDD.partitionBy(new HashPartitioner(2))
      .saveAsTextFile("output")

    sc.stop()
  }
}

QQ截图20220616093200.png
(2)reduceByKey

  • 函数签名

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

object Spark15_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3), ("b", 4)
    ))

    // reduceByKey相同的key的数据进行value数据的聚合操作
    // [1,2,3]
    // reduceByKey中如果key的数据只有一个,不会参与运算 
    val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {
      println(s"x=${x}, y=${y}")
      x + y
    })
    reduceRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220616100152.png
(3)groupByKey

  • 函数签名

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

object Spark16_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3), ("b", 4)
    ))

    // groupByKey:将数据源中的数据,相同key的数据分在一组中,形成一个对偶元组
    //            元组中的第一个元素就是key
    //            元组中的第二个元素是相同key的value集合
    val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()

    groupRDD.collect().foreach(println)

    val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
    sc.stop()
  }
}
  • reduceByKey和groupByKey的区别

QQ截图20220616102209.png
QQ截图20220616103112.png

  • 从Shuffle的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高
  • 从功能的角度:reduceByKey包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合,只能使用 groupByKey

(4)aggregateByKey

  • 函数签名

def aggregateByKeyU: ClassTag(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]

object Spark17_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3), ("a", 4)
    ), 2)
    // => (a,[1,2]),(a,[3,4])
    // => (a,2),(a,4)
    // => (a,6)
    // aggregateByKey存在函数柯里化,有两个参数列表
    // 第一个参数列表,需要传递一个参数,表示为初始值,主要用于第一个key的时候,和value进行分区内计算
    // 第二个参数列表
    //      第一个参数表示分区内计算规则
    //      第二个参数表示分区间计算规则

    // math.min(x, y)
    // math.max(x, y)
    rdd.aggregateByKey(0)(
      (x, y) => math.max(x, y),
      (x, y) => x + y
    ).collect.foreach(println)

    sc.stop()
  }
}

QQ截图20220616104904.png

object Spark17_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3),
      ("b", 4), ("b", 5), ("a", 6)
    ), 2)
    // => (a,[1,2]),(a,[3,4])
    // => (a,2),(a,4)
    // => (a,6)
    // aggregateByKey存在函数柯里化,有两个参数列表
    // 第一个参数列表,需要传递一个参数,表示为初始值,主要用于第一个key的时候,和value进行分区内计算
    // 第二个参数列表
    //      第一个参数表示分区内计算规则
    //      第二个参数表示分区间计算规则

    // math.min(x, y)
    // math.max(x, y)
    rdd.aggregateByKey(5)(
      (x, y) => math.max(x, y),
      (x, y) => x + y
    ).collect.foreach(println)

    sc.stop()
  }
}

QQ截图20220616105929.png
QQ截图20220616110646.png

object Spark17_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3),
      ("b", 4), ("b", 5), ("a", 6)
    ), 2)
    // aggregateByKey最终的返回数据结果应该和初始值的类型保持一致
    // 获取相同key数据的平均值 => (a,3),(b,4)
    val newRDD: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(
      (t, v) => {
        (t._1 + v, t._2 + 1)
      },
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
      case (num, cnt) => {
        num / cnt
      }
    }
    resultRDD.collect().foreach(println)

    sc.stop()
  }
}

QQ截图20220616113222.png
(5)foldByKey

  • 函数签名

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

object Spark17_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3),
      ("b", 4), ("b", 5), ("a", 6)
    ), 2)
    // 如果聚合计算时,分区内和分区间计算规则相同,aggregateByKey 就可以简化为 foldByKey
    rdd.foldByKey(0)(_ + _).collect.foreach(println)

    sc.stop()
  }
}

QQ截图20220616111131.png
(6)combineByKey

  • 函数签名

def combineByKeyC => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]

  • 函数说明

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

object Spark19_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3),
      ("b", 4), ("b", 5), ("a", 6)
    ), 2)

    // combineByKey:方法需要三个参数
    // 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
    // 第二个参数表示:分区内的计算关系
    // 第三个参数表示:分区间的计算关系
    val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
      v => (v, 1),
      (t: (Int, Int), v) => {
        (t._1 + v, t._2 + 1)
      },
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
      case (num, cnt) => {
        num / cnt
      }
    }
    resultRDD.collect().foreach(println)

    sc.stop()
  }
}

reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?

  • reduceByKey:相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
  • foldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
  • aggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
  • combineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同

(7)join

  • 函数签名

def joinW]): RDD[(K, (V, W))]

  • 函数说明

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD

object Spark21_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("b", 5), ("c", 6)
    ))

    val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    joinRDD.collect().foreach(println)

    sc.stop()
  }
}

QQ截图20220616160700.png

object Spark21_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("c", 3)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("c", 5), ("a", 6)
    ))

    // join:两个不同数据源的数据,相同的key的value会连接在一起,形成元组
    /*
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("b", 5), ("c", 6)
    ))
    (a,(1,4))
    (b,(2,5))
    (c,(3,6))
     */
    //      如果两个数据源中key没有匹配上,数据不会出现在结果中
    /*
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("d", 5), ("c", 6)
    ))
    (a,(1,4))
    (c,(3,6))
     */
    // 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔积
    /*
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("c", 3)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("c", 5), ("a", 6)
    ))
    (a,(1,4))
    (a,(1,6))
    (a,(2,4))
    (a,(2,6))
    (c,(3,5))
     */
    val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    joinRDD.collect().foreach(println)

    sc.stop()
  }
}

(8)leftOuterJoin

  • 函数签名

def leftOuterJoinW]): RDD[(K, (V, Option[W]))]

object Spark22_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("b", 5)
    ))

    val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
    val rightJoinRDD = rdd1.rightOuterJoin(rdd2)
    leftJoinRDD.collect().foreach(println)
    println("=====")
    rightJoinRDD.collect().foreach(println)

    sc.stop()
  }
}

QQ截图20220616163127.png
(9)cogroup

  • 函数签名

def cogroupW]): RDD[(K, (Iterable[V], Iterable[W]))]

  • 函数说明

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的RDD

object Spark23_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子-Key-Value类型
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("b", 5), ("c", 6), ("c", 7)
    ))

    // cogroup: connect + group,分组连接
    val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
    cgRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220616163821.png

数据:agent.log,时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
需求:统计出每一个省份每个广告被点击数量排行的 Top3

object Spark24_RDD_Req {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // 1. 获取原始数据:时间戳,省份,城市,用户,广告
    val dataRDD = sc.textFile("datas/agent.log")

    // 2. 将原始数据进行结构的转换
    // 时间戳,省份,城市,用户,广告=>((省份,广告),1)
    val mapRDD = dataRDD.map(
      line => {
        val datas = line.split(" ")
        ((datas(1), datas(4)), 1)
      }
    )

    // 3. 将转换结构后的数据进行分组聚合
    // ((省份,广告),1)=> ((省份,广告),sum)
    val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)

    // 4. 将聚合的结果进行结构转换
    // ((省份,广告),sum)=> (省份,(广告,sum))
    val newMapRDD: RDD[(String, (String, Int))] = reduceRDD.map {
      case ((prv, ad), sum) => {
        (prv, (ad, sum))
      }
    }

    // 5. 将转换结构后的数据根据省份进行分组
    // (省份,【(广告A,sumA),(广告B,sumB)】)
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()

    // 6. 将分组后的数据组内排序(降序),取前3名
    val resultRDD = groupRDD.mapValues(
      iter => {
        // Iterable不能排序,转成List
        // 默认升序,改成降序
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
      }
    )

    // 7. 采集数据打印在控制台
    resultRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220617103329.png

1.5.3.2 RDD行动算子

(1)collect

object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // TODO - 行动算子
    // 触发作业执行的方法
    // 底层调用的是环境对象的runJob方法 val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    // 底层代码创建ActiveJob,并提交执行
    // collect:方法会将不同分区的数据按照分区顺序采集到Driver端内存,形成数组
    val ints: Array[Int] = rdd.collect()
    println(ints.mkString(","))

    sc.stop()
  }
}

(2)reduce

object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // TODO - 行动算子
    val i: Int = rdd.reduce(_ + _)
    println(i) // 10

    sc.stop()
  }
}

(3)count

object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // TODO - 行动算子
    // count:数据源中数据的个数
    val l = rdd.count()
    println(l)

    sc.stop()
  }
}

(4)first

object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // TODO - 行动算子
    // first:获取数据源中数据的第一个
    val first = rdd.first()
    println(first)

    sc.stop()
  }
}

(5)take

object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // TODO - 行动算子
    // take:获取N个数据
    val ints: Array[Int] = rdd.take(3)
    println(ints.mkString(","))

    sc.stop()
  }
}

(6)takeOrdered

object Spark01_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(4, 2, 3, 1))

    // TODO - 行动算子
    // takeOrdered:数据排序后,取N个数据
    val ints: Array[Int] = rdd.takeOrdered(3)
    println(ints.mkString(",")) // 1,2,3

    sc.stop()
  }
}

(7)aggregate

object Spark03_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // TODO - 行动算子
    // aggregateByKey:初始值只会参与分区内计算
    // aggregate:初始值会参与分区内计算,并且参与分区间计算
    val result = rdd.aggregate(10)(_ + _, _ + _)
    println(result) // 40 = 10 + (1 + 2 + 10) + (3 + 4 + 10)

    sc.stop()
  }
}

(8)fold

object Spark03_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // TODO - 行动算子
    // aggregate简化版操作
    val result = rdd.fold(10)(_ + _)
    println(result) // 40 

    sc.stop()
  }
}

(9)countByKey

object Spark04_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // TODO - 行动算子
    val intToLong: collection.Map[Int, Long] = rdd.countByValue()
    println(intToLong)

    sc.stop()
  }
}

QQ截图20220617112820.png

object Spark04_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 1, 1, 4), 2)

    // TODO - 行动算子
    val intToLong: collection.Map[Int, Long] = rdd.countByValue()
    println(intToLong)

    sc.stop()
  }
}

QQ截图20220617113009.png

object Spark04_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3)
    ))

    // TODO - 行动算子
    val stringToLong: collection.Map[String, Long] = rdd.countByKey()
    println(stringToLong) // Map(a -> 3)

    sc.stop()
  }
}

QQ截图20220617113457.png
(10)save相关

object Spark05_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("a", 3)
    ))

    // TODO - 行动算子
    // 保存成 Text 文件
    rdd.saveAsTextFile("output")
    // 序列化成对象保存到文件
    rdd.saveAsObjectFile("output1")
    // 保存成 Sequencefile 文件 必须是k-v类型
    rdd.saveAsSequenceFile("output2")

    sc.stop()
  }
}

(11)foreach

object Spark06_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // foreach是Driver端内存集合的循环遍历方法
    rdd.collect().foreach(println)
    println("***********")
    // foreach是Executor端内存数据打印
    rdd.foreach(println)

    // 算子:Operator(操作)区分不同的处理效果,将RDD的方法称为算子
    // RDD的方法和Scala集合对象的方法不一样
    // 集合对象的方法都是在同一个节点的内存中完成的
    // RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
    // RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor
    sc.stop()
  }
}

1.5.3.3 WordCount

object Spark03_WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    wordcount9(sc)

    sc.stop()
  }

  // groupBy
  def wordcount1(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
  }

  // groupByKey
  def wordcount2(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
    val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
  }

  // reduceByKey
  def wordcount3(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
  }

  // aggregateByKey
  def wordcount4(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_ + _, _ + _)
  }

  // foldByKey
  def wordcount5(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)
  }

  // combineByKey
  def wordcount6(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
      v => v,
      (x: Int, y) => x + y,
      (x: Int, y: Int) => x + y
    )
  }

  // countByKey
  def wordcount7(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))
    val stringToLong: collection.Map[String, Long] = wordOne.countByKey()
  }

  // countByValue
  def wordcount8(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val stringToLong: collection.Map[String, Long] = words.countByValue()
  }

  // reduce,aggregate,fold
  def wordcount9(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val mapWord = words.map(
      word => {
        mutable.Map[String, Long]((word, 1))
      }
    )
    val wordCount = mapWord.reduce(
      (map1, map2) => {
        map2.foreach{
          case (word, count) => {
            val newCount = map1.getOrElse(word, 0L) + count
            map1.update(word, newCount)
          }
        }
        map1
      }
    )
  }
}

1.5.4 RDD序列化

object Spark07_RDD_Operator_Action {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    val user = new User()

    // RDD算子中传递的函数是会包含闭包操作,就会进行检测=>闭包检测
    rdd.foreach(
      num => {
        println("age = " + (user.age + num))
      }
    )

    sc.stop()
  }

  // 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
  case class User(){
  // 或class User extends Serializable {
    var age: Int = 30
  }
}
  1. 闭包检查

从计算的角度,算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。

object Spark01_RDD_Serial {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive"))

    val search = new Search("h")

    // search.getMatch1(rdd).collect().foreach(println)
    search.getMatch2(rdd).collect().foreach(println)

    sc.stop()
  }
  // 查询对象
  // Scala类的构造参数其实是类的属性,构造参数需要进行闭包检测,等同于类进行闭包检测
  class Search(query: String) extends Serializable {
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }
    // 函数序列化案例
    def getMatch1 (rdd: RDD[String]): RDD[String] = {
      rdd.filter(isMatch)
    }
    // 属性序列化案例
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      // extends Serializable或者样例类
      // 或者
      val s = query // 这个在Driver执行
      rdd.filter(x => x.contains(s)) // 这个是算子在Executor执行
    }
  }
}
  1. Kryo 序列化框架

Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。
注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。

object serializable_Kryo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    .setAppName("SerDemo")
    .setMaster("local[*]")
    // 替换默认的序列化机制
    .set("spark.serializer", 
         "org.apache.spark.serializer.KryoSerializer")
    // 注册需要使用 kryo 序列化的自定义类
    .registerKryoClasses(Array(classOf[Searcher]))
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", 
                                            "atguigu", "hahah"), 2)
    val searcher = new Searcher("hello")
    val result: RDD[String] = searcher.getMatchedRDD1(rdd)
    result.collect.foreach(println)
  } }
case class Searcher(val query: String) {
  def isMatch(s: String) = {
    s.contains(query)
  }
  def getMatchedRDD1(rdd: RDD[String]) = {
    rdd.filter(isMatch) 
  }
  def getMatchedRDD2(rdd: RDD[String]) = {
    val q = query
    rdd.filter(_.contains(q))
  } 
}

1.5.5 RDD依赖关系

  1. RDD血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
QQ截图20220617175330.png
QQ截图20220617175738.png
QQ截图20220618201347.png

object Spark01_RDD_Dep {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("WC")
    val sc = new SparkContext(sparkConf)

    val lines: RDD[String] = sc.textFile("datas/word.txt")
    println(lines.toDebugString)
    val words: RDD[String] = lines.flatMap(_.split(" "))
    println(words.toDebugString)
    val wordToOne = words.map(word => (word, 1))
    println(wordToOne.toDebugString)
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    println(wordToSum.toDebugString)
    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)
    sc.stop()
  }
}

QQ截图20220618201817.png

  1. RDD依赖关系

两个相邻 RDD 之间的关系

object Spark02_RDD_Dep {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("WC")
    val sc = new SparkContext(sparkConf)

    val lines: RDD[String] = sc.textFile("datas/word.txt")
    println(lines.dependencies)
    val words: RDD[String] = lines.flatMap(_.split(" "))
    println(words.dependencies)
    val wordToOne = words.map(word => (word, 1))
    println(wordToOne.dependencies)
    val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
    println(wordToSum.dependencies)
    val array: Array[(String, Int)] = wordToSum.collect()
    array.foreach(println)
    sc.stop()
  }
}

QQ截图20220618202224.png

  1. RDD窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用
QQ截图20220618203841.png

  1. RDD宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle。
QQ截图20220618203924.png

  1. RDD阶段划分

shuffle(要打乱重新组合),所以要等待,划分成了不同的阶段。
源码:
QQ截图20220618210433.png
QQ截图20220618210608.png
QQ截图20220618210651.png
QQ截图20220618210813.png
QQ截图20220618211841.png

  1. RDD任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个 Application
  • Job:一个 Action 算子就会生成一个 Job
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系
任务的数量 = 当前阶段中最后一个RDD的分区数量

1.5.6 RDD持久化

RDD不存储数据,如果一个RDD需要重复使用,需要从头再次执行 RDD对象可以重用,但数据无法重用

  1. RDD Cache缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该RDD 将会被缓存在计算节点的内存中,并供后面重用。
RDD对象的持久化操作不一定为了重用,在数据执行较长或比较重要也会使用持久化操作。

object Spark02_RDD_Persist {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    val list = List("Hello Scala", "Hello Spark")
    val rdd = sc.makeRDD(list)
    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map((_, 1))

    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)

    reduceRDD.collect().foreach(println)

    println("********")

    // cache默认持久化的操作将数据保存到内存中,如果要保存到磁盘,更改存储级别
    // mapRDD.cache()
    mapRDD.persist(StorageLevel.DISK_ONLY)

    val groupRDD = mapRDD.groupByKey()

    groupRDD.collect().foreach(println)
    sc.stop()
  }
}

QQ截图20220619173008.png

  1. RDD CheckPoint检查点

通过将 RDD 中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

object Spark02_RDD_Persist {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    sc.setCheckpointDir("./cp")

    val list = List("Hello Scala", "Hello Spark")
    val rdd = sc.makeRDD(list)
    val flatRDD = rdd.flatMap(_.split(" "))

    val mapRDD = flatRDD.map((_, 1))
    // checkpoint需要落盘,需要指定检查点保存路径
    // 检查点路径保存的文件,当作业执行完毕后,不会被删除
    // 一般保存路径都是在分布式存储系统中
    mapRDD.checkpoint()

    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)

    reduceRDD.collect().foreach(println)

    println("********")

    val groupRDD = mapRDD.groupByKey()

    groupRDD.collect().foreach(println)
    sc.stop()
  }
}
  1. 区别
  • cache:将数据临时存储在内存中进行数据重用;会在血缘关系中添加新的依赖,一旦出现问题,可以重头读取数据
  • persist:将数据临时存储在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但数据安全;如果作业执行完毕,临时保存的数据文件就会丢失
  • checkpoint:将数据长久地保存在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但数据安全;为了保证数据安全,一般情况下会独立执行作业;为了提高效率,一般和cache联合使用;执行过程中,会切断血缘关系,重新建立新的血缘关系,等同于改变数据源

    mapRDD.cache()
    mapRDD.checkpoint()
    

    1.5.7 RDD分区器

    自定义分区器:

    object Spark01_RDD_Part {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local").setAppName("WC")
      val sc = new SparkContext(sparkConf)
    
      val rdd = sc.makeRDD(List(
        ("nba", "xxxxxxxx"),
        ("cba", "xxxxxxxx"),
        ("wnba", "xxxxxxxx"),
        ("nba", "xxxxxxxx")
      ))
    
      val partRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)
    
      partRDD.saveAsTextFile("output")
    
      sc.stop()
    }
    
    /**
     * 自定义分区器
     * 1. 继承Partitioner
     * 2. 重写方法
     */
    class MyPartitioner extends Partitioner {
      // 分区数量
      override def numPartitions: Int = 3
    
      // 根据数据的key值返回数据所在的分区索引,从0开始
      override def getPartition(key: Any): Int = {
        key match {
          case "nba" => 0
          case "wnba" => 1
          case _ => 2
        }
    
      }
    }
    }
    

    1.5.8 RDD文件读取与保存

    保存

    object Spark01_RDD_IO_Save {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local").setAppName("WC")
      val sc = new SparkContext(sparkConf)
      val rdd = sc.makeRDD(
        List(
          ("a", 1),
          ("b", 2),
          ("c", 3)
        )
      )
    
      rdd.saveAsTextFile("output1")
      rdd.saveAsObjectFile("output2")
      rdd.saveAsSequenceFile("output3")
    
      sc.stop()
    }
    }
    

    读取

    object Spark02_RDD_IO_Load {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local").setAppName("WC")
      val sc = new SparkContext(sparkConf)
    
      val rdd = sc.textFile("output1")
      println(rdd.collect().mkString(","))
    
      val rdd1 = sc.objectFile[(String, Int)]("output2")
      println(rdd1.collect().mkString(","))
    
      val rdd2 = sc.sequenceFile[String, Int]("output3")
      println(rdd2.collect().mkString(","))
    
      sc.stop()
    }
    }
    

    2 累加器

    object Spark01_Acc {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local").setAppName("Acc")
      val sc = new SparkContext(sparkConf)
      val rdd = sc.makeRDD(List(1, 2, 3, 4))
    
      // reduce:分区内计算,分区间计算
      // val i: Int = rdd.reduce(_ + _)
      // println(i)
      var sum = 0
      rdd.foreach(
        num => {
          sum += num
        }
      )
      println("sum = " + sum) // 输出0,在Executor中改变了sum,但是没有返回Driver
      sc.stop()
    }
    }
    

    QQ截图20220619205726.png

    2.1 实现原理

    累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

    2.2 基础编程

    2.2.1 系统累加器

    object Spark02_Acc {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local").setAppName("Acc")
      val sc = new SparkContext(sparkConf)
      val rdd = sc.makeRDD(List(1, 2, 3, 4))
    
      // 获取系统累加器
      // Spark默认提供了简单数据聚合的累加器
      val sumAcc = sc.longAccumulator("sum") // 参数是给它起的名
      // sc.doubleAccumulator
      // sc.collectionAccumulator  // 集合类型,默认是List
      rdd.foreach(
        num => {
          // 使用累加器
          sumAcc.add(num)
        }
      )
      // 获取累加器的值
      println(sumAcc.value)
      sc.stop()
    }
    }
    

    QQ截图20220619205805.png

    object Spark03_Acc {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local").setAppName("Acc")
      val sc = new SparkContext(sparkConf)
      val rdd = sc.makeRDD(List(1, 2, 3, 4))
    
      // 获取系统累加器
      // Spark默认提供了简单数据聚合的累加器
      val sumAcc = sc.longAccumulator("sum") // 参数是给它起的名
      // sc.doubleAccumulator
      // sc.collectionAccumulator  // 集合类型,默认是List
      val mapRDD = rdd.map(
        num => {
          // 使用累加器
          sumAcc.add(num)
          num
        }
      )
      // 获取累加器的值
      // 少加:转换算子中调用累加器,如果没有行动算子,不会执行
      println(sumAcc.value) // 输出0
    
      // 多加:转换算子中调用累加器,多次调用
      // 所以累加器一般放在行动算子中
      mapRDD.collect()
      mapRDD.collect()
      println(sumAcc.value) // 20
      sc.stop()
    }
    }
    

    2.2.2 自定义累加器

    object Spark04_Acc_WordCount {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local").setAppName("Acc")
      val sc = new SparkContext(sparkConf)
      val rdd = sc.makeRDD(List("hello", "spark", "hello"))
    
      // 累加器: WordCount
      // 创建累加器对象
      val wcAcc = new MyAccumulator
      // 向Spark进行注册
      sc.register(wcAcc, "wordCountAcc")
    
      rdd.foreach(
        word => {
          // 数据的累加
          wcAcc.add(word)
        }
      )
    
      // 获取累加器累加的结果
      println(wcAcc.value)
      sc.stop()
    }
    
    /*
    自定义数据累加器:WordCount
    1. 继承AccumulatorV2,定义泛型
      IN:累加器输入的数据类型
      OUT:累加器返回的数据类型
    2. 重写方法
     */
    class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
      private var wcMap = mutable.Map[String, Long]()
    
      // 判断是否为初始状态
      override def isZero: Boolean = {
        wcMap.isEmpty
      }
    
      override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
        new MyAccumulator()
      }
    
      override def reset(): Unit = {
        wcMap.clear()
      }
    
      // 获取累加器需要计算的值
      override def add(word: String): Unit = {
        val newCnt = wcMap.getOrElse(word, 0L) + 1
        wcMap.update(word, newCnt)
      }
    
      // Driver合并多个累加器
      override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
        val map1 = this.wcMap
        val map2 = other.value
    
        map2.foreach{
          case (word, count) => {}
            val newCnt = map1.getOrElse(word, 0L) + count
            map1.update(word, newCnt)
        }
      }
    
      // 累加器结果
      override def value: mutable.Map[String, Long] = {
        wcMap
      }
    }
    }
    

    3 广播变量

    广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。
    QQ截图20220620102808.png

    object Spark06_Bc {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local").setAppName("Acc")
      val sc = new SparkContext(sparkConf)
      val rdd = sc.makeRDD(List(1, 2, 3, 4))
    
      val rdd1 = sc.makeRDD(List(
        ("a", 1), ("b", 2), ("c", 3)
      ))
    
      val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))
    
      // 封装广播变量
      val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
    
      rdd1.map {
        case (w, c) => {
          // 访问广播变量
          val l = bc.value.getOrElse(w, 0L)
          (w, (c, l))
        }
      }.collect().foreach(println)
    
      sc.stop()
    }
    }
    

    4 案例

    电商网站的用户行为数据,主要包含用户的 4 种行为:搜索,点击,下单,支付。

  • 数据文件中每行数据采用下划线分隔数据

  • 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
  • 如果搜索关键字为 null,表示数据不是搜索数据
  • 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据
  • 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示
  • 支付行为和下单行为类似

字段:

//用户访问动作表
case class UserVisitAction(
  date: String, //用户点击行为的日期
  user_id: Long, //用户的 ID
  session_id: String, //Session 的 ID
  page_id: Long, //某个页面的 ID
  action_time: String, //动作的时间点
  search_keyword: String, //用户搜索的关键词
  click_category_id: Long, //某一个商品品类的 ID
  click_product_id: Long, //某一个商品的 ID
  order_category_ids: String, //一次订单中所有品类的 ID 集合
  order_product_ids: String, //一次订单中所有商品的 ID 集合
  pay_category_ids: String, //一次支付中所有品类的 ID 集合
  pay_product_ids: String, //一次支付中所有商品的 ID 集合
  city_id: Long //城市 id
)

4.1 需求1

先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。

4.1.1 实现一

object Spark01_Req1_HotCatagoryTop10Analysis {
  def main(args: Array[String]): Unit = {
    // TODO TOP10热门品类
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
    val sc = new SparkContext(sparkConf)

    // 1. 读取原始日志数据
    val actionRDD = sc.textFile("datas/user_visit_action.txt")

    // 2. 统计品类的点击数量:(品类ID,点击数量)
    val clickActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(6) != "-1"
      }
    )
    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => {
        val datas = action.split("_")
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    // 3. 统计品类的下单数量:(品类ID,下单数量)
    val orderActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(8) != "null"
      }
    )
    // 扁平化操作
    val OrderCountRDD = orderActionRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(8)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    // 4. 统计品类的支付数量:(品类ID,支付数量)
    val payActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(10) != "null"
      }
    )

    val payCountRDD = payActionRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(10)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    // 5. 将品类排序,并取前10名
    //    点击数量排序,下单数量排序,支付数量排序
    //    元组排序:先比较第一个,再比较第二个,再比较第三个,以此类推
    //    (品类ID,(点击数量,下单数量,支付数量))
    val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(OrderCountRDD, payCountRDD)
    val analysisRDD = cogroupRDD.mapValues{
      case (clickIter, orderIter, payIter) => {
        var clickCnt = 0
        val iter1 = clickIter.iterator // iter里面基本只有一个值,存数量,next表示取里面那个值
        if (iter1.hasNext) {
          clickCnt = iter1.next()
        }
        var orderCnt = 0
        val iter2 = orderIter.iterator
        if (iter2.hasNext) {
          orderCnt = iter2.next()
        }
        var payCnt = 0
        val iter3 = payIter.iterator
        if (iter3.hasNext) {
          payCnt = iter3.next()
        }
        (clickCnt, orderCnt, payCnt)
      }
    }
    val resultRDD = analysisRDD.sortBy(_._2, false).take(10)

    // 6. 将结果采集到控制台打印
    resultRDD.foreach(println)

    sc.stop()
  }
}

QQ截图20220620111842.png

4.1.2 实现二

object Spark02_Req1_HotCatagoryTop10Analysis {
  def main(args: Array[String]): Unit = {
    // TODO TOP10热门品类
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
    val sc = new SparkContext(sparkConf)

    // 1. 读取原始日志数据
    val actionRDD = sc.textFile("datas/user_visit_action.txt")
    actionRDD.cache()

    // 2. 统计品类的点击数量:(品类ID,点击数量)
    val clickActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(6) != "-1"
      }
    )
    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => {
        val datas = action.split("_")
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    // 3. 统计品类的下单数量:(品类ID,下单数量)
    val orderActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(8) != "null"
      }
    )
    // 扁平化操作
    val orderCountRDD = orderActionRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(8)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    // 4. 统计品类的支付数量:(品类ID,支付数量)
    val payActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(10) != "null"
      }
    )

    val payCountRDD = payActionRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(10)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    // (品类ID,点击数量) => (品类ID,(点击数量,0,0))
    // (品类ID,下单数量) => (品类ID,(0,下单数量,0))
    //                 => (品类ID,(点击数量,下单数量,0)
    // (品类ID,支付数量) => (品类ID,(0,0,支付数量))
    //                 => (品类ID,(点击数量,下单数量,支付数量))


    // 5. 将品类排序,并取前10名
    //    点击数量排序,下单数量排序,支付数量排序
    //    元组排序:先比较第一个,再比较第二个,再比较第三个,以此类推
    //    (品类ID,(点击数量,下单数量,支付数量))
    val rdd1 = clickCountRDD.map {
      case (cid, cnt) => {
        (cid, (cnt, 0, 0))
      }
    }
    val rdd2 = orderCountRDD.map {
      case (cid, cnt) => {
        (cid, (0, cnt, 0))
      }
    }

    val rdd3 = payCountRDD.map {
      case (cid, cnt) => {
        (cid, (0, 0, cnt))
      }
    }

    // 将三个数据源合并在一起,同一进行聚合计算
    val sourceRDD: RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)
    val analysisRDD = sourceRDD.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )
    val resultRDD = analysisRDD.sortBy(_._2, false).take(10)

    // 6. 将结果采集到控制台打印
    resultRDD.foreach(println)

    sc.stop()
  }
}

4.1.3 实现三

object Spark03_Req1_HotCatagoryTop10Analysis {
  def main(args: Array[String]): Unit = {
    // TODO TOP10热门品类
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
    val sc = new SparkContext(sparkConf)

    // 1. 读取原始日志数据
    val actionRDD = sc.textFile("datas/user_visit_action.txt")

    // reduceByKey 聚合算子,Spark会提供优化,缓存
    // 但这里是不同数据源的reduceByKey

    // 2. 将数据转换结构
    //    点击:(品类ID,(1,0,0))
    //    下单:(品类ID,(0,1,0))
    //    支付:(品类ID,(0,0,1))
    val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
      action => {
        val datas = action.split("_")
        if (datas(6) != "-1") {
          List((datas(6), (1, 0, 0)))
        } else if (datas(8) != "null") {
          val ids = datas(8).split(",")
          ids.map(id => (id, (0, 1, 0)))
        } else if (datas(10) != "null") {
          // 支付的场合
          val ids = datas(10).split(",")
          ids.map(id => (id, (0, 0, 1)))
        } else {
          Nil
        }
      }
    )

    // 3. 将相同的品类ID的数据进行分组聚合
    //    (品类ID,(点击数量,下单数量,支付数量))
    val analysisRDD = flatRDD.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )

    // 4. 将统计结果根据数量进行降序处理,取前10名
    val resultRDD = analysisRDD.sortBy(_._2, false).take(10)

    // 5. 将结果采集到控制台打印
    resultRDD.foreach(println)

    sc.stop()
  }
}

4.1.4 实现四

object Spark04_Req1_HotCatagoryTop10Analysis {
  def main(args: Array[String]): Unit = {
    // TODO TOP10热门品类
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
    val sc = new SparkContext(sparkConf)

    // 1. 读取原始日志数据
    val actionRDD = sc.textFile("datas/user_visit_action.txt")

    val acc = new HotCategoryAccumulator
    sc.register(acc, "hotCategory")

    // 2. 将数据转换结构
    //    点击:(品类ID,(1,0,0))
    //    下单:(品类ID,(0,1,0))
    //    支付:(品类ID,(0,0,1))
    actionRDD.foreach(
      action => {
        val datas = action.split("_")
        if (datas(6) != "-1") {
          acc.add((datas(6), "click"))
        } else if (datas(8) != "null") {
          val ids = datas(8).split(",")
          ids.foreach(
            id => {
              acc.add((id, "order"))
            }
          )
        } else if (datas(10) != null) {
          val ids = datas(10).split(",")
          ids.foreach(
            id => {
              acc.add((id, "pay"))
            }
          )
        }
      }
    )

    val accVal: mutable.Map[String, HotCategory] = acc.value
    val categories: mutable.Iterable[HotCategory] = accVal.map(_._2)

    val sort = categories.toList.sortWith(
      (left, right) => {
        if (left.clickCnt > right.clickCnt) {
          true
        } else if (left.clickCnt == right.clickCnt) {
          if (left.orderCnt > right.orderCnt) {
            true
          } else if (left.orderCnt == right.orderCnt) {
            left.payCnt > right.payCnt
          } else {
            false
          }
        } else {
          false
        }
      }
    )

    // 5. 将结果采集到控制台打印
    sort.take(10).foreach(println)

    sc.stop()
  }

  case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)

  /**
   * 自定义累加器
   * 1. 继承AccumulatorV2,定义泛型
   * IN: (品类ID,行为类型)
   * OUT: mutable.Map[String, HotCategory]
   * 2. 重写方法
   */
  class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {
    private val hcMap = mutable.Map[String, HotCategory]()

    override def isZero: Boolean = {
      hcMap.isEmpty
    }

    override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
      new HotCategoryAccumulator()
    }

    override def reset(): Unit = {
      hcMap.clear()
    }

    override def add(v: (String, String)): Unit = {
      val cid = v._1
      val actionType = v._2
      val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
      if (actionType == "click") {
        category.clickCnt += 1
      } else if (actionType == "order") {
        category.orderCnt += 1
      } else if (actionType == "pay") {
        category.payCnt += 1
      }
      hcMap.update(cid, category)
    }

    override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
      val map1 = this.hcMap
      val map2 = other.value

      map2.foreach {
        case (cid, hc) => {
          val category = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
          category.clickCnt += hc.clickCnt
          category.orderCnt += hc.orderCnt
          category.payCnt += hc.payCnt
          map1.update(cid, category)
        }
      }
    }

    override def value: mutable.Map[String, HotCategory] = hcMap
  }
}

4.2 需求二

Top10热门品类中每个品类的Top10活跃Session统计,在需求一增加每个品类用户session的点击统计

object Spark05_Req2_HotCatagoryTop10SessionAnalysis {
  def main(args: Array[String]): Unit = {
    // TODO TOP10热门品类
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
    val sc = new SparkContext(sparkConf)

    val actionRDD = sc.textFile("datas/user_visit_action.txt")
    actionRDD.cache()

    val top10Ids: Array[String] = top10Category(actionRDD)

    // 1. 过滤原始数据,保留点击和前10品类ID
    val filterActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        if (datas(6) != "-1") {
          top10Ids.contains(datas(6))
        } else {
          false
        }
      }
    )

    // 2. 根据品类ID和sessionid进行点击量的统计
    val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(
      action => {
        val datas = action.split("_")
        ((datas(6), datas(2)), 1)
      }
    ).reduceByKey(_ + _)

    // 3. 将统计的结果进行结构的转换
    // ((品类ID,sessionId),sum) => (品类ID,(sessionId,sum))
    val mapRDD = reduceRDD.map {
      case ((cid, sid), sum) => {
        (cid, (sid, sum))
      }
    }

    // 4. 相同品类进行分组
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()

    // 5. 将分组后的数据进行点击量的排序,取前10名
    val resultRDD = groupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
      }
    )
    resultRDD.collect().foreach(println)

    sc.stop()
  }

  def top10Category(actionRDD: RDD[String]) = {
    val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
      action => {
        val datas = action.split("_")
        if (datas(6) != "-1") {
          List((datas(6), (1, 0, 0)))
        } else if (datas(8) != "null") {
          val ids = datas(8).split(",")
          ids.map(id => (id, (0, 1, 0)))
        } else if (datas(10) != "null") {
          // 支付的场合
          val ids = datas(10).split(",")
          ids.map(id => (id, (0, 0, 1)))
        } else {
          Nil
        }
      }
    )

    val analysisRDD = flatRDD.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )

    analysisRDD.sortBy(_._2, false).take(10).map(_._1)

  }
}

4.3 需求三

页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,单跳转化率就是要统计页面点击的概率。
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV) 为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B, B/A 就是 3-5 的页面单跳转化率。

object Spark06_Req3_PageFlowAnalysis {
  def main(args: Array[String]): Unit = {
    // TODO TOP10热门品类
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCatagoryTop10Analysis") // *表示当前最大可用核数
    val sc = new SparkContext(sparkConf)

    val actionRDD = sc.textFile("datas/user_visit_action.txt")

    val actionDataRDD = actionRDD.map(
      action => {
        val datum = action.split("_")
        UserVisitAction(
          datum(0),
          datum(1).toLong,
          datum(2),
          datum(3).toLong,
          datum(4),
          datum(5),
          datum(6).toLong,
          datum(7).toLong,
          datum(8),
          datum(9),
          datum(10),
          datum(11),
          datum(12).toLong
        )
      }
    )
    actionDataRDD.cache()
    // TODO 计算分母
    val pageIdToCount: Map[Long, Long] = actionDataRDD.map(
      action => {
        (action.page_id, 1L)
      }
    ).reduceByKey(_ + _).collect().toMap

    // TODO 计算分子

    // 根据session进行分组
    val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)

    // 分组后,根据访问时间进行排序(升序)
    val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
      iter => {
        val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
        val flowIds: List[Long] = sortList.map(_.page_id)
        // [1,2,3,4]
        // [1,2],[2,3],[3,4]
        // [1,2,3,4]和[2,3,4] zip拉链
        val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)
        pageflowIds.map(
          t => {
            (t, 1)
          }
        )
      }
    )
    val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list => list)
    val dataRDD = flatRDD.reduceByKey(_ + _)

    // TODO 计算单跳转换率
    // 分子除以分母
    dataRDD.foreach {
      case ((pageid1, pageid2), sum) => {
        val lon = pageIdToCount.getOrElse(pageid1, 0L)
        println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + (sum.toDouble / lon))
      }
    }


    sc.stop()
  }
}

//用户访问动作表
case class UserVisitAction(
                            date: String, //用户点击行为的日期
                            user_id: Long, //用户的 ID
                            session_id: String, //Session 的 ID
                            page_id: Long, //某个页面的 ID
                            action_time: String, //动作的时间点
                            search_keyword: String, //用户搜索的关键词
                            click_category_id: Long, //某一个商品品类的 ID
                            click_product_id: Long, //某一个商品的 ID
                            order_category_ids: String, //一次订单中所有品类的 ID 集合
                            order_product_ids: String, //一次订单中所有商品的 ID 集合
                            pay_category_ids: String, //一次支付中所有品类的 ID 集合
                            pay_product_ids: String, //一次支付中所有商品的 ID 集合
                            city_id: Long //城市 id
                          )

QQ截图20220622113213.png

  • 对指定的页面连续跳转进行统计 ```scala object Spark06_Req3_PageFlowAnalysis { def main(args: Array[String]): Unit = { // TODO TOP10热门品类 val sparkConf = new SparkConf().setMaster(“local[]”).setAppName(“HotCatagoryTop10Analysis”) // 表示当前最大可用核数 val sc = new SparkContext(sparkConf)

    val actionRDD = sc.textFile(“datas/user_visit_action.txt”)

    val actionDataRDD = actionRDD.map(

    action => {
      val datum = action.split("_")
      UserVisitAction(
        datum(0),
        datum(1).toLong,
        datum(2),
        datum(3).toLong,
        datum(4),
        datum(5),
        datum(6).toLong,
        datum(7).toLong,
        datum(8),
        datum(9),
        datum(10),
        datum(11),
        datum(12).toLong
      )
    }
    

    ) actionDataRDD.cache()

    // TODO 对指定的页面连续跳转进行统计 val ids = ListLong val okflowIDs: List[(Long, Long)] = ids.zip(ids.tail)

    // TODO 计算分母 val pageidToCountMap: Map[Long, Long] = actionDataRDD.filter(

    action => {
      ids.init.contains(action.page_id)
    }
    

    ).map(

    action => {
      (action.page_id, 1L)
    }
    

    ).reduceByKey( + ).collect().toMap

    // TODO 计算分子

    // 根据session进行分组 val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)

    // 分组后,根据访问时间进行排序(升序) val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(

    iter => {
      val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
      val flowIds: List[Long] = sortList.map(_.page_id)
      // [1,2,3,4]
      // [1,2],[2,3],[3,4]
      // [1,2,3,4]和[2,3,4] zip拉链
      val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)
    
      // 将不合法的页面跳转进行过滤
      pageflowIds.filter(
        t => {
          okflowIDs.contains(t)
        }
      ).map(
        t => {
          (t, 1)
        }
      )
    }
    

    ) val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(._2).flatMap(list => list) val dataRDD = flatRDD.reduceByKey( + _)

    // TODO 计算单跳转换率 // 分子除以分母 dataRDD.foreach {

    case ((pageid1, pageid2), sum) => {
      val lon = pageidToCountMap.getOrElse(pageid1, 0L)
      println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + (sum.toDouble / lon))
    }
    

    }

    sc.stop() } }

//用户访问动作表 case class UserVisitAction( date: String, //用户点击行为的日期 user_id: Long, //用户的 ID session_id: String, //Session 的 ID page_id: Long, //某个页面的 ID action_time: String, //动作的时间点 search_keyword: String, //用户搜索的关键词 click_category_id: Long, //某一个商品品类的 ID click_product_id: Long, //某一个商品的 ID order_category_ids: String, //一次订单中所有品类的 ID 集合 order_product_ids: String, //一次订单中所有商品的 ID 集合 pay_category_ids: String, //一次支付中所有品类的 ID 集合 pay_product_ids: String, //一次支付中所有商品的 ID 集合 city_id: Long //城市 id )

![QQ截图20220622114315.png](https://cdn.nlark.com/yuque/0/2022/png/26273875/1655869399517-fab4dfef-d67a-419a-bfa7-7ec24257c3d2.png#clientId=u048995da-9368-4&crop=0&crop=0&crop=1&crop=1&from=ui&height=138&id=u7ec90965&margin=%5Bobject%20Object%5D&name=QQ%E6%88%AA%E5%9B%BE20220622114315.png&originHeight=185&originWidth=525&originalType=binary&ratio=1&rotation=0&showTitle=false&size=39183&status=done&style=none&taskId=uf3078d76-1e43-426b-8ecb-94cd69a0e0c&title=&width=393)
<a name="fTH3o"></a>
# 5 架构模式
<a name="T3Zmq"></a>
## 5.1 三层架构
controller(控制层),service(服务层),dao(持久层)<br />![QQ截图20220622190708.png](https://cdn.nlark.com/yuque/0/2022/png/26273875/1655896044973-ce4ca196-25ba-4aea-bfe9-2cca282e1ee3.png#clientId=u330a1702-0af9-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u70667116&margin=%5Bobject%20Object%5D&name=QQ%E6%88%AA%E5%9B%BE20220622190708.png&originHeight=656&originWidth=1348&originalType=binary&ratio=1&rotation=0&showTitle=false&size=73379&status=done&style=none&taskId=u2935a045-36eb-4d7d-b6cb-53d355cc2a6&title=)
<a name="xQ95D"></a>
## 5.2 实现
![QQ截图20220623103221.png](https://cdn.nlark.com/yuque/0/2022/png/26273875/1655951546930-a256d89b-4d9a-4ce6-9d44-efb51a9758a7.png#clientId=udfdcb4d0-7644-4&crop=0&crop=0&crop=1&crop=1&from=ui&height=342&id=uef1afe19&margin=%5Bobject%20Object%5D&name=QQ%E6%88%AA%E5%9B%BE20220623103221.png&originHeight=428&originWidth=295&originalType=binary&ratio=1&rotation=0&showTitle=false&size=17087&status=done&style=none&taskId=ub28242e2-aca4-4d60-8ebb-e062888070c&title=&width=236)
```scala
package com.example.bigdata.spark.core.framework.application

import com.example.bigdata.spark.core.framework.common.TApplication
import com.example.bigdata.spark.core.framework.controller.WordCountController

object WordCountApplication extends App with TApplication {
  // 启动应用程序
  start() {
    val controller = new WordCountController()
    controller.dispatch()
  }
}
package com.example.bigdata.spark.core.framework.common

import com.example.bigdata.spark.core.framework.util.EnvUtil
import org.apache.spark.{SparkConf, SparkContext}

trait TApplication {
  def start(master: String = "local[*]", app: String = "Application")(op: => Unit): Unit = {
    val sparkConf = new SparkConf().setMaster(master).setAppName(app)
    val sc = new SparkContext(sparkConf)
    EnvUtil.put(sc)

    try {
      op
    } catch {
      case ex => println(ex.getMessage)
    }
    sc.stop()
    EnvUtil.clear()
  }
}
package com.example.bigdata.spark.core.framework.common

trait TController {
  def dispatch(): Unit
}
package com.example.bigdata.spark.core.framework.common

import com.example.bigdata.spark.core.framework.util.EnvUtil
import org.apache.spark.rdd.RDD

trait TDao {
  def readFile(path: String): RDD[String] = {
    EnvUtil.take().textFile(path)
  }
}
package com.example.bigdata.spark.core.framework.common

trait TService {
  def dataAnalysis(): Any
}
package com.example.bigdata.spark.core.framework.controller

import com.example.bigdata.spark.core.framework.common.TController
import com.example.bigdata.spark.core.framework.service.WordCountService

/**
 * 控制层
 */
class WordCountController extends TController{
  private val wordCountService = new WordCountService()
  // 调度
  def dispatch(): Unit = {
    // TODO 执行业务操作
    val array = wordCountService.dataAnalysis()
    array.foreach(println)
  }
}
package com.example.bigdata.spark.core.framework.dao

import com.example.bigdata.spark.core.framework.common.TDao

/**
 * 持久层
 */
class WordCountDao extends TDao {

}
package com.example.bigdata.spark.core.framework.service

import com.example.bigdata.spark.core.framework.common.TService
import com.example.bigdata.spark.core.framework.dao.WordCountDao
import org.apache.spark.rdd.RDD

/**
 * 服务层
 */
class WordCountService extends TService{
  private val wordCountDao = new WordCountDao()

  // 数据分析
  def dataAnalysis(): Array[(String, Int)] = {
    val lines = wordCountDao.readFile("datas/word.txt")
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val wordToOne = words.map(word => (word, 1))
    val wordToSum = wordToOne.reduceByKey(_ + _)
    val array: Array[(String, Int)] = wordToSum.collect()
    array
  }
}
package com.example.bigdata.spark.core.framework.util

import org.apache.spark.SparkContext

object EnvUtil {
  private val scLocal = new ThreadLocal[SparkContext]()

  def put(sc: SparkContext): Unit = {
    scLocal.set(sc)
  }

  def take(): SparkContext = {
    scLocal.get()
  }

  def clear(): Unit = {
    scLocal.remove()
  }
}