行动算子会触发整个job的执行,因为转换算子都是懒加载的,并不会立即执行。
reduce
/*
 * reduce(func: (element type, element type) => element type):
 * aggregates all elements of the RDD into a single value.
 */
@Test
def reduce(): Unit = {
  val numbers = sc.parallelize(List(10, 2, 5, 8, 4))
  // Sum every element pairwise.
  val total = numbers.reduce((left, right) => left + right)
  println(total)
}
collect
/*
 * collect gathers the data of every RDD partition, wraps it in an array and
 * returns it to the Driver (in memory).
 * Because everything is pulled into the Driver, a large data set may not fit:
 * Driver memory defaults to 1G, and exceeding it causes an out-of-memory error.
 * In production the Driver memory is usually set to 5-10G.
 */
@Test
def collect(): Unit = {
  val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  // Convert the returned array to a List so that println shows the elements.
  println(numbers.collect().toList)
}
count
/*
 * count returns the number of elements in the RDD.
 */
@Test
def count(): Unit = {
  val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  val elementCount = numbers.count()
  println(elementCount)
}
first
/*
 * first returns the first element of the RDD.
 */
@Test
def first(): Unit = {
  val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  val head = numbers.first()
  println(head)
}
take
/*
 * take returns the first N elements of the RDD.
 */
@Test
def take(): Unit = {
  val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  val firstThree = numbers.take(3)
  // Convert the returned array to a List so that println shows the elements.
  println(firstThree.toList)
}
takeOrdered
/*
 * takeOrdered: returns the first N elements after sorting.
 * NOTE(review): the method name is spelled `takeOrered`; it is kept unchanged
 * so the test's identifier stays the same.
 */
@Test
def takeOrered(): Unit = {
  val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  // Convert the returned array to a List so that println shows the elements.
  println(numbers.takeOrdered(3).toList)
}
aggregate
/*
 * aggregate(zeroValue)(seqOp, combOp): folds the RDD down to a single value.
 * The first function is traced with the "combine:" prefix and the second with
 * the "reduce:" prefix, so the output shows when each of them is invoked.
 * Before aggregating, the content of every partition is printed.
 */
@Test
def aggregate(): Unit = {
  val rdd = sc.parallelize(List(10, 2, 5, 8, 4), 2)

  // Print which elements ended up in which partition.
  // An Iterator can only be traversed once: materialise it a single time and
  // hand back a fresh iterator over the same data, instead of returning the
  // already-exhausted `it`.
  rdd.mapPartitionsWithIndex((index, it) => {
    val datas = it.toList
    println(s"index=${index} datas=${datas}")
    datas.iterator
  }).collect

  val result = rdd.aggregate(0)(
    // First function: accumulate the current element into the running value.
    (agg, curr) => {
      println(s"combine: agg=${agg} curr=${curr}")
      agg + curr
    },
    // Second function: merge two partial results.
    (agg, curr) => {
      println(s"reduce: agg=${agg} curr=${curr}")
      agg + curr
    }
  )
  println(result)
}
fold
/*
 * fold(zeroValue)(op): aggregates the RDD with a single function, starting
 * from the zero value 0. Every invocation of the function is traced.
 */
@Test
def fold(): Unit = {
  val numbers = sc.parallelize(List(10, 2, 5, 8, 4), 2)
  val total = numbers.fold(0) { (acc, next) =>
    println(s"agg=${acc} curr=${next}")
    acc + next
  }
  println(total)
}
countByKey
/*
 * countByKey counts how many times each key occurs in the RDD.
 * It is usually combined with sample in data-skew scenarios to find out which
 * keys are clearly more frequent than the others.
 */
@Test
def countByKey(): Unit = {
  val pairs = sc.parallelize(List(("aa", 1), ("cc", 2), ("aa", 5), ("aa", 6)))
  val occurrences = pairs.countByKey()
  println(occurrences)
}
save
/*
 * saveAsTextFile writes the RDD (two partitions here) as text under "output/text".
 */
@Test
def save(): Unit = {
  val pairs = sc.parallelize(List(("aa", 1), ("cc", 2), ("aa", 5), ("aa", 6)), 2)
  pairs.saveAsTextFile("output/text")
}
foreach
/*
 * foreach(func: element type => Unit): Unit
 * Difference between foreach and map:
 *   - map has a return value and produces a new RDD; map is a transformation.
 *   - foreach has no return value; foreach is an action.
 * The function passed to foreach is applied to every element of the RDD.
 */
@Test
def foreach(): Unit = {
  val pairs = sc.parallelize(List(("aa", 1), ("cc", 2), ("aa", 5), ("aa", 6)), 2)
  pairs.foreach { pair =>
    println(pair)
  }
}
foreachPartition
/*
 * foreachPartition(func: Iterator[element type] => Unit): Unit
 * foreachPartition operates on each partition of the RDD as a whole.
 * It is typically used to save data to MySQL / HBase / Redis and similar
 * systems that require opening an external connection, so that one connection
 * is opened per partition rather than per element.
 *
 * Rows are sent in JDBC batches of 1000; the remainder is flushed at the end.
 * A failure while writing a partition is reported on stderr instead of being
 * silently discarded.
 */
@Test
def foreachPartition(): Unit = {
  import scala.util.control.NonFatal

  val batchSize = 1000
  val rdd = sc.parallelize(List("aa" -> 1, "cc" -> 2, "aa" -> 5, "aa" -> 6), 2)
  rdd.foreachPartition(it => {
    try {
      val connection = DriverManager.getConnection("..")
      try {
        val statement = connection.prepareStatement("...")
        try {
          // Number of rows added to the batch since the last flush.
          var pending = 0
          it.foreach { case (key, value) =>
            statement.setString(1, key)
            statement.setInt(2, value)
            statement.addBatch()
            // Count the row BEFORE checking the threshold; checking first
            // would flush a one-row batch for the very first element.
            pending += 1
            if (pending == batchSize) {
              statement.executeBatch()
              statement.clearBatch()
              pending = 0
            }
          }
          // Flush whatever is left over from the last incomplete batch.
          if (pending > 0) {
            statement.executeBatch()
          }
        } finally {
          // Only reached when the statement was created, so it is never null.
          statement.close()
        }
      } finally {
        // Runs even if closing the statement throws, so the connection is not leaked.
        connection.close()
      }
    } catch {
      case NonFatal(e) =>
        // Keep the best-effort behaviour (do not fail the task), but make the
        // failure visible instead of swallowing it.
        System.err.println(s"foreachPartition: failed to write partition: ${e}")
        e.printStackTrace()
    }
  })
}