Transformation 和 Action 常用算子

一、Transformation1.1 map1.2 filter1.3 flatMap1.4 mapPartitions1.5 mapPartitionsWithIndex1.6 sample1.7 union1.8 intersection1.9 distinct1.10 groupByKey1.11 reduceByKey1.12 sortBy & sortByKey 1.13 join1.14 cogroup1.15 cartesian1.16 aggregateByKey

二、Action2.1 reduce2.2 takeOrdered2.3 countByKey2.4 saveAsTextFile

一、Transformation

spark 常用的 Transformation 算子如下表:

Transformation 算子 Meaning(含义)
map(func) 对原 RDD 中每个元素运用 func 函数,并生成新的 RDD
filter(func) 对原 RDD 中每个元素使用func 函数进行过滤,并生成新的 RDD
flatMap(func) 与 map 类似,但是每一个输入的 item 被映射成 0 个或多个输出的 items( func 返回类型需要为 Seq )。
mapPartitions(func) 与 map 类似,但函数单独在 RDD 的每个分区上运行, func函数的类型为 Iterator => Iterator ,其中 T 是 RDD 的类型,即 RDD[T]
mapPartitionsWithIndex(func) 与 mapPartitions 类似,但 func 类型为 (Int, Iterator) => Iterator ,其中第一个参数为分区索引
sample(withReplacement, fraction, seed) 数据采样,有三个可选参数:设置是否放回(withReplacement)、采样的百分比(fraction)、随机数生成器的种子(seed);
union(otherDataset) 合并两个 RDD
intersection(otherDataset) 求两个 RDD 的交集
distinct([numTasks])) 去重
groupByKey([numTasks]) 按照 key 值进行分区,即在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, Iterable)
Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey
aggregateByKey
性能会更好
Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传入 numTasks
参数进行修改。
reduceByKey(func, [numTasks]) 按照 key 值进行分组,并对分组后的数据执行归约操作。
aggregateByKey(zeroValue,numPartitions)(seqOp, combOp, [numTasks]) 当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和 zeroValue 聚合每个键的值。与 groupByKey 类似,reduce 任务的数量可通过第二个参数进行配置。
sortByKey([ascending], [numTasks]) 按照 key 进行排序,其中的 key 需要实现 Ordered 特质,即可比较
join(otherDataset, [numTasks]) 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,等价于内连接操作。如果想要执行外连接,可以使用 leftOuterJoin
, rightOuterJoin
fullOuterJoin
等算子。
cogroup(otherDataset, [numTasks]) 在一个 (K, V) 对的 dataset 上调用时,返回一个 (K, (Iterable, Iterable)) tuples 的 dataset。
cartesian(otherDataset) 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) 类型的 dataset(即笛卡尔积)。
coalesce(numPartitions) 将 RDD 中的分区数减少为 numPartitions。
repartition(numPartitions) 随机重新调整 RDD 中的数据以创建更多或更少的分区,并在它们之间进行平衡。
repartitionAndSortWithinPartitions(partitioner) 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并对分区中的数据按照 key 值进行排序。这比调用 repartition
然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作所在的机器。

下面分别给出这些算子的基本使用示例:

1.1 map

map(func:RDD元素类型 => B):一对一映射[原RDD一个元素计算得到新RDD一个元素]

  • map里面的函数是针对每个元素操作,元素有多少个,函数就执行多少次;
  • map生产新RDD元素个数 = 原RDD元素个数;
  • map的使用场景:一般用于数据类型/值的转换
  1. val list = List(1,2,3)
  2. sc.parallelize(list).map(_ * 10).foreach(println)
  3. // 输出结果: 10 20 30 (这里为了节省篇幅去掉了换行,后文亦同)

1.2 mapPartitions

mapPartitions(func:Iterator[RDD元素类型] => Iterator[B]):与 map 类似,但函数单独在 RDD 的每个分区上运行, func函数的类型为 Iterator<T> => Iterator<U> (其中 T 是 RDD 的类型),即输入和输出都必须是可迭代类型。
使用场景:一般用于mysql/redis/hbase等存储介质中查询数据,此时可以避免资源链接的创建与销毁的次数。

  1. val list = List(1, 2, 3, 4, 5, 6)
  2. sc.parallelize(list, 3).mapPartitions(iterator => {
  3. val buffer = new ListBuffer[Int]
  4. while (iterator.hasNext) {
  5. buffer.append(iterator.next() * 100)
  6. }
  7. buffer.toIterator
  8. }).foreach(println)
  9. //输出结果
  10. 100 200 300 400 500 600

