概念
作用: 和mapPartitions(func)类似. 但是会给func多提供一个Int值来表示分区的索引. 所以func的类型是:(Int, Iterator) => Iterator
不光以分区为单位进行操作,还带分区的编号.mapPartitionsWithIndex算子和mapPartitions运算效果是一样的,只不过比mapPartitions算子多了一个分区编号.
需求:
1.创建一个RDD,使每个元素跟所在的分区号形成一个元祖,组成一个新的RDD
**
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/*** Desc: 转换算子-mapPartitionsWithIndex* 以分区为单位,对RDD中的元素进行映射,并且带分区编号*/object Spark02_Transformation_mapPartitionsWithIndex {def main(args: Array[String]): Unit = {//创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")//创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8),3)//一个是当前分区编号,另外一个是分区里面的数据val newRDD: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(//index是当前分区编号 , datas 当前分区数据(index, datas) => {datas.map((index, _))})newRDD.collect().foreach(println)// 关闭连接sc.stop()}}
需求二:
第二个分区的元素每个元素乘以2 ,其余的元素不变.
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/*** Desc: 转换算子-mapPartitionsWithIndex* 以分区为单位,对RDD中的元素进行映射,并且带分区编号*/object Spark02_Transformation_mapPartitionsWithIndex {def main(args: Array[String]): Unit = {//创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")//创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8),3)//一个是当前分区编号,另外一个是分区里面的数据//需求:第二个分区数据*2,其余分区数据保持不变val newRDD: RDD[Int] = rdd.mapPartitionsWithIndex {//index 当前分区的编号, datas分区的数据(index, datas) => {//用模式匹配去匹配第二个分区index match {// 如果是第二个分区(分区是从0开始计算的)case 1 => datas.map(_ * 2)// 否则就是原来分区数据case _ => datas}}}newRDD.collect().foreach(println)// 关闭连接sc.stop()}}
