概念

作用: 和mapPartitions(func)类似. 但是会给func多提供一个Int值来表示分区的索引. 所以func的类型是:(Int, Iterator) => Iterator

不光以分区为单位进行操作,还带分区的编号.mapPartitionsWithIndex算子和mapPartitions运算效果是一样的,只不过比mapPartitions算子多了一个分区编号.

需求:

1.创建一个RDD,使每个元素跟所在的分区号形成一个元祖,组成一个新的RDD
**

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * Desc: 转换算子-mapPartitionsWithIndex
  5. * 以分区为单位,对RDD中的元素进行映射,并且带分区编号
  6. */
  7. object Spark02_Transformation_mapPartitionsWithIndex {
  8. def main(args: Array[String]): Unit = {
  9. //创建SparkConf并设置App名称
  10. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  11. //创建SparkContext,该对象是提交Spark App的入口
  12. val sc: SparkContext = new SparkContext(conf)
  13. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8),3)
  14. //一个是当前分区编号,另外一个是分区里面的数据
  15. val newRDD: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
  16. //index是当前分区编号 , datas 当前分区数据
  17. (index, datas) => {
  18. datas.map((index, _))
  19. }
  20. )
  21. newRDD.collect().foreach(println)
  22. // 关闭连接
  23. sc.stop()
  24. }
  25. }

需求二:


第二个分区的元素每个元素乘以2 ,其余的元素不变.

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * Desc: 转换算子-mapPartitionsWithIndex
  5. * 以分区为单位,对RDD中的元素进行映射,并且带分区编号
  6. */
  7. object Spark02_Transformation_mapPartitionsWithIndex {
  8. def main(args: Array[String]): Unit = {
  9. //创建SparkConf并设置App名称
  10. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  11. //创建SparkContext,该对象是提交Spark App的入口
  12. val sc: SparkContext = new SparkContext(conf)
  13. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8),3)
  14. //一个是当前分区编号,另外一个是分区里面的数据
  15. //需求:第二个分区数据*2,其余分区数据保持不变
  16. val newRDD: RDD[Int] = rdd.mapPartitionsWithIndex {
  17. //index 当前分区的编号, datas分区的数据
  18. (index, datas) => {
  19. //用模式匹配去匹配第二个分区
  20. index match {
  21. // 如果是第二个分区(分区是从0开始计算的)
  22. case 1 => datas.map(_ * 2)
  23. // 否则就是原来分区数据
  24. case _ => datas
  25. }
  26. }
  27. }
  28. newRDD.collect().foreach(println)
  29. // 关闭连接
  30. sc.stop()
  31. }
  32. }