map()与mapPartitions()的区别

  1. 函数针对的对象不一样;
    • map里面的函数时针对每个元素操作
    • mapPartitions的函数是针对每个分区的数据迭代器操作
  2. 函数的返回值不一样;
    • map里面的函数是针对每个元素操作,操作完之后返回新RDD一个元素,多以新RDD元素个数 = 原RDD元素个数
    • mapPartitions的函数是针对每个分区的数据迭代器操作,操作完之后返回新RDD一个分区数据封装的迭代器,所以新RDD元素不一定等于原RDD元素个数
  3. 内存回收的时机不一样。
    • map里面的函数是针对每个元素操作,元素操作完成之后就可以垃圾回收
    • mapPartitions的函数是针对每个分区的数据迭代器操作,必须等到迭代器中所有元素都处理完成之后才能垃圾回收,所以如果RDD分区数据量很大,可能出现内存溢出,此时可以用map代替

1.3 mapPartitionsWithIndex

mapPartitionsWithIndex(func:(Int,Iterator[RDD元素类型]) => Iterator[B]):与 mapPartitions 类似,但 func 类型为 (Int, Iterator<T>) => Iterator<U> ,其中第一个参数为分区索引。

  1. val list = List(1, 2, 3, 4, 5, 6)
  2. sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
  3. val buffer = new ListBuffer[String]
  4. while (iterator.hasNext) {
  5. buffer.append(index + "分区:" + iterator.next() * 100)
  6. }
  7. buffer.toIterator
  8. }).foreach(println)
  9. //输出
  10. 0 分区:100
  11. 0 分区:200
  12. 1 分区:300
  13. 1 分区:400
  14. 2 分区:500
  15. 2 分区:600

mapPartitions()与mapPartitionsWithIndex()的区别:

mapPartitionsWithIndex的函数相比mapPartitions多了一个分区号的参数。

1.4 filter

filter(func:RDD元素类型 => Boolean):按照指定条件过滤

  • filter里面的函数是针对每个元素操作,元素有多少个,函数执行多少次;
  • filter是保留函数返回值为true的数据;
  • filter操作分区不变,但分区内的数据可能不均衡,生产环境下,可能会导致数据倾斜。
  1. val list = List(3, 6, 9, 10, 12, 21)
  2. sc.parallelize(list).filter(_ >= 10).foreach(println)
  3. // 输出: 10 12 21

1.5 flatMap

flatMap(func)map 类似,但每一个输入的 item 会被映射成 0 个或多个输出的 items( func 返回类型需要为 Seq)。

  1. val list = List(List(1, 2), List(3), List(), List(4, 5))
  2. sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)
  3. // 输出结果 : 10 20 30 40 50

flatMap 这个算子在日志分析中使用概率非常高,这里进行一下演示:拆分输入的每行数据为单个单词,并赋值为 1,代表出现一次,之后按照单词分组并统计其出现总次数,代码如下:

  1. val lines = List("spark flume spark",
  2. "hadoop flume hive")
  3. sc.parallelize(lines).flatMap(line => line.split(" ")).
  4. map(word=>(word,1)).reduceByKey(_+_).foreach(println)
  5. // 输出:
  6. (spark,2)
  7. (hive,1)
  8. (hadoop,1)
  9. (flume,2)

1.6 sample

数据采样。有三个可选参数:设置是否放回 (withReplacement)、采样的百分比 (fraction)、随机数生成器的种子 (seed) :

  1. val list = List(1, 2, 3, 4, 5, 6)
  2. sc.parallelize(list).sample(withReplacement = false, fraction = 0.5).foreach(println)

1.7 union

union:并集

  • 分区合并;
  • 数据合并;
  • 两个RDD的数据类型必须保持一致,否则编译时报错。
  1. val list1 = List(1, 2, 3)
  2. val list2 = List(4, 5, 6)
  3. sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
  4. // 输出: 1 2 3 4 5 6

1.8 intersection

