概念
作用: 和mapPartitions(func)类似. 但是会给func多提供一个Int值来表示分区的索引. 所以func的类型是:(Int, Iterator) => Iterator
不光以分区为单位进行操作,还带分区的编号.mapPartitionsWithIndex算子和mapPartitions运算效果是一样的,只不过比mapPartitions算子多了一个分区编号.
需求:
1.创建一个RDD,使每个元素跟所在的分区号形成一个元祖,组成一个新的RDD
**
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Desc: 转换算子-mapPartitionsWithIndex
* 以分区为单位,对RDD中的元素进行映射,并且带分区编号
*/
object Spark02_Transformation_mapPartitionsWithIndex {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8),3)
//一个是当前分区编号,另外一个是分区里面的数据
val newRDD: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
//index是当前分区编号 , datas 当前分区数据
(index, datas) => {
datas.map((index, _))
}
)
newRDD.collect().foreach(println)
// 关闭连接
sc.stop()
}
}
需求二:
第二个分区的元素每个元素乘以2 ,其余的元素不变.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Desc: 转换算子-mapPartitionsWithIndex
* 以分区为单位,对RDD中的元素进行映射,并且带分区编号
*/
object Spark02_Transformation_mapPartitionsWithIndex {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8),3)
//一个是当前分区编号,另外一个是分区里面的数据
//需求:第二个分区数据*2,其余分区数据保持不变
val newRDD: RDD[Int] = rdd.mapPartitionsWithIndex {
//index 当前分区的编号, datas分区的数据
(index, datas) => {
//用模式匹配去匹配第二个分区
index match {
// 如果是第二个分区(分区是从0开始计算的)
case 1 => datas.map(_ * 2)
// 否则就是原来分区数据
case _ => datas
}
}
}
newRDD.collect().foreach(println)
// 关闭连接
sc.stop()
}
}