行动算子会触发整个job的执行;转换算子都是懒加载(惰性求值)的,并不会立即执行,只有遇到行动算子时才会真正开始计算。
reduce
/**
 * Action: `reduce(func: (T, T) => T)` aggregates all elements of the RDD into
 * a single value of the element type.
 *
 * Sums the five sample integers and prints the total.
 */
@Test
def reduce(): Unit = {
  val total = sc
    .parallelize(List(10, 2, 5, 8, 4))
    .reduce((left, right) => left + right)
  println(total)
}
collect
/**
 * Action: `collect` gathers the data of every partition and returns it to the
 * driver as one array held in driver memory.
 *
 * Because the whole data set ends up on the driver, a large RDD can exceed the
 * driver's memory (1 GB by default) and fail with an out-of-memory error. The
 * original notes suggest sizing the driver at roughly 5-10 GB in production.
 */
@Test
def collect(): Unit = {
  val collected = sc.parallelize(List(10, 2, 5, 8, 4), 2).collect()
  // Convert the array to a List so the elements are shown when printed.
  println(collected.toList)
}
count
/**
 * Action: `count` returns the number of elements in the RDD.
 *
 * Builds a two-partition RDD from five integers and prints its element count.
 */
@Test
def count(): Unit = {
  val elementCount = sc.parallelize(List(10, 2, 5, 8, 4), 2).count()
  println(elementCount)
}
first
/**
 * Action: `first` returns the first element of the RDD.
 *
 * Builds a two-partition RDD from five integers and prints its first element.
 */
@Test
def first(): Unit = {
  val head = sc.parallelize(List(10, 2, 5, 8, 4), 2).first()
  println(head)
}
take
/**
 * Action: `take(n)` returns the first `n` elements of the RDD.
 *
 * Prints the first three elements of the sample RDD as a List.
 */
@Test
def take(): Unit = {
  val firstThree = sc.parallelize(List(10, 2, 5, 8, 4), 2).take(3)
  println(firstThree.toList)
}
takeOrdered
/**
 * Action: `takeOrdered(n)` returns the first `n` elements after sorting the
 * RDD (here with the implicit ordering for Int).
 *
 * Prints the three leading elements of the sorted sample data as a List.
 *
 * NOTE(review): the method name is misspelled ("takeOrered"); it is kept
 * unchanged so existing references to this test keep working.
 */
@Test
def takeOrered(): Unit = {
  val sortedHead = sc.parallelize(List(10, 2, 5, 8, 4), 2).takeOrdered(3)
  println(sortedHead.toList)
}
aggregate
/**
 * Action: `aggregate(zeroValue)(seqOp, combOp)` aggregates the RDD in two
 * stages: the first function folds elements into an accumulator, the second
 * merges accumulators.
 *
 * The test first prints which elements live in which partition, then runs the
 * aggregation with tracing output in both functions and prints the final sum.
 */
@Test
def aggregate(): Unit = {
  val rdd = sc.parallelize(List(10, 2, 5, 8, 4), 2)

  // A partition iterator can be traversed only once. Materialise it a single
  // time and hand back a fresh iterator over the same data; returning the
  // original iterator after `toList` would yield an already-exhausted one.
  rdd.mapPartitionsWithIndex { (index, it) =>
    val datas = it.toList
    println(s"index=${index} datas=${datas}")
    datas.iterator
  }.collect()

  val result = rdd.aggregate(0)(
    // First function: folds one element (`curr`) into the accumulator (`agg`).
    (agg, curr) => {
      println(s"combine: agg=${agg} curr=${curr}")
      agg + curr
    },
    // Second function: merges two accumulators.
    (agg, curr) => {
      println(s"reduce: agg=${agg} curr=${curr}")
      agg + curr
    }
  )
  println(result)
}
fold
/**
 * Action: `fold(zeroValue)(op)` aggregates the RDD with a single function and
 * a zero value.
 *
 * Traces every invocation of the function and prints the final sum.
 */