intersection:交集

  • 数据打乱重组,有shuffle过程;
  • 两个RDD的数据类型必须保持一致,否则编译时报错;
  • 返回的RDD的分区数量保留两个RDD最大的分区数量。
  1. val list1 = List(1, 2, 3, 4, 5)
  2. val list2 = List(4, 5, 6)
  3. sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
  4. // 输出: 4 5

1.9 subtract

subtract:差集

  • 有数据打乱重组过程,有shuffle过程;
  • 两个RDD的数据类型必须保持一致,否则编译时报错;
  • 返回的RDD的分区数量等于调用这个方法的RDD的分区数量;
  • 返回当前RDD除去和参数RDD共同的数据集。
    1. val list1 = List(1, 2, 3, 4, 5)
    2. val list2 = List(4, 5, 6)
    3. sc.parallelize(list1).subtract(sc.parallelize(list2)).foreach(println)
    4. // 输出: 1 2 3

1.10 distinct

distinct:去重,会产生shuffle操作

  1. val list = List(1, 2, 2, 4, 4)
  2. sc.parallelize(list).distinct().foreach(println)
  3. // 输出: 4 1 2

1.11 groupBy & groupByKey

groupBy(func:RDD元素类型 => K):按照指定字段分组

  • groupBy里面的函数是针对每个元素操作,元素有多少个,函数就执行多少次;
  • groupBy是根据函数的返回值对元素进行分区;
  • groupBy生成的RDD元素是KV键值对,K就是函数的返回值,V是K对应原RDD所有元素的集合;
  • groupBY会产生shuffle操作。

存在的问题:groupBy方法会导致数据重新组合以后不均匀;
解决方案:通过传递参数,改变下游分区的数量

  1. /*
  2. 1.一个组的数据在一个分区,但是并不是说一个分区中只有一个组
  3. 奇偶分组,将数据分成两个组,结果文件中只有一个分区文件,分区文件中有两个分组。
  4. */
  5. val list = List(1,2,3,4,5,6,7,8)
  6. val rdd: RDD[Int] = sc.makeRDD(list,1)
  7. val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
  8. rdd1.saveAsTextFile("output")
  9. /*
  10. 2.当前有4个分区,奇偶分组只会有两个分组,所以结果文件中有4个分区文件,但是有两个分区分件中没有数据
  11. */
  12. val rdd: RDD[Int] = sc.makeRDD(list,4)
  13. val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
  14. rdd1.saveAsTextFile("output")
  15. /*
  16. 3.通过设置下游的分区数量解决分区无数据的情况,此时生成的结果文件只有两个分区
  17. */
  18. val rdd: RDD[Int] = sc.makeRDD(list,4)
  19. val rdd1: RDD[(Int, Iterable[Int])] = rdd.groupBy(((num :Int) => num % 2),2)
  20. rdd1.saveAsTextFile("output")

groupByKey:根据key对元素进行分组

  • groupByKey生成的新RDD元素类型是KV键值对,K是原RDD元素的key,V是K对应原RDD的所有value值集合
  1. val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
  2. sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)
  3. //输出:
  4. (spark,List(3, 5))
  5. (hadoop,List(2, 2))
  6. (storm,List(6))

1.12 reduceByKey

reduceByKey(func:(Value值类型,Value值类型) => Value值类型):根据key分组,对该key所有value聚合

  • reduceByKey的函数时针对每个key所有value值聚合;
  • reduceByKey函数第一个参数代表该组上一次聚合结果,第一次聚合初始值 = 该组第一个value值;
  • reduceByKey函数第二个参数代表该组待聚合的value值。
  1. val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
  2. sc.parallelize(list).reduceByKey(_ + _).foreach(println)
  3. //输出
  4. (spark,8)
  5. (hadoop,4)
  6. (storm,6)

groupbyKey和reduceByKey的区别

  1. 算子的作用
    1. reduceByKey:根据key进行分组,对相同的key的value进行操作
    2. groupByKey:对key进行分组
  2. groupByKey
    1. 对一个分区的数据分区后不能继续执行后续操作,需要等到其他分区的数据全部到达后,才能执行后续的操作
    2. groupByKey是面向整个数据集,而不是面向一个分区
    3. groupByKey没有combiner预聚合操作
  3. reduceByKey
    1. 分区内和分区间的规则相同
    2. reduceByKey有预聚合操作,性能比groupByKey要高,后续工作中推荐使用reduceByKey这种高性能的shuffle算子。

