第一章.Transformation转换算子(Value类型)

1.map映射

  1. package com.atguigu.spark.day03
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.junit.Test
  4. class $01Transformation {
  5. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  6. /**
  7. * map(func:RDD中元素类型=>B):映射
  8. * map的使用场景:一对一,主要用于对每个元素进行类型/值的转换
  9. * map里面的函数是针对RDD 中的每个元素操作,每个元素操作完成会返回一个结果
  10. * rdd中有多少个元素,map中的函数就会调用多少次
  11. * val rdd2 = rdd1.map(..) rdd2的元素个数=rdd1的元素个数
  12. */
  13. @Test
  14. def map()={
  15. val rdd = sc.parallelize(List("hello", "spark", "hadoop", "flume"))
  16. println(rdd.map(x => {
  17. println(s"${Thread.currentThread().getName}->${x}")
  18. x.length
  19. }).collect().toList)
  20. }
  21. }

2.mapPartitions

  1. /**
  2. * Spark算子里面的代码是在Executor中执行的
  3. * Spark算子外面的代码是在Driver中执行的
  4. *
  5. * mapPartitions(func: Iterator[Rdd元素类型]=>Iterator[B])
  6. * mapPartitions里面的函数是针对每个分区操作
  7. * rdd有多少个分区,mapPartitions里面的函数就会调用多少次
  8. * mapPartitions里面函数的返回值是作为新的RDD分区的所有数据
  9. */
  10. @Test
  11. def mapPartitions()={
  12. /**
  13. * rdd1里面的数据是用户id
  14. * 用户详细信息存储在mySQL中
  15. * 需求:获取用户的详细信息[id,name,age]
  16. */
  17. val rdd1 = sc.parallelize(List(1, 4, 2, 6, 3, 7))
  18. val rdd2 = rdd1.map(id=>{
  19. var connection:Connection = null
  20. var statement:PreparedStatement = null
  21. connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test","root","321074" )
  22. statement = connection.prepareStatement("select * from person where id =?")
  23. var name:String = ""
  24. var age:Int = 0
  25. try{
  26. println(s"-------->${id}")
  27. statement.setInt(1,id)
  28. val resultSet = statement.executeQuery()
  29. while(resultSet.next()){
  30. name = resultSet.getString("name")
  31. age = resultSet.getInt("age")
  32. }
  33. }catch{
  34. case e:Exception =>e.printStackTrace()
  35. }finally {
  36. if(statement!=null)
  37. statement.close()
  38. if(connection!=null)
  39. connection.close()
  40. }
  41. (id,name,age)
  42. })
  43. println(rdd2.collect().toList)
  44. val rdd3 = rdd1.mapPartitions(it=>{
  45. var connection:Connection = null
  46. var statement:PreparedStatement = null
  47. connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test","root","321074")
  48. statement = connection.prepareStatement("select * from person where id =?")
  49. println(s"${connection}")
  50. var name:String = ""
  51. var age:Int = 0
  52. var list:List[(Int,String,Int)] = Nil
  53. try{
  54. it.foreach(id=>{
  55. statement.setInt(1,id)
  56. val resultSet = statement.executeQuery()
  57. while(resultSet.next()){
  58. name = resultSet.getString("name")
  59. age = resultSet.getInt("age")
  60. list = (id,name,age)::list
  61. }
  62. })
  63. }catch{
  64. case e:Exception=>
  65. }finally{
  66. if(statement!=null)
  67. statement.close()
  68. if(connection!=null)
  69. connection.close()
  70. }
  71. list.toIterator
  72. })
  73. println(rdd3.collect().toList)
  74. }

