常用算子图表

image.png
image.png

union

  1. // T:数据类型
  2. val rdd1: RDD[T] = _
  3. val rdd2: RDD[T] = _
  4. val rdd = rdd1.union(rdd2)
  5. // 或者rdd1 union rdd2
  6. // T:数据类型
  7. val rdd1: RDD[T] = _
  8. val rdd2: RDD[T] = _
  9. val rdd3: RDD[T] = _
  10. val rdd = (rdd1.union(rdd2)).union(rdd3)
  11. // 或者 val rdd = rdd1 union rdd2 union rdd3

给定两个RDD:rdd1和rdd2,其结果是两个RDD的并集

sample

sample(withReplacement, fraction, seed)

withReplacement: boolean 采样是否又放回,true:采样结果中可能包含重复的数据记录
fraction: Double,值域0-1,采样比例,结果集尺寸/原数据集尺寸
seed:Long,控制每次采样的结果是否一致

  1. // 生成0到99的整型数组
  2. val arr = (0 until 100).toArray
  3. // 使用parallelize生成RDD
  4. val rdd = sc.parallelize(arr)
  5. // 不带seed,每次采样结果都不同
  6. rdd.sample(false, 0.1).collect
  7. // 结果集:Array(11, 13, 14, 39, 43, 63, 73, 78, 83, 88, 89, 90)
  8. rdd.sample(false, 0.1).collect
  9. // 结果集:Array(6, 9, 10, 11, 17, 36, 44, 53, 73, 74, 79, 97, 99)
  10. // 带seed,每次采样结果都一样
  11. rdd.sample(false, 0.1, 123).collect
  12. // 结果集:Array(3, 11, 26, 59, 82, 89, 96, 99)
  13. rdd.sample(false, 0.1, 123).collect
  14. // 结果集:Array(3, 11, 26, 59, 82, 89, 96, 99)
  15. // 有放回采样,采样结果可能包含重复值
  16. rdd.sample(true, 0.1, 456).collect
  17. // 结果集:Array(7, 11, 11, 23, 26, 26, 33, 41, 57, 74, 96)
  18. rdd.sample(true, 0.1, 456).collect
  19. // 结果集:Array(7, 11, 11, 23, 26, 26, 33, 41, 57, 74, 96)

repartition

repartition(n)

用于调整RDD的并行度

RDD的并行度很大程度上决定了分布式系统中CPU的使用效率,进而会影响分布式系统并行计算的执行效率
把并行度调整为可用CPU的2~3倍(Executors数量为N,每个Executor配置的CPU个数为C,那么推荐并行度坐落在NC2到NC3这个范围)是个不错的开始,但真正合理的并行度,取决于系统可用资源,分布式数据集大小,以及执行内存

尽管repartition非常灵活,但一定会引入shuffle。如果你想增加并行度,那么repartition是不二之选;但如果你想降低并行度,那么coalesce可以在不引入shuffle的情况下进行

coalesce

image.png
具体来说,给定任意一条数据记录,repartition 的计算过程都是先哈希、再取模,得到的结果便是该条数据的目标分区索引。对于绝大多数的数据记录,目标分区往往坐落在另一个 Executor、甚至是另一个节点之上,因此 Shuffle 自然也就不可避免。coalesce 则不然,在降低并行度的计算中,它采取的思路是把同一个 Executor 内的不同数据分区进行合并,如此一来,数据并不需要跨 Executors、跨节点进行分发,因而自然不会引入 Shuffle。

first, take和collect

first 用于收集RDD数据集中的一条数据
take(n: Int) 用于收集多条记录
collect 拿到全量数据,把RDD的计算结果全量的收集到Driver端
collect两处性能隐患:拉取数据过程中的网络开销 + Driver的OOM,因此,如果想收集全量数据,可用saveAsTextFile

saveAsTextFile

image.png
直接通过Executors将RDD数据分区物化到文件系统,这个过程并不涉及与Driver端的任何交互