1.13 sortBy & sortByKey

sortByKey:根据key对元素排序

  1. val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
  2. sc.parallelize(list01).sortByKey(ascending = false).foreach(println)
  3. // 输出
  4. (120,storm)
  5. (90,spark)
  6. (100,hadoop)

sortBy(func:RDD元素类型 => K):按照指定字段排序

  • sortBy里面的函数是针对每个元素操作,元素有多少个,函数就执行多少次;
  • sortBy是根据函数的返回值对RDD元素排序;
  • sortBy会产生shuffle操作。
  1. val list02 = List(("hadoop",100), ("spark",90), ("storm",120))
  2. sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println)
  3. // 输出
  4. (storm,120)
  5. (hadoop,100)
  6. (spark,90)

1.14 join

在一个 (K, V) 和 (K, W) 类型的 Dataset 上调用时,返回一个 (K, (V, W)) 的 Dataset,等价于内连接操作。如果想要执行外连接,可以使用 leftOuterJoin, rightOuterJoinfullOuterJoin 等算子。

  1. val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
  2. val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03"))
  3. sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)
  4. // 输出
  5. (1,(student01,teacher01))
  6. (3,(student03,teacher03))
  7. (2,(student02,teacher02))

1.15 cogroup

cogroup:按照key分组之后再全连接

  1. val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
  2. val list02 = List((1, "A"), (2, "B"), (3, "E"))
  3. val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"),(3, "eE"))
  4. sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println)
  5. // 输出: 同一个 RDD 中的元素先按照 key 进行分组,然后再对不同 RDD 中的元素按照 key 进行分组
  6. (1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab])))
  7. (3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE)))
  8. (2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB])))

1.16 cartesian

计算笛卡尔积:

  1. val list1 = List("A", "B", "C")
  2. val list2 = List(1, 2, 3)
  3. sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)
  4. //输出笛卡尔积
  5. (A,1)
  6. (A,2)
  7. (A,3)
  8. (B,1)
  9. (B,2)
  10. (B,3)
  11. (C,1)
  12. (C,2)
  13. (C,3)

1.17 aggregateByKey

aggregateByKey(默认值)(combinerFunc:(默认值类型,Value值类型) => 默认值类型,reduceFunc:(默认值类型,Value值类型) => 默认值类型):根据key分组,对该key所有value值聚合

  • combinerFunc是combiner预聚合逻辑,reduceFunc是reducer聚合逻辑;
  • combinerFunc函数第一个参数代表该组上一次聚合结果,第一次聚合值 = 默认值;
  • combinerFunc函数第二个参数代表该组带聚合的value值;
  • reduceFunc函数第一个参数代表该组上一次聚合结果,第一次聚合初始值 = 该组第一个value值;
  • reduceFunc函数第二个参数代表该组带聚合的value值。

当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和 zeroValue 聚合每个键的值。与 groupByKey 类似,reduce 任务的数量可通过第二个参数 numPartitions 进行配置。
使用场景:当出现分区内和分区间对数据处理的规则不一样时,可以使用这个算子
示例如下:

  1. // 为了清晰,以下所有参数均使用具名传参
  2. val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
  3. sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)(
  4. seqOp = math.max(_, _),
  5. combOp = _ + _
  6. ).collect.foreach(println)
  7. //输出结果:
  8. (hadoop,3)
  9. (storm,8)
  10. (spark,7)

这里使用了 numSlices = 2 指定 aggregateByKey 父操作 parallelize 的分区数量为 2,其执行流程如下:

Spark_Transformation和Action算子 - 图1

基于同样的执行流程,如果 numSlices = 1,则意味着只有输入一个分区,则其最后一步 combOp 相当于是无效的,执行结果为:

  1. (hadoop,3)
  2. (storm,8)
  3. (spark,4)

同样的,如果每个单词对一个分区,即 numSlices = 6,此时相当于求和操作,执行结果为:

  1. (hadoop,5)
  2. (storm,14)
  3. (spark,7)

