transformation算子

单value类型

//一次处理一个元素
map(f: T => U): RDD[U]
//一次处理一个分区。优点:创建链接时比map效率高 缺点:把一个分区的数据全部加载到内存中处理,可能OOM
mapPartitions(f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
//获得分区编号(从0开始)
mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false):

// 压扁
flatten:Array(Array(hello, h1, h2), Array(nihao, zs, lisi))>>>Array(hello, h1, h2, nihao, zs, lisi)
// flatMap = map + flatten
flatMap(f: T => TraversableOnce[U])

//一般使用场景:coalesce(false)减少分区、rePartition(true)扩大分区
coalesce(分区数,true/false):有、无shuffle 默认shuffle=false
rePartition(分区数): 有shuffle 底层是coalesce(true)

//把一个分区的数据转换成一个Array
glom(): RDD[Array[T]]
//过滤
filter(f: T => Boolean): RDD[T]
//去重、默认结果分区数=原RDD分区数,也可指定、有shuffle
distinct(numPartitions: Int)

//分组:原分组规则作为返回RDD的Key 原RDD的K作为返回RDD的迭代器、默认结果分区数=原RDD分区数,也可指定、有shuffle
groupByK: RDD[(K, Iterable[T])]
groupByK: RDD[(K, Iterable[T])]
//排序
sortByK => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)

双value类型

//union:并集
def union(other: RDD[T]): RDD[T]

//subtract:差集=左边RDD去除交集
def subtract(other: RDD[T]): RDD[T]

//intersection:交集=inner join
def intersection(other: RDD[T]): RDD[T]

//zip:两个RDD 分区数和元素个数必须相同
def zipU: ClassTag: RDD[(T, U)]

action算子

//分区间操作和分区内操作相同,在分区间操作时还生效一次
fold(zeroValue: T)(op: (T, T) => T): T
//分区间操作和分区内操作相同, 在分区间操作时还生效一次(3个分区 zerovalue=10:10*3+10 标红的10是分区间操作时生效的)
aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

reduce
collect
foreach
foreachPartition

//返回RDD中元素个数
count
//返回RDD中第一个元素
first
//返回由RDD前n个元素组成的数组
take

aggregate

  • 和aggregateByKey差不多,只不过不适用key-value型数据

image.png三个参数:
(1)zeroValue:给每一个分区中的每一个key一个初始值;
(2)seqOp:函数用于在每一个分区
(3)combOp:函数用于合并每个分区的结果。
作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合
然后用combine函数将每个分区的结果和初始值再进行combine操作

举栗子:创建一个RDD,将所有元素相加得到结果
var rdd = sc.makeRDD(1 to 10,2)
rdd.aggregate(0)(+,//分区内相加
+) // 两个分区相加
>>>res22: Int = 55
rdd.aggregate(1)(+,+) >>>res2: Int = 58 因为一共进行了三次初始值聚合

fold

  • aggregate的简化操作,seqop和combop一样

image.png
举栗子:创建一个RDD,将所有元素相加得到结果
var rdd = sc.makeRDD(1 to 10,2)
rdd.fold(0)(+)