3.3.1 aggregate

aggregate 详解

aggregateU: ClassTag(seqOp: (U, T) => U, combOp: (U, U) => U): U 对给定的数据集进行方法设定 zeroValue_ 运算符的每个分区的累积结果的初始值,以及combOp运算符的不同分区的合并结果的初始值-这通常是中性元素(例如,对于列表串联为零,对于求和为0) seqOp–用于在分区内累积结果的运算符 commOp–用于组合来自不同分区的结果的关联运算符

aggregate 代码示例

  1. object ChapterRDD1 {
  2. // 环境
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
  4. val sc: SparkContext = new SparkContext(conf)
  5. def main(args: Array[String]): Unit = {
  6. System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.6.0")
  7. // 生成一个RDD[Int] 并且规定numSlices为1 这样就代表一个节点1
  8. val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 1)
  9. // 传入初始值为0 传入math.max方法 聚合结果 _+_
  10. // Array(1, 2, 3, 4, 5, 6) 聚合来结果为 6
  11. val i1: Int = rdd1.aggregate(0)(math.max(_: Int, _: Int), (_: Int) + (_: Int))
  12. println(i1)
  13. // 得出结果为6
  14. // 生成一个RDD[Int] 并且规定numSlices为2 这样就代表一个节点2
  15. // Array(1,2,3) Array(4,5,6)
  16. val rdd2: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)
  17. // 传入初始值为0 传入math.max方方法
  18. // Array(1,2,3) Array(4,5,6) 两个节点得出结果 3 , 6 _+_ 聚合之后就变成了 9
  19. val i2: Int = rdd2.aggregate(0)(math.max(_: Int, _: Int), (_: Int) + (_: Int))
  20. println(i2)
  21. // 得出结果为 9
  22. // 加深印象 操作字符串
  23. val rdd3: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "e", "f"), 1)
  24. val str: String = rdd3.aggregate("")((_: String) + (_: String), (_: String) + (_: String))
  25. println(str)
  26. }
  27. }

上面的案列不仅仅使我们更加了解 aggregate 也了解了 SparkContext.parallelize 如何创建RDD 和 parallelize的分区

3.3.2 提前计算的cache方法

cache 详解

cache方法的作用是将数据内容计算并保存在计算节点的内存中,这个方法的使用是针对Spark的Lazy数据处理模式
在Lazy模式中,数据在i安逸和未使用时是不进行计算的,而仅仅保存其存储地址,只有在Action方法到来时才正式计算。这样做的好处在于可以极大的减少存储空间,从而·提高利用率,而有时必须要求数据及进行计算,此时就需要用cache方法, 可以直接调用

cache 代码示例

  1. object ChapterRDD_cache {
  2. // 环境
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
  4. val sc: SparkContext = new SparkContext(conf)
  5. def main(args: Array[String]): Unit = {
  6. System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.6.0")
  7. val rdd: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "e", "f"), 1)
  8. val value: RDD[String] = rdd.filter((_: String) != "a").map((_: String).toLowerCase())
  9. // 这样写的话 每一次输入都要重新进行计算
  10. value.foreach(println)
  11. value.foreach(println)
  12. value.foreach(println)
  13. println("*************************")
  14. // 先把计算结果放入缓存 这样输出的时候 或者再一次进行操作 不需要对前面的操作重复
  15. value.cache()
  16. value.foreach(println)
  17. value.foreach(println)
  18. value.foreach(println)
  19. // 感兴趣的小伙伴可以用时间 来看看运行时间差
  20. }
  21. }

3.3.3 笛卡尔操作的cartesian方法

cartesian 详解

此方法是用于对不同的数组进行笛卡尔操作,要求是数据集的长度必须相同, 此结果作为一个新的数据集返回。

cartesian 代码示例

  1. object ChapterRDD_cartesian {
  2. // 环境
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
  4. val sc: SparkContext = new SparkContext(conf)
  5. def main(args: Array[String]): Unit = {
  6. val rdd1: RDD[String] = sc.parallelize(Array("a", "b", "c"), 1)
  7. val rdd2: RDD[String] = sc.parallelize(Array("d", "e", "f"), 1)
  8. val value: RDD[(String, String)] = rdd1.cartesian(rdd2)
  9. value.foreach(println)
  10. // 输出的结果 就是每一个值 去匹配第二个rdd的每一个值
  11. // (a,d)
  12. // (a,e)
  13. // (a,f)
  14. // (b,d)
  15. // (b,e)
  16. // (b,f)
  17. // (c,d)
  18. // (c,e)
  19. // (c,f)
  20. }
  21. }