aggregateByKey(zeroValue = 0,numPartitions = 3) 的第二个参数 numPartitions 决定的是输出 RDD 的分区数量,想要验证这个问题,可以对上面代码进行改写,使用 getNumPartitions 方法获取分区数量:

  1. sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
  2. seqOp = math.max(_, _),
  3. combOp = _ + _
  4. ).getNumPartitions

Spark_Transformation和Action算子 - 图2

1.18 repartition

repartition(分区数):重分区

  • repartition既可以增大分区数也可以减少分区数,但是都会产生shuffle;
  • reparation一般用于增大分区数,当数据量膨胀的时候需要增大分区数提高运行效率。 ```scala val list = List(1, 2, 3, 5, 4, 12, 3, 1,6) val rdd: RDD[Int] = sc.makeRDD(list,3) val rdd1: RDD[Int] = rdd.distinct() / output1: 分区0:6 3 12 分区1:4 1 分区2:5 2 /

val rdd2: RDD[Int] = rdd1.coalesce(2) / output2:同一分区的数据还在一起 分区0:6 3 12 4 1 分区1:5 2 /

val rdd3: RDD[Int] = rdd1.repartition(2) / output3:数据从原来的分区打乱重组 分区0:6 12 1 5 分区1:3 4 2 /

  1. <a name="F0kJn"></a>
  2. ### 1.19 partitionBy
  3. `partitionBy(Partitioner)`:根据指定分区器重分区
  4. ```scala
  5. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  6. val sc = new SparkContext(sparkConf)
  7. val list = List((1, "a"), (2, "b"), (2, "c"))
  8. val rdd: RDD[(Int, String)] = sc.makeRDD(list, 3)
  9. // rdd.saveAsTextFile("output1")
  10. val rdd1: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(3))
  11. // rdd1.saveAsTextFile("output2")
  12. val list1 = List(
  13. ("cba", "消息1"),
  14. ("nba", "消息5"),
  15. ("wnba", "消息10"),
  16. ("cba", "消息2"),
  17. ("nba", "消息2"),
  18. ("wnba", "消息6"),
  19. ("cba", "消息1"),
  20. )
  21. val rddInfo: RDD[(String, String)] = sc.makeRDD(list1,2)
  22. val partitionRDD: RDD[(String, String)] = rddInfo.partitionBy(new MyPartitioner(2))
  23. partitionRDD.saveAsTextFile("output")
  24. sc.stop()
  25. }
  26. //自定义分区器
  27. class MyPartitioner(num:Int) extends Partitioner {
  28. override def numPartitions: Int = num
  29. override def getPartition(key: Any): Int = {
  30. key match {
  31. case "nba" => 0
  32. case _ => 1
  33. }
  34. }
  35. }

自定义分区器

  1. 创建一个class继承Partitioner
  2. 重写抽象方法
  3. 后续可以创建自定义对象放入groupBy、reduceByKey等shuffle算子中使用

1.20 zip

zip:拉链

  • 只有两个RDD元素个数与分区数都一样的时候才能拉链;
  • 返回的RDD数据是元组.
    1. val result4: RDD[(Int, Int)] = rdd1.zip(rdd2)

1.21 mapValues

mapValues(func:Value值类型 => B):一对一映射(原RDD一个元素的value值计算得到新RDD元素value值,key保持不变)

  • mapValues的函数是针对每个元素的value值操作
    1. val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("b",1)))
    2. println(rdd1.mapValues(_ + 1).collect().mkString(","))

二、Action

所谓行动算子,就是不会产生一个新的RDD,而是触发作业的执行;而之前的转换算子,只是功能的扩展和包装,不会触发作业的执行;
行动算子执行以后,会获取当前作业的执行结果,spark的行动算子执行时,会产生job对象,然后提交job对象。

Spark 常用的 Action 算子如下:

