概念

功能说明
对内部元素进行去重复,并将去重复的元素放到新的RDD里面 ,默认情况下distinct算子会生成与原来RDD分区个数一致的分区数
distinct算子也可以指定分区数,比如说我有10W条数据,去重复之后就变成了1W条数据了,那么之前的分区就可能不需要那么多了.
distinct算子底层有shuffle过程,会给分区元素打乱重组.

image.png

实操

不指定分区

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark09_Transformation_distinct22 {
  4. def main(args: Array[String]): Unit = {
  5. //创建SparkConf并设置App名称
  6. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  7. //创建SparkContext,该对象是提交Spark App的入口
  8. val sc: SparkContext = new SparkContext(conf)
  9. //创建RDD
  10. val numRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,5,4,3,3),5)
  11. numRDD.mapPartitionsWithIndex{
  12. (index,datas)=>{
  13. println(index + "--->" + datas.mkString(","))
  14. datas
  15. }
  16. }.collect()
  17. /* 没去重复之前是5个分区
  18. 输出:
  19. 1--->2,3
  20. 3--->5,4
  21. 2--->4,5
  22. 4--->3,3
  23. 0--->1
  24. */
  25. println("---------------")
  26. //对RDD中的数据进行去重, 100W条数据去重复之后可能还剩下10W条数据,那么 distinct 也可以重新指定分区
  27. val newRDD: RDD[Int] = numRDD.distinct()
  28. newRDD.mapPartitionsWithIndex{
  29. (index,datas)=>{
  30. println(index + "--->" + datas.mkString(","))
  31. datas
  32. }
  33. }.collect()
  34. /*
  35. 去重复之后还是五个分区
  36. 4--->4
  37. 2--->2
  38. 1--->1
  39. 0--->5
  40. 3--->3
  41. */
  42. //newRDD.collect().foreach(println)
  43. // 关闭连接
  44. sc.stop()
  45. }
  46. }

指定分区

distinct算子可以指定分区数.
五个分区的元素经过去重复之后变成两个分区了

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object demo {
  4. def main(args: Array[String]): Unit = {
  5. //创建SparkConf并设置App名称
  6. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  7. //创建SparkContext,该对象是提交Spark App的入口
  8. val sc: SparkContext = new SparkContext(conf)
  9. //创建RDD
  10. val numRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 5, 4, 3, 3), 5)
  11. numRDD.mapPartitionsWithIndex {
  12. (index, datas) => {
  13. println(index + "--->" + datas.mkString(","))
  14. datas
  15. }
  16. }.collect()
  17. /* 没去重复之前是5个分区
  18. 输出:
  19. 1--->2,3
  20. 3--->5,4
  21. 2--->4,5
  22. 4--->3,3
  23. 0--->1
  24. */
  25. println("---------------")
  26. //指定两个分区数
  27. val newRDD: RDD[Int] = numRDD.distinct(2)
  28. newRDD.mapPartitionsWithIndex {
  29. (index, datas) => {
  30. println(index + "--->" + datas.mkString(","))
  31. datas
  32. }
  33. }.collect()
  34. /*输出
  35. 1--->1,3,5
  36. 0--->4,2
  37. */
  38. //newRDD.collect().foreach(println)
  39. // 关闭连接
  40. sc.stop()
  41. }
  42. }

根据 bean的年龄字段去重复

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. case class User(age: Int, name: String) {
  3. override def hashCode(): Int = this.age
  4. override def equals(obj: Any): Boolean = obj match {
  5. case User(age, _) => this.age == age
  6. case _ => false
  7. }
  8. }
  9. object demo {
  10. def main(args: Array[String]): Unit = {
  11. val conf: SparkConf = new SparkConf().setAppName("Distinct").setMaster("local[2]")
  12. val sc: SparkContext = new SparkContext(conf)
  13. //年龄相等就重复,需要覆写hashCode和equals这两个方法.
  14. val rdd1 = sc.parallelize(List(User(10, "lisi"), User(20, "zs"), User(10, "ab")))
  15. val rdd2 = rdd1.distinct(2)
  16. rdd2.collect.foreach(println)
  17. sc.stop()
  18. }
  19. }

输出结果就是:
User(20,zs)
User(10,lisi)