Chapter 1. Transformation Operators (Value Type)
1.map
```scala
package com.atguigu.spark.day03

import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class $01Transformation {

  val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

  /**
   * map(func: RDD element type => B): mapping
   * Use case: one-to-one transformations, mainly converting the type/value of each element
   * The function inside map operates on every element of the RDD and returns one result per element
   * The function is called as many times as there are elements in the RDD
   * val rdd2 = rdd1.map(..)   element count of rdd2 = element count of rdd1
   */
  @Test
  def map(): Unit = {
    val rdd = sc.parallelize(List("hello", "spark", "hadoop", "flume"))
    println(rdd.map(x => {
      println(s"${Thread.currentThread().getName}->${x}")
      x.length
    }).collect().toList)
  }
}
```
2.mapPartitions
```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * Code inside a Spark operator runs on the Executors
 * Code outside a Spark operator runs on the Driver
 *
 * mapPartitions(func: Iterator[RDD element type] => Iterator[B])
 * The function inside mapPartitions operates on a whole partition at a time
 * The function is called as many times as there are partitions in the RDD
 * The iterator returned by the function becomes all the data of the corresponding partition of the new RDD
 */
@Test
def mapPartitions(): Unit = {
  /**
   * rdd1 holds user ids
   * The detailed user information is stored in MySQL
   * Requirement: fetch the details [id, name, age] for each user
   */
  val rdd1 = sc.parallelize(List(1, 4, 2, 6, 3, 7))

  // With map: a JDBC connection is created for every single element
  val rdd2 = rdd1.map(id => {
    var connection: Connection = null
    var statement: PreparedStatement = null
    connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "321074")
    statement = connection.prepareStatement("select * from person where id = ?")
    var name: String = ""
    var age: Int = 0
    try {
      println(s"-------->${id}")
      statement.setInt(1, id)
      val resultSet = statement.executeQuery()
      while (resultSet.next()) {
        name = resultSet.getString("name")
        age = resultSet.getInt("age")
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (statement != null) statement.close()
      if (connection != null) connection.close()
    }
    (id, name, age)
  })
  println(rdd2.collect().toList)

  // With mapPartitions: one JDBC connection is created per partition
  val rdd3 = rdd1.mapPartitions(it => {
    var connection: Connection = null
    var statement: PreparedStatement = null
    connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "321074")
    statement = connection.prepareStatement("select * from person where id = ?")
    println(s"${connection}")
    var name: String = ""
    var age: Int = 0
    var list: List[(Int, String, Int)] = Nil
    try {
      it.foreach(id => {
        statement.setInt(1, id)
        val resultSet = statement.executeQuery()
        while (resultSet.next()) {
          name = resultSet.getString("name")
          age = resultSet.getInt("age")
          list = (id, name, age) :: list
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (statement != null) statement.close()
      if (connection != null) connection.close()
    }
    list.toIterator
  })
  println(rdd3.collect().toList)
}
```
Differences between map and mapPartitions

- They operate on different units
  - the function passed to map works on each element of the RDD; it is called once per element
  - the function passed to mapPartitions works on each partition of the RDD; it is called once per partition
- Their return values differ (see the sketch after this list)
  - map returns one result per element, so the new RDD has exactly as many elements as the original RDD
  - mapPartitions returns an iterator, so the new RDD may have a different number of elements than the original RDD
- Memory is reclaimed at different times
  - map's function works element by element, so each element can be garbage-collected as soon as it has been processed
  - mapPartitions' function works on the iterator over all of a partition's data, so memory can only be reclaimed after the whole iterator has been consumed; if a partition is very large this can cause an out-of-memory error, in which case map can be used instead
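A minimal sketch of the second difference (element counts), assuming the same `sc` as in the class above:

```scala
val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)

// map: one output element per input element -> 6 elements out
println(nums.map(_ * 2).collect().toList)                             // List(2, 4, 6, 8, 10, 12)

// mapPartitions: the returned iterator decides the output size ->
// here each partition collapses to a single element (its sum), so 2 elements out
println(nums.mapPartitions(it => Iterator(it.sum)).collect().toList)  // List(6, 15)
```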
3.mapPartitionsWithIndex
```scala
/**
 * mapPartitionsWithIndex(func: (Int, Iterator[RDD element type]) => Iterator[B])
 * The function inside mapPartitionsWithIndex also operates on a whole partition at a time
 * Difference between mapPartitionsWithIndex and mapPartitions:
 *   the function of mapPartitions does not receive the partition index
 *   the function of mapPartitionsWithIndex does receive the partition index
 */
@Test
def mapPartitionsWithIndex(): Unit = {
  val rdd1 = sc.parallelize(List(1, 4, 2, 6, 3, 7, 5, 10))
  val rdd2 = rdd1.mapPartitions(it => {
    val data = it.toList            // materialize first: toList would otherwise consume the iterator
    println(s"data=${data}")
    data.toIterator
  })
  rdd2.collect()
  val rdd3 = rdd1.mapPartitionsWithIndex((index, it) => {
    //println(s"index=${index} data=${it.toList}")
    //it
    it.filter(_ % 2 == 0)
  })
  println(rdd3.collect().toList)
}
```
4.flatMap
```scala
/**
 * flatMap(func: RDD element type => collection) = map + flatten
 * The function inside flatMap also operates on every element; it is called once per element
 */
@Test
def flatMap(): Unit = {
  val rdd1 = sc.parallelize(List("hello spark", "hello java"))
  val rdd2 = rdd1.flatMap(x => x.split(" "))
  println(rdd2.collect().toList)
}
```
5.glom
```scala
/**
 * glom: wraps the data of each partition of the RDD into an array
 * element count of the new RDD created by glom = number of partitions
 */
@Test
def glom(): Unit = {
  val rdd1 = sc.parallelize(List(1, 3, 4, 6, 8, 20, 30, 22, 11, 54))
  rdd1.mapPartitionsWithIndex((index, it) => {
    val data = it.toList            // materialize first so printing does not consume the iterator
    println(s"index:${index} data:${data}")
    data.toIterator
  }).collect()
  val rdd2 = rdd1.glom()
  val arr = rdd2.collect()
  arr.foreach(x => println(x.toList))
}
```
6.groupBy
```scala
/**
 * groupBy(func: RDD element type => K): group by the given field
 * groupBy groups by the return value of the function
 * The RDD produced by groupBy contains K,V pairs: K is the function's return value,
 * V is the collection of all elements of the original RDD that map to that K
 *
 * Full MapReduce flow:  data -> InputFormat -> mapper [map method] -> circular buffer [spill at 80%, partition + sort] -> combiner -> disk -> reduce fetches data [merge sort] -> reduce -> disk
 * MR shuffle phase:     circular buffer [spill at 80%, partition + sort] -> combiner -> disk -> reduce fetches data [merge sort]
 * Spark shuffle phase:  -> buffer [partition + sort] -> combiner -> disk -> partitions fetch data [merge sort]
 */
@Test
def groupBy(): Unit = {
  val rdd = sc.parallelize(List("hadoop", "spark", "hadoop", "spark", "flume", "kafka"))
  val rdd2 = rdd.groupBy(x => x)
  println(rdd2.collect().toList)
  Thread.sleep(100000)   // keep the JVM alive, e.g. to inspect the Spark web UI
}
```
7.filter
```scala
/**
 * filter(func: RDD element type => Boolean): filter by the given predicate
 * The function inside filter also operates on every element
 * filter keeps the elements for which the function returns true
 */
@Test
def filter(): Unit = {
  val rdd = sc.parallelize(List(1, 4, 2, 7, 8, 10))
  val rdd2 = rdd.filter(x => x % 2 == 0)
  println(rdd2.collect().toList)
}
```
8.sample
```scala
/**
 * sample(withReplacement, fraction[, seed]): sampling
 * withReplacement: whether the same element may be sampled more than once
 *   [true: an element can be sampled multiple times; false: an element is sampled at most once]
 * fraction:
 *   withReplacement = true:  fraction is the expected number of times each element is sampled
 *   withReplacement = false: fraction is the probability that each element is sampled <usually 0.1-0.2>
 * seed: random seed
 * Use case: data-skew scenarios, where a small sample is used to approximate the overall
 * distribution of the full data set
 */
@Test
def sample(): Unit = {
  val rdd = sc.parallelize(List(1, 4, 2, 7, 8, 10, 23, 12, 15, 14, 19, 18, 24, 30, 27, 29))
  val rdd2 = rdd.sample(true, 4)
  println(rdd2.collect().toList)
  val rdd3 = rdd.sample(false, 0)
  println(rdd3.collect().toList)
  val rdd4 = rdd.sample(false, 0.8)
  println(rdd4.collect().toList)
}
```
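A minimal sketch of the data-skew use case mentioned above: sample a fraction of a pair RDD and count the keys in the sample to spot hot keys. The data and the key name "hot" are made up for illustration:

```scala
// Hypothetical skewed pair RDD: the key "hot" dominates.
val pairs = sc.parallelize(List.fill(1000)(("hot", 1)) ++ List(("a", 1), ("b", 1)))

// Sample ~10% of the data without replacement and count the keys in the sample;
// keys that dominate the sample are very likely skewed in the full data set.
val sampleCounts = pairs.sample(false, 0.1).map(_._1).countByValue()
println(sampleCounts)   // e.g. Map(hot -> 103, a -> 1)
```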
9.distinct
```scala
/**
 * distinct: de-duplication
 * distinct triggers a shuffle
 */
@Test
def distinct(): Unit = {
  val rdd = sc.parallelize(List(1, 4, 2, 7, 1, 4, 3, 2, 1, 10, 1, 3, 2, 1))
  val rdd2 = rdd.distinct()
  println(rdd2.collect().toList)
  // the same result, built manually with groupBy + map
  val rdd3 = rdd.groupBy(x => x).map(x => x._1)
  println(rdd3.collect().toList)
}
```
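For reference, Spark's own distinct is built from map + reduceByKey rather than groupBy, which avoids collecting all duplicates of a key into one collection; roughly:

```scala
val rdd = sc.parallelize(List(1, 4, 2, 7, 1, 4, 3, 2, 1, 10))
val manualDistinct = rdd
  .map(x => (x, null))          // attach a dummy value so reduceByKey can be used
  .reduceByKey((x, _) => x)     // keep one entry per key; this step is the shuffle
  .map(_._1)                    // drop the dummy value again
println(manualDistinct.collect().toList)
```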
10.coalesce
```scala
/**
 * coalesce: merge partitions
 * By default coalesce can only reduce the number of partitions, in which case no shuffle occurs
 * To increase the number of partitions, set the shuffle parameter to true, which triggers a shuffle
 */
@Test
def coalesce(): Unit = {
  val rdd = sc.parallelize(List(1, 4, 2, 7, 1, 4, 3, 2, 1, 10, 1, 3, 2, 1))
  val rdd2 = rdd.distinct()
  println(rdd.getNumPartitions)
  println(rdd2.getNumPartitions)
  rdd.mapPartitionsWithIndex((index, it) => {
    val data = it.toList
    println(s"rdd index=${index} data=${data}")
    data.toIterator
  }).collect()
  val rdd3 = rdd.coalesce(3)
  rdd3.mapPartitionsWithIndex((index, it) => {
    val data = it.toList
    println(s"rdd3 index=${index} data=${data}")
    data.toIterator
  }).collect()
  println(rdd3.getNumPartitions)
}
```
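The shuffle = true case described in the comment above, as a minimal sketch:

```scala
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)

// without shuffle, coalesce cannot go above the current partition count: still 2
println(rdd.coalesce(4).getNumPartitions)                  // 2

// with shuffle = true the data is redistributed into 4 partitions
println(rdd.coalesce(4, shuffle = true).getNumPartitions)  // 4
```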
11.repartition
```scala
/**
 * repartition always triggers a shuffle, whether it increases or decreases the number of partitions
 * Differences between coalesce and repartition:
 *   by default coalesce can only reduce the number of partitions and does not shuffle
 *   repartition can both increase and decrease the number of partitions, and always shuffles
 * When to use which:
 *   coalesce: usually combined with filter, to reduce the number of partitions
 *   repartition: usually used to increase the number of partitions, because it is simpler to use
 */
@Test
def repartition(): Unit = {
  val rdd = sc.parallelize(List(1, 4, 2, 7, 1, 4, 3, 2, 1, 10, 1, 3, 2, 1))
  val rdd2 = rdd.repartition(10)
  println(rdd2.getNumPartitions)
}
```
12.sortBy
```scala
/**
 * sortBy(func: RDD element type => B): sort by the given field
 * sortBy sorts by the return value of the function
 */
@Test
def sortBy(): Unit = {
  val rdd = sc.parallelize(List(1, 4, 2, 7, 1, 4, 3, 2, 1, 10, 1, 3, 2, 1))
  val rdd2 = rdd.sortBy(x => x, true)   // true = ascending
  println(rdd2.collect().toList)
}
```
13.pipe
pipe(script path)

- pipe invokes the script once per partition
- pipe produces a new RDD; the new RDD's data is whatever the script emits via echo
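A minimal sketch of pipe, assuming a shell script at the hypothetical path /opt/scripts/echo.sh:

```scala
// /opt/scripts/echo.sh (hypothetical path) might contain:
//   #!/bin/bash
//   while read line; do
//     echo "piped:${line}"
//   done

val rdd = sc.parallelize(List("hello", "spark"), 2)
val piped = rdd.pipe("/opt/scripts/echo.sh")   // each input line arrives on the script's stdin
println(piped.collect().toList)                // e.g. List(piped:hello, piped:spark)
```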
![SparkCore (transformation operators, part 1) - figure 1](/uploads/projects/liuye-6lcqc@gws1uf/4f4f5fdffe4675ef495d7a4ef16a40d3.png)
![SparkCore (transformation operators, part 1) - figure 2](/uploads/projects/liuye-6lcqc@gws1uf/1d48695c569231ccbb57a873dcd06092.png)
Chapter 2. Transformation Operators (Dual-Value Type)
1.intersection
```scala
/**
 * intersection: elements present in both RDDs
 */
@Test
def intersection(): Unit = {
  val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
  val rdd2 = sc.parallelize(List(4, 5, 6, 7, 8))
  val rdd3 = rdd1.intersection(rdd2)
  println(rdd3.collect().toList)
}
```
2.subtract
```scala
/**
 * subtract (set difference): elements of the first RDD that are not in the second
 */
@Test
def subtract(): Unit = {
  val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
  val rdd2 = sc.parallelize(List(4, 5, 6, 7, 8))
  val rdd3 = rdd1.subtract(rdd2)
  println(rdd3.collect().toList)
}
```
3.union
```scala
/**
 * union: concatenates two RDDs (duplicates are kept)
 */
@Test
def union(): Unit = {
  val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 5)
  val rdd2 = sc.parallelize(List(4, 5, 6, 7, 8), 2)
  println(rdd1.getNumPartitions)
  println(rdd2.getNumPartitions)
  val rdd3 = rdd1.union(rdd2)
  println(rdd3.getNumPartitions)   // 5 + 2 = 7: union keeps the partitions of both inputs
  println(rdd3.collect().toList)
}
```
4.zip
```scala
/**
 * zip: pairs up elements position by position
 * Two RDDs can only be zipped if they have both the same number of partitions
 * and the same number of elements
 */
@Test
def zip(): Unit = {
  val rdd1 = sc.parallelize(List("zhangsan", "lisi", "wangwu"))
  val rdd2 = sc.parallelize(List(10, 20, 30))
  val rdd3 = rdd1.zip(rdd2)
  println(rdd3.collect().toList)
}
```
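A minimal sketch of the constraint stated above; the two mismatches fail at different points with roughly the errors shown in the comments:

```scala
val names = sc.parallelize(List("a", "b", "c"), 3)

// Different partition count: zip fails immediately
val twoParts = sc.parallelize(List(1, 2, 3), 2)
// names.zip(twoParts)
// -> java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

// Same partition count but different element count: the job fails when a partition runs dry
val fourElems = sc.parallelize(List(1, 2, 3, 4), 3)
// names.zip(fourElems).collect()
// -> org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
```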
