行动算子会触发整个job的执行。因为转换算子都是懒加载的,并不会立即执行。

reduce

/*
reduce(func: (RDD元素类型,RDD元素类型)=> RDD元素类型 ) : 对RDD所有元素聚合
*/

  1. // Sum all elements of the RDD with the binary reduce action (triggers a job).
  2. @Test
  3. def reduce(): Unit = {
  4. val numbers = sc.parallelize(List(10, 2, 5, 8, 4))
  5. val total = numbers.reduce((acc, elem) => acc + elem)
  6. println(total)
  7. }

collect

/*
collect是收集RDD每个分区的数据,然后以数组的形式封装之后返回给Driver(内存)
collect收集的是RDD所有分区的数据;Driver内存默认是1G,如果数据量比较大,Driver内存放不下就会出现内存溢出,所以工作中Driver内存一般设置为5-10G
*/

  1. // Pull every partition of the RDD back to the Driver as a local Array.
  2. @Test
  3. def collect(): Unit = {
  4. val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  5. val gathered = numbers.collect()
  6. println(gathered.toList)
  7. }

count

/*
count统计RDD元素个数
*/

  1. // Count the number of elements across all partitions.
  2. @Test
  3. def count(): Unit = {
  4. val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  5. val total = numbers.count()
  6. println(total)
  7. }

first

/*
first获取RDD第一个元素
*/

  1. // Fetch only the first element; cheaper than collect for a quick peek.
  2. @Test
  3. def first(): Unit = {
  4. val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  5. val head = numbers.first()
  6. println(head)
  7. }

take

/*
take获取RDD前N个元素
*/

  1. // Grab the first N elements without necessarily scanning the whole RDD.
  2. @Test
  3. def take(): Unit = {
  4. val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  5. val firstThree = numbers.take(3)
  6. println(firstThree.toList)
  7. }

takeOrdered

/*
takeOrdered: 获取排序之后的前N个元素
*/

  1. // takeOrdered returns the N smallest elements (ascending by default).
  2. // Fixed: the test method name was misspelled "takeOrered"; JUnit discovers
  3. // tests via the annotation, so the rename is safe.
  4. @Test
  5. def takeOrdered(): Unit = {
  6. val rdd = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  7. // Sorted ascending, so this yields List(2, 4, 5)
  8. val result = rdd.takeOrdered(3)
  9. println(result.toList)
  10. }

aggregate

  1. // aggregate(zero)(seqOp, combOp): seqOp folds elements inside each
  2. // partition, combOp then merges the per-partition results.
  3. @Test
  4. def aggregate(): Unit = {
  5. val rdd = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  6. rdd.mapPartitionsWithIndex((index, it) => {
  7. // Fixed: materialize the iterator once. The original printed it.toList
  8. // and then returned `it`, which was already exhausted by toList, so the
  9. // resulting RDD had empty partitions.
  10. val datas = it.toList
  11. println(s"index=${index} datas=${datas}")
  12. datas.iterator
  13. }).collect
  14. val result = rdd.aggregate(0)((agg, curr) => {
  15. // seqOp: runs inside each partition (logged as "combine")
  16. println(s"combine: agg=${agg} curr=${curr}")
  17. agg + curr
  18. }, (agg, curr) => {
  19. // combOp: merges the partition results (logged as "reduce")
  20. println(s"reduce: agg=${agg} curr=${curr}")
  21. agg + curr
  22. })
  23. println(result)
  24. }

fold

  1. // fold is aggregate with one function reused both within and across partitions.
  2. @Test
  3. def fold(): Unit = {
  4. val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  5. val total = numbers.fold(0) { (agg, curr) =>
  6. println(s"agg=${agg} curr=${curr}")
  7. agg + curr
  8. }
  9. println(total)
  10. }

countByKey

/*
统计key在RDD中出现的次数
countByKey一般结合sample用于数据倾斜场景,查看哪些key出现的次数明显多于其他key
*/

  1. // Count how many times each key occurs; the result is a local Map on the Driver.
  2. @Test
  3. def countByKey(): Unit = {
  4. val pairs = sc.parallelize(List("aa" -> 1, "cc" -> 2, "aa" -> 5, "aa" -> 6))
  5. val counts = pairs.countByKey()
  6. println(counts)
  7. }

save

  1. // Write each partition out as a text file under output/text.
  2. @Test
  3. def save(): Unit = {
  4. val pairs = sc.parallelize(List("aa" -> 1, "cc" -> 2, "aa" -> 5, "aa" -> 6), 2)
  5. pairs.saveAsTextFile("output/text")
  6. }

foreach

/*
foreach(func: RDD元素类型=> Unit):Unit
foreach与map的区别:
map有返回值会生成一个新的RDD,map是转换算子
foreach没有返回值,foreach是action算子
foreach里面的函数针对RDD每个元素操作
*/

  1. // foreach applies a side-effecting function to every element, on the executors.
  2. @Test
  3. def foreach(): Unit = {
  4. val pairs = sc.parallelize(List("aa" -> 1, "cc" -> 2, "aa" -> 5, "aa" -> 6), 2)
  5. pairs.foreach(println)
  6. }

foreachPartition

/*
foreachPartition(func: 迭代器[RDD元素类型]=>Unit):Unit
foreachPartition针对的是RDD每个分区操作
foreachPartition 一般用于将数据保存到MySQL/HBase/Redis等需要打开外部连接的地方
*/

  1. // Open one JDBC connection per partition and batch-insert its records.
  2. @Test
  3. def foreachPartition(): Unit = {
  4. val rdd = sc.parallelize(List("aa"->1,"cc"->2,"aa"->5,"aa"->6),2)
  5. rdd.foreachPartition(it => {
  6. var connection: Connection = null
  7. var statement: PreparedStatement = null
  8. try {
  9. connection = DriverManager.getConnection("..")
  10. statement = connection.prepareStatement("...")
  11. var i = 0
  12. it.foreach(x => {
  13. statement.setString(1, x._1)
  14. statement.setInt(2, x._2)
  15. statement.addBatch()
  16. i = i + 1
  17. // Flush every 1000 records. Fixed: the original tested i % 1000 == 0
  18. // before incrementing, so it fired on the very first record.
  19. if (i % 1000 == 0) {
  20. statement.executeBatch()
  21. statement.clearBatch()
  22. }
  23. })
  24. // Flush whatever is left in the final partial batch.
  25. statement.executeBatch()
  26. } catch {
  27. // Fixed: the original catch block swallowed all exceptions silently.
  28. case e: Exception => e.printStackTrace()
  29. } finally {
  30. // Fixed: null-check before close — if getConnection throws, statement
  31. // (and possibly connection) is still null and close() would NPE.
  32. if (statement != null) statement.close()
  33. if (connection != null) connection.close()
  34. }
  35. })
  36. }