Action(动作) Meaning(含义)
reduce(func) 使用函数func执行归约操作
collect() 以一个 array 数组的形式返回 dataset 的所有元素,适用于小结果集。
count() 返回 dataset 中元素的个数。
first() 返回 dataset 中的第一个元素,等价于 take(1)。
take(n) 将数据集中的前 n 个元素作为一个 array 数组返回。
takeSample(withReplacement, num, [seed]) 对一个 dataset 进行随机抽样
takeOrdered(n, [ordering]) 按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 n 个元素。只适用于小结果集,因为所有数据都会被加载到驱动程序的内存中进行排序。
saveAsTextFile(path) 将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。
saveAsSequenceFile(path) 将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。该操作要求 RDD 中的元素需要实现 Hadoop 的 Writable 接口。对于 Scala 语言而言,它可以将 Spark 中的基本数据类型自动隐式转换为对应 Writable 类型。(目前仅支持 Java and Scala)
saveAsObjectFile(path) 使用 Java 序列化后存储,可以使用 SparkContext.objectFile()
进行加载。(目前仅支持 Java and Scala)
countByKey() 计算每个键出现的次数。
foreach(func) 遍历 RDD 中每个元素,并对其执行fun函数

2.1 collect

collect:收集RDD所有分区的数据以数组形式封装发给Driver

  • 如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,所以工作中一般需要设置Driver的内存为5-10G左右,可以通过bin/spark-submit —driver-memory 5G设置。

2.2 count

count:统计RDD元素个数


2.3 first

first:获取RDD第一个元素

  • first会首先启动一个job从0号分区获取第一个元素,如果0号分区没有数据会再启动一个job从其他分区获取数据;
  • first有可能触发两次job执行

2.4 reduce

使用函数func执行归约操作:

  1. val list = List(1, 2, 3, 4, 5)
  2. sc.parallelize(list).reduce((x, y) => x + y)
  3. sc.parallelize(list).reduce(_ + _)
  4. // 输出 15

2.5 take

take:获取RDD前N个元素

  • take会首先启动一个job从0号分区获取前N个元素,如果0号分区不满N条数据会再启动一个job从其他分区获取数据;
  • take有可能触发两次job执行

2.6 takeOrdered

按自然顺序(natural order)或自定义比较器(custom comparator)排序后返回前 n 个元素。需要注意的是 takeOrdered 使用隐式参数进行隐式转换,以下为其源码。所以在使用自定义排序时,需要继承 Ordering[T] 实现自定义比较器,然后将其作为隐式参数引入。

  1. def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  2. .........
  3. }

自定义规则排序:

  1. // 继承 Ordering[T],实现自定义比较器,按照 value 值的长度进行排序
  2. class CustomOrdering extends Ordering[(Int, String)] {
  3. override def compare(x: (Int, String), y: (Int, String)): Int
  4. = if (x._2.length > y._2.length) 1 else -1
  5. }
  6. val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
  7. // 引入隐式默认值
  8. implicit val implicitOrdering = new CustomOrdering
  9. sc.parallelize(list).takeOrdered(5)
  10. // 输出: Array((1,hive), (1,storm), (1,hadoop), (1,azkaban)

2.7 countByKey

countByKey:统计每个key的个数
底层源码:

  1. 调用mapValues算子,将V转换为1
  2. 然后再调用reduceByKey,将相同的key的value值进行相加
  3. 最后转换成map结构

self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap

  1. val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
  2. sc.parallelize(list).countByKey()
  3. // 输出: Map(hadoop -> 2, storm -> 2, azkaban -> 1)

2.8 saveAsTextFile

将 dataset 中的元素以文本文件的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将对每个元素调用 toString 方法,将元素转换为文本文件中的一行记录。

  1. val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
  2. sc.parallelize(list).saveAsTextFile("/usr/file/temp")

2.9 foreach

foreach(func:RDD元素类型 => Unit):Unit:对每个元素遍历

  • foreach的函数是针对每个元素操作

rdd.collect().foreach(println) —>foreach:方法
rdd.foreach(println) —>foreach:算子

  1. 只要看到rdd的算子,一定要想到两个块,Driver和Executor
  2. 算子逻辑代码是在分布式计算节点executor中执行的,算子以外代码是在Driver端执行
  3. foreach是算子时,那么将在不同的executor中同时执行,互不影响
  4. foreach是方法时,那么是在当前的节点的内存中完成数据的循环
  5. 结果就是:两种方法的结果的顺序会不同


    2.10 foreachPartition

foreachPartition(func: Iterator[RDD元素类型]=>Unit):Unit: 对每个分区遍历

  • foreachPartition是针对每个分区操作,分区有多少个,函数执行多少次
  • foreachPartition一般用于将数据写入mysql/hbase/redis等存储介质中,可以减少资源链接的创建与销毁的次数。

参考资料

RDD Programming Guide