3.3.4 分片存储的coalesce

coalesce 详解

:::tips def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
// 第一个参数是将数据重新分成的片数
// 布尔值指的是将数据分成更小的片时使用 举例中将其设置为true :::

coalesce 代码示例

  1. object ChapterRDD_coaleasce {
  2. // 环境
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
  4. val sc: SparkContext = new SparkContext(conf)
  5. def main(args: Array[String]): Unit = {
  6. val rdd2: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 3)
  7. val rdd2Cpy: RDD[Int] = rdd2.coalesce(1, true)
  8. val ressult: Int = rdd2.aggregate(0)(math.max(_: Int, _: Int), (_: Int) + (_: Int))
  9. val ressult2: Int = rdd2Cpy.aggregate(0)(math.max(_: Int, _: Int), (_: Int) + (_: Int))
  10. println(s"result: ${ressult}")
  11. println(s"rdd2.getNumPartitions: ${rdd2.getNumPartitions}")
  12. println(s"rdd2Cpy.getNumPartitions: ${rdd2Cpy.getNumPartitions}")
  13. println(s"result: ${ressult2}")
  14. //result: 12
  15. //rdd2.getNumPartitions: 3
  16. //rdd2Cpy.getNumPartitions: 1
  17. //result: 6
  18. }
  19. }
  1. RDD还有一个repartition方法

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
} // 可以看到 repartition就是coalesce 为true的时候

3.3.5 以value计算的 countByValue方法

countByValue 详解

countByValue 方法是计算数据集某个数据出现的个数,并且以map的形式返回

countByValue代码示例

  1. object ChapterRDD_countByValue {
  2. // 环境
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
  4. val sc: SparkContext = new SparkContext(conf)
  5. def main(args: Array[String]): Unit = {
  6. System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.6.0")
  7. // 生成一个RDD[Int] 并且规定numSlices为1 这样就代表一个节点1
  8. val rdd1: RDD[Int] = sc.parallelize(Array(1, 2))
  9. rdd1.countByValue().foreach(println)
  10. // 输出结果
  11. //(1,1)
  12. //(2,1)
  13. }
  14. }

3.3.6 以key计算的 countByKey方法

countByKey详解

countByKey方法与countByValue 方法有本质的区别。countByKey是计算数组中元数据键值对key出现的个数

countByKey代码示例

  1. object ChapterRDD_countByKey {
  2. // 环境
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
  4. val sc: SparkContext = new SparkContext(conf)
  5. def main(args: Array[String]): Unit = {
  6. val rdd1: RDD[Int] = sc.parallelize(Array(5,5,6,6,7,7,5))
  7. // 1 map 把每一个元素变成 (element, 1)
  8. // 2 countByKey 进行key的计算
  9. val map: collection.Map[Int, Long] = rdd1.map(((_: Int),1)).countByKey()
  10. println(map)
  11. // 输出结果
  12. //Map(6 -> 2, 7 -> 2, 5 -> 3)
  13. }
  14. }

3.3.7 除去数据集中重复项的distinct方法

distinct 详解

distinct 方法的作用是取出数据集中重复的项

distinct代码演示

rdd.distinct() // 去除重复项

3.3.8 过滤数据的filter方法

filter详解

filter是一个非常常用的方法 传递一个bool判断

filter代码演示

rdd.filter(bool) 保留满足bool判断的数据

3.3.9 flatMap & Map

flatMap & Map 详解

flatMap以行为单位 ,map以单个元素为单位,最终返回的就是一个数据集
flatmap会把结果里的集合展开。而map不会。
flatMap 就是先map 然后flat

flatMap & Map 代码示例

  1. object ChapterRDD_flatMap_Map {
  2. // 环境
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")
  4. val sc: SparkContext = new SparkContext(conf)
  5. def main(args: Array[String]): Unit = {
  6. var rdd1: RDD[Int] = sc.parallelize(Array(5,5,7,5))
  7. val resultMap: RDD[List[Int]] = rdd1.map((x: Int) =>List(x+2))
  8. resultMap.foreach(println)
  9. // 输出结果
  10. //List(7)
  11. //List(9)
  12. //List(7)
  13. //List(7)
  14. println("--------------------------")
  15. val resultFlatMap: RDD[Int] = rdd1.flatMap((x: Int) =>List(x+2))
  16. resultFlatMap.foreach(println)
  17. // 输出结果
  18. //7
  19. //7
  20. //7
  21. //9
  22. }
  23. }

分组数据的groupBy方法