概念

概念: 以分区为单位执行map算子

假如说rdd里面有10个元素,那么map算子执行10次.


mapPartitions是按分区处理数据 ,有两个分区就执行两次mapPartitions算子,mapPartitions有点批量处理的意思.
假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
其实mapPartitions和map处理次数是一样的,mapPartitions是一次给一个分区的数据取出来作为函数来处理,在底层再依次对每个元素进行处理..

mapPartitions适合批量操作.
使用场景之一比如说从MySQL里面拉数据,mapPartitions每个分区执行一次连接MySQL,这样的话,MySQL连接消耗会少一些.
如果是map的话,一个元素连接一次MySQL,那样性能就不太好了.

注意:
如果分区的数据量太大的话需要考虑OOM问题, 如果内存紧张的话,还是建议用map.具体需要根据业务场景来判断.
**

案例

需求: 对分区里面的每个数据都乘以2
image.png

  1. package com.atguigu.spark.day03
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object Spark01_Transformation_mapPartitions {
  5. def main(args: Array[String]): Unit = {
  6. //创建SparkConf并设置App名称
  7. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  8. //创建SparkContext,该对象是提交Spark App的入口
  9. val sc: SparkContext = new SparkContext(conf)
  10. val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
  11. //以分区为单位,对RDD中的元素进行映射
  12. //一般适用于批处理的操作,比如:将RDD中的元素插入到数据库中,需要数据库连接,
  13. // 如果每一个元素都创建一个连接,效率很低;可以对每个分区的元素,创建一个连接
  14. //val newRDD: RDD[Int] = rdd.mapPartitions(_.map(_ * 2))
  15. // datas 是当前分区数的数据, 如果你想对当前分区数的数据进行操作的话,去需要用map对分区内的数据进行操作
  16. // datas是自己命名的变量名,你可以随便命名
  17. val newRDD: RDD[Int] = rdd.mapPartitions(datas => {
  18. datas.map(elem => elem * 2)
  19. })
  20. //输出newRDD
  21. newRDD.collect().foreach(println)
  22. // 关闭连接
  23. sc.stop()
  24. }
  25. }