@Test
def fold(): Unit = {
  val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  val total = numbers.fold(0) { (agg, curr) =>
    println(s"agg=${agg} curr=${curr}")
    agg + curr
  }
  println(total)
}
countByKey
/**
 * Action: `countByKey` counts how many times each key occurs in a key/value
 * RDD.
 *
 * According to the original notes it is typically combined with `sample` when
 * investigating data skew, to see which keys occur far more often than the
 * others.
 */
@Test
def countByKey(): Unit = {
  val pairs = sc.parallelize(List("aa" -> 1, "cc" -> 2, "aa" -> 5, "aa" -> 6))
  val occurrences = pairs.countByKey()
  println(occurrences)
}
save
/**
 * Action: `saveAsTextFile(path)` writes the RDD out as text under the given
 * path.
 *
 * Saves a two-partition key/value RDD to `output/text`.
 *
 * NOTE(review): Spark normally refuses to write into an existing output
 * directory, so re-running this test may require deleting `output/text`
 * first — confirm against the project's test setup.
 */
@Test
def save(): Unit = {
  val pairs = sc.parallelize(List("aa" -> 1, "cc" -> 2, "aa" -> 5, "aa" -> 6), 2)
  pairs.saveAsTextFile("output/text")
}
foreach
/**
 * Action: `foreach(func: T => Unit): Unit` applies `func` to every element of
 * the RDD.
 *
 * Difference from `map`:
 *  - `map` returns a value and produces a new RDD; it is a transformation.
 *  - `foreach` returns nothing; it is an action.
 *
 * Prints every key/value pair of the sample RDD.
 */
@Test
def foreach(): Unit = {
  val pairs = sc.parallelize(List("aa" -> 1, "cc" -> 2, "aa" -> 5, "aa" -> 6), 2)
  pairs.foreach { pair =>
    println(pair)
  }
}
foreachPartition
/**
 * Action: `foreachPartition(func: Iterator[T] => Unit): Unit` invokes `func`
 * once per partition, passing an iterator over that partition's elements.
 *
 * It is typically used to write data to systems that need an external
 * connection (MySQL, HBase, Redis, ...), because the connection can be opened
 * once per partition instead of once per element.
 *
 * This example opens one JDBC connection per partition and writes the
 * key/value pairs through a prepared statement in batches. The connection URL
 * and SQL text are placeholders.
 */
@Test
def foreachPartition(): Unit = {
  import scala.util.control.NonFatal

  val rdd = sc.parallelize(List("aa" -> 1, "cc" -> 2, "aa" -> 5, "aa" -> 6), 2)

  rdd.foreachPartition { it =>
    // Number of rows sent to the database per round trip.
    val batchSize = 1000

    try {
      val connection: Connection = DriverManager.getConnection("..")
      try {
        val statement: PreparedStatement = connection.prepareStatement("...")
        try {
          // Rows added to the current batch and not yet executed.
          var pending = 0
          it.foreach { case (key, value) =>
            statement.setString(1, key)
            statement.setInt(2, value)
            statement.addBatch()
            pending += 1
            // Flush only when a full batch has accumulated, so the very first
            // row is not sent as a batch of one.
            if (pending == batchSize) {
              statement.executeBatch()
              statement.clearBatch()
              pending = 0
            }
          }
          // Flush the remaining partial batch, if any.
          if (pending > 0) {
            statement.executeBatch()
          }
        } finally {
          // Nested try/finally: each resource is closed only if it was
          // actually opened, and the connection is closed even when closing
          // the statement throws.
          statement.close()
        }
      } finally {
        connection.close()
      }
    } catch {
      // Keep the original non-throwing behaviour, but report the failure
      // instead of discarding it. Rethrow here instead if a write error must
      // fail the task.
      case NonFatal(e) =>
        System.err.println(s"foreachPartition: failed to write partition: ${e}")
    }
  }
}