map与mapPartitions的区别

  • 函数针对的对象不一样
  • map里面的函数是针对RDD每个元素操作,RDD里面元素有多少个,函数就会调用多少次
  • mapPartitions里面的函数是针对RDD每个分区操作,RDD有多少个分区,函数就调用多少次
  • 函数的返回值不一样
  • map是一个元素返回一个结果,map新生产的RDD的元素个数=原RDD的元素个数
  • mapPartitions是返回一个迭代器,mapPartitions新RDD的元素个数可能与原RDD元素个数不一致
  • 内存回收时机不一样
  • map里面的函数是针对RDD每个元素操作,所以元素操作完成之后可以直接进行回收内存
  • mapPartitions里面的函数是针对RDD每个分区所有数据的迭代器操作,所以必须等到迭代器所有数据都处理完成才会回收内存,如果分区数据特别大,可能出现内存溢出,此时可以用map代替[完成比完美更重要

3.mapPartitionsWithIndex

  1. /**
  2. * mapPartitionswithIndex(func:(Int,Iterator[RDD元素类型])=Iterator[B] )
  3. * mapPartitionsWithIndex里面的函数也是针对RDD每个分区操作
  4. * mapPartitionsWithIndex与mapPartitions的区别
  5. * mapPartitions的函数没有分区号
  6. * mapPartitionsWithIndex的函数有分区号
  7. */
  8. @Test
  9. def mapPartitionsWithIndex()={
  10. val rdd1 = sc.parallelize(List(1, 4, 2, 6, 3, 7, 5, 10))
  11. val rdd2 = rdd1.mapPartitions(it => {
  12. println(s"data=${it.toList}")
  13. it
  14. })
  15. rdd2.collect()
  16. val rdd3 = rdd1.mapPartitionsWithIndex((index, it) => {
  17. //println(s"index=${index} data =${it.toList}")
  18. //it
  19. it.filter(_%2==0)
  20. })
  21. println(rdd3.collect().toList)
  22. }

4.flatMap

  1. /**
  2. * flatMap(func:RDD元素类型=>集合) = map + flatten
  3. * flatMap里面的函数也是针对每个元素操作,元素有多少个,函数就会调用多少次
  4. */
  5. @Test
  6. def flatMap()={
  7. val rdd1 = sc.parallelize(List("hello spark", "hello java"))
  8. val rdd2 = rdd1.flatMap(x => x.split(" "))
  9. println(rdd2.collect().toList)
  10. }

5.glom

  1. /**
  2. * glom:将RDD每个分区的数据用数组包装
  3. * glom新生成的RDD 元素个数 = 分区数
  4. */
  5. @Test
  6. def glom()={
  7. val rdd1 = sc.parallelize(List(1, 3, 4, 6, 8, 20, 30, 22, 11, 54))
  8. rdd1.mapPartitionsWithIndex((index,it)=>{
  9. println(s"index:${index} data:${it.toList}")
  10. it
  11. }).collect()
  12. val rdd2 = rdd1.glom()
  13. val arr = rdd2.collect()
  14. arr.foreach(x=>println(x.toList))
  15. }

6.groupBy

  1. /**
  2. * groupBy(func:RDD元素类型 => K):按照指定字段分组
  3. * groupBy是根据函数的返回值进行分组
  4. * groupBy生成的RDD里面元素是K,V键值对,K就是函数的返回值,v就是K对应原RDD所有元素的集合
  5. *
  6. * MR整个流程: 数据 -> InputFormat ->mapper[map方法] -> 环形缓冲区[80%,分区排序] ->combiner ->磁盘 ->reduce拉取数据[归并排序] ->reduce ->磁盘
  7. * MR shuffle阶段: 环形缓冲区[80%,分区排序] -> combiner ->磁盘 ->reduce拉取数据[归并排序]
  8. * Spark shuffle阶段: ->缓冲区[分区排序] ->combiner ->磁盘->分区拉取数据[归并排序]
  9. *
  10. */
  11. @Test
  12. def groupBy()={
  13. val rdd = sc.parallelize(List("hadoop","spark","hadoop","spark","flume","kafka"))
  14. val rdd2 = rdd.groupBy(x=>x)
  15. println(rdd2.collect().toList)
  16. Thread.sleep(100000)
  17. }

7.filter

  1. /**
  2. * filter(func:RDD元素类型=>Boolean):按照指定条件过滤
  3. * filter里面的函数也是针对每个元素操作
  4. * filter保留的是函数返回值为true的数据
  5. */
  6. @Test
  7. def filter()={
  8. val rdd = sc.parallelize(List(1,4,2,7,8,10))
  9. val rdd2 = rdd.filter(x=>x%2==0)
  10. println(rdd2.collect().toList)
  11. }

8.sample

  1. /**
  2. * sample(withReplacement,fraction[,seed]):采样
  3. * withReplacement:同一个元素是否可以被多次采样[true代表同一个元素可以被多次采样,false代表同一个元素最多被采样一次]
  4. * fraction:
  5. * withReplacement=true,fraction代表每个元素期望被采样的次数
  6. * withReplacement=false,fraction代表每个元素被采样的概率<一般设置为0.1-0.2>
  7. * seed:随机数种子
  8. * 应用场景:用于数据倾斜场景,通过采样的小样本数据映射整个数据集的整体情况
  9. *
  10. */
  11. @Test
  12. def sample()={
  13. val rdd = sc.parallelize(List(1,4,2,7,8,10,23,12,15,14,19,18,24,30,27,29))
  14. val rdd2 = rdd.sample(true,4)
  15. println(rdd2.collect().toList)
  16. val rdd3 = rdd.sample(false,0)
  17. println(rdd3.collect().toList)
  18. val rdd4 = rdd.sample(false,0.8)
  19. println(rdd4.collect().toList)
  20. }

9.distinct

  1. /**
  2. * distinct:去重
  3. * distinct会产生shuffle操作
  4. */
  5. @Test
  6. def distinct()={
  7. val rdd = sc.parallelize(List(1,4,2,7,1,4,3,2,1,10,1,3,2,1))
  8. val rdd2 = rdd.distinct()
  9. println(rdd2.collect().toList)
  10. val rdd3 = rdd.groupBy(x=>x).map(x=>x._1)
  11. println(rdd3.collect().toList)
  12. }

10.coalesce

  1. /**
  2. * coalesce:合并分区
  3. * coalesce默认只能减少分区数,此时不会产生shuffle操作
  4. * 如果想要增大分区,需要将shuffle参数设置为true,此时会产生shuffle操作
  5. */
  6. @Test
  7. def coalesce()={
  8. val rdd = sc.parallelize(List(1,4,2,7,1,4,3,2,1,10,1,3,2,1))
  9. val rdd2 = rdd.distinct()
  10. println(rdd.getNumPartitions)
  11. println(rdd2.getNumPartitions)
  12. rdd.mapPartitionsWithIndex((index,it)=>{
  13. println(s"rdd index=${index} data=${it.toList}")
  14. it
  15. }).collect()
  16. val rdd3 = rdd.coalesce(3)
  17. rdd3.mapPartitionsWithIndex((index,it)=>{
  18. println(s"rdd3 index=${index} data=${it.toList}")
  19. it
  20. }).collect()
  21. println(rdd3.getNumPartitions)
  22. }

11.repartition

  1. /**
  2. * repartition不管是增大分区还是减少分区都会产生shuffle操作
  3. * coalesce与repartition的区别
  4. * coalesce默认只能减少分区,不会产生shuffle操作
  5. * reparation既可以增大分区也可以减少分区,都会产生shuffle操作
  6. * coalesce与reparation的使用场景
  7. * coalesce:一般搭配filter使用用于减少分区数
  8. * reparation:一般用于增大分区,因为使用简单
  9. */
  10. @Test
  11. def repartition()={
  12. val rdd = sc.parallelize(List(1,4,2,7,1,4,3,2,1,10,1,3,2,1))
  13. val rdd2 = rdd.repartition(10)
  14. println(rdd2.getNumPartitions)
  15. }

12.sortBy

  1. /**
  2. * sortBy(func:RDD元素类型=>B):根据指定字段排序
  3. * sortBy后续是按照函数的返回值进行排序
  4. */
  5. @Test
  6. def sortBy()={
  7. val rdd = sc.parallelize(List(1,4,2,7,1,4,3,2,1,10,1,3,2,1))
  8. val rdd2 = rdd.sortBy(x=>x,true)
  9. println(rdd2.collect().toList)
  10. }

13.pipe

  1. pipe(脚本路径)
  2. pipe是每个分区调用一次脚本
  3. pipe会生成一个新的RDD,新RDD的数据在脚本中通过echo返回的

$03[SparkCore(transformation转换算子)上] - 图1

$03[SparkCore(transformation转换算子)上] - 图2

第二章.Transformation转换算子(双Value类型)

1.交集

  1. /**
  2. * 交集
  3. */
  4. @Test
  5. def intersection()={
  6. val rdd1 = sc.parallelize(List(1,2,3,4,5))
  7. val rdd2 = sc.parallelize(List(4,5,6,7,8))
  8. val rdd3 = rdd1.intersection(rdd2)
  9. println(rdd3.collect().toList)
  10. }

2.差集

  1. /**
  2. * 差集
  3. */
  4. @Test
  5. def subtract()={
  6. val rdd1 = sc.parallelize(List(1,2,3,4,5))
  7. val rdd2 = sc.parallelize(List(4,5,6,7,8))
  8. val rdd3 = rdd1.subtract(rdd2)
  9. println(rdd3.collect().toList)
  10. }

3.并集

  1. /**
  2. * 并集
  3. */
  4. @Test
  5. def union()={
  6. val rdd1 = sc.parallelize(List(1,2,3,4,5),5)
  7. val rdd2 = sc.parallelize(List(4,5,6,7,8),2)
  8. println(rdd1.getNumPartitions)
  9. println(rdd2.getNumPartitions)
  10. val rdd3 = rdd1.union(rdd2)
  11. println(rdd3.getNumPartitions)
  12. println(rdd3.collect().toList)
  13. }

4.拉链

  1. /**
  2. * 拉链
  3. * 两个RDD要想拉链必须分区数与元素个数都一样
  4. */
  5. @Test
  6. def zip()={
  7. val rdd1 = sc.parallelize(List("zhangsan","lisi","wangwu"))
  8. val rdd2 = sc.parallelize(List(10,20,30))
  9. val rdd3 = rdd1.zip(rdd2)
  10. println(rdd3.collect().toList)
  11. }