概念
功能说明
对内部元素进行去重复,并将去重复的元素放到新的RDD里面 ,默认情况下distinct算子会生成与原来RDD分区个数一致的分区数
distinct算子也可以指定分区数,比如说我有10W条数据,去重复之后就变成了1W条数据了,那么之前的分区就可能不需要那么多了.
distinct算子底层有shuffle过程,会给分区元素打乱重组.
实操
不指定分区
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark09_Transformation_distinct22 {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//创建RDD
val numRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,5,4,3,3),5)
numRDD.mapPartitionsWithIndex{
(index,datas)=>{
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
/* 没去重复之前是5个分区
输出:
1--->2,3
3--->5,4
2--->4,5
4--->3,3
0--->1
*/
println("---------------")
//对RDD中的数据进行去重, 100W条数据去重复之后可能还剩下10W条数据,那么 distinct 也可以重新指定分区
val newRDD: RDD[Int] = numRDD.distinct()
newRDD.mapPartitionsWithIndex{
(index,datas)=>{
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
/*
去重复之后还是五个分区
4--->4
2--->2
1--->1
0--->5
3--->3
*/
//newRDD.collect().foreach(println)
// 关闭连接
sc.stop()
}
}
指定分区
distinct算子可以指定分区数.
五个分区的元素经过去重复之后变成两个分区了
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object demo {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//创建RDD
val numRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 3, 3), 5)
numRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
/* 没去重复之前是5个分区
输出:
1--->2,3
3--->5,4
2--->4,5
4--->3,3
0--->1
*/
println("---------------")
//指定两个分区数
val newRDD: RDD[Int] = numRDD.distinct(2)
newRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
/*输出
1--->1,3,5
0--->4,2
*/
//newRDD.collect().foreach(println)
// 关闭连接
sc.stop()
}
}
根据 bean的年龄字段去重复
import org.apache.spark.{SparkConf, SparkContext}
case class User(age: Int, name: String) {
override def hashCode(): Int = this.age
override def equals(obj: Any): Boolean = obj match {
case User(age, _) => this.age == age
case _ => false
}
}
object demo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Distinct").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
//年龄相等就重复,需要覆写hashCode和equals这两个方法.
val rdd1 = sc.parallelize(List(User(10, "lisi"), User(20, "zs"), User(10, "ab")))
val rdd2 = rdd1.distinct(2)
rdd2.collect.foreach(println)
sc.stop()
}
}
输出结果就是:
User(20,zs)
User(10,lisi)