概念

函数声明:

  1. def combineByKey[C](
  2. createCombiner: V => C,
  3. mergeValue: (C, V) => C,
  4. mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  5. combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
  6. partitioner, mapSideCombine, serializer)(null)
  7. }

作用: 针对每个K, 将V进行合并成C, 得到

createCombiner: 将读进来的数据进行初始化,当前的值作为参数,可以对这个值做一一些转换操作.转换为我们想要的数据格式. 比如说我RDD类型的keyvalue类型.我需要对这些值做个转换.
combineByKey会遍历分区中的每个key-value对. 如果第一次碰到这个key, 则调用createCombiner函数,传入value, 得到一个C类型的值.(如果不是第一次碰到这个 key, 则不会调用这个方法)
mergeValue: 如果不是第一个遇到这个key, 则调用这个函数进行合并操作. 分区内合并
mergeCombiners 跨分区合并相同的key的值(C). 跨分区合并

案例

求出每一个学生的平均成绩

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Transformation_combineByKey {
  4. def main(args: Array[String]): Unit = {
  5. //创建SparkConf并设置App名称
  6. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  7. //创建SparkContext,该对象是提交Spark App的入口
  8. val sc: SparkContext = new SparkContext(conf)
  9. //需求:求出每一个学生的平均成绩
  10. //创建RDD
  11. val scoreRDD: RDD[(String, Int)] = sc.makeRDD(List(("zhangsan", 90), ("lisi", 60), ("zhangsan", 96), ("lisi", 62), ("zhangsan", 100), ("lisi", 50)))
  12. //createCombiner: V => C, 对RDD中当前key取出第一个value做一个初始化
  13. //mergeValue: (C, V) => C, 分区内计算规则,主要在分区内进行,将当前分区的value值,合并到初始化得到的c上面
  14. //mergeCombiners: (C, C) => C 分区间计算规则
  15. // 0---("zhangsan",90),("zhangsan",95)
  16. // 1---("zhangsan",100)
  17. val combineRDD: RDD[(String, (Int, Int))] = scoreRDD.combineByKey(
  18. (_, 1), //对取出的第一个value做初始化
  19. (t1: (Int, Int), v) => {
  20. (t1._1 + v, t1._2 + 1)
  21. },
  22. (t2: (Int, Int), t3: (Int, Int)) => {
  23. (t2._1 + t3._1, t2._2 + t3._2)
  24. }
  25. )
  26. //求平均成绩
  27. val resRDD: RDD[(String, Int)] = combineRDD.map {
  28. case (name, (score, count)) => {
  29. (name, score / count)
  30. }
  31. }
  32. resRDD.collect().foreach(println)
  33. /* 输出:
  34. (zhangsan,95)
  35. (lisi,57)
  36. */
  37. // 关闭连接
  38. sc.stop()
  39. }
  40. }