概念
函数声明:
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}
作用: 针对每个K, 将V进行合并成C, 得到
createCombiner: 将读进来的数据进行初始化,当前的值作为参数,可以对这个值做一一些转换操作.转换为我们想要的数据格式. 比如说我RDD类型的keyvalue类型.我需要对这些值做个转换.
combineByKey会遍历分区中的每个key-value对. 如果第一次碰到这个key, 则调用createCombiner函数,传入value, 得到一个C类型的值.(如果不是第一次碰到这个 key, 则不会调用这个方法)
mergeValue: 如果不是第一个遇到这个key, 则调用这个函数进行合并操作. 分区内合并
mergeCombiners 跨分区合并相同的key的值(C). 跨分区合并
案例
求出每一个学生的平均成绩
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Transformation_combineByKey {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//需求:求出每一个学生的平均成绩
//创建RDD
val scoreRDD: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan", 90), ("lisi", 60), ("zhangsan", 96), ("lisi", 62), ("zhangsan", 100), ("lisi", 50)))
//createCombiner: V => C, 对RDD中当前key取出第一个value做一个初始化
//mergeValue: (C, V) => C, 分区内计算规则,主要在分区内进行,将当前分区的value值,合并到初始化得到的c上面
//mergeCombiners: (C, C) => C 分区间计算规则
// 0---("zhangsan",90),("zhangsan",95)
// 1---("zhangsan",100)
val combineRDD: RDD[(String, (Int, Int))] = scoreRDD.combineByKey(
(_, 1), //对取出的第一个value做初始化
(t1: (Int, Int), v) => {
(t1._1 + v, t1._2 + 1)
},
(t2: (Int, Int), t3: (Int, Int)) => {
(t2._1 + t3._1, t2._2 + t3._2)
}
)
//求平均成绩
val resRDD: RDD[(String, Int)] = combineRDD.map {
case (name, (score, count)) => {
(name, score / count)
}
}
resRDD.collect().foreach(println)
/* 输出:
(zhangsan,95)
(lisi,57)
*/
// 关闭连接
sc.stop()
}
}