功能说明
在一个(Key,Value)的RDD上调用,Key必须混入Ordered特质(或者作用域内存在隐式的Ordering[Key]),返回一个按照Key进行排序的(Key,Value)的RDD;通过布尔参数ascending指定升序(默认为true)或降序
案例
根据key升序或者降序排序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Demonstrates `sortByKey` on a pair RDD keyed by `Int`, in ascending and descending order. */
object demo {

  /**
   * Builds a small (Int, String) RDD on a local Spark context, sorts it by key
   * in both directions and prints each result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    // Stop the context even when a job throws, so it is never left running.
    try {
      val rdd: RDD[(Int, String)] =
        sc.makeRDD(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))

      // Sort the elements of the RDD by key; ascending is the default.
      val newRDD1: RDD[(Int, String)] = rdd.sortByKey()
      println(newRDD1.collect().mkString(",")) // prints: (1,dd),(2,bb),(3,aa),(6,cc)

      println("----------")

      // Descending order.
      val newRDD2: RDD[(Int, String)] = rdd.sortByKey(ascending = false)
      println(newRDD2.collect().mkString(",")) // prints: (6,cc),(3,aa),(2,bb),(1,dd)
    } finally {
      sc.stop()
    }
  }
}
对自定义对象进行排序:先根据名字升序排序,如果名字相同再根据年龄降序排序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Demonstrates `sortByKey` on a pair RDD whose key is a user-defined type. */
object demo {

  /**
   * Builds an RDD keyed by [[Student]], sorts it by key and prints the result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    // Stop the context even when a job throws, so it is never left running.
    try {
      // A user-defined key type must mix in the Ordered trait to be sortable by key.
      val stdList: List[(Student, Int)] = List(
        (new Student("jingjing", 18), 1),
        (new Student("bangzhang", 18), 1),
        (new Student("jingjing", 19), 1),
        (new Student("luoxiang", 18), 1),
        (new Student("jingjing", 20), 1)
      )
      val stdRDD: RDD[(Student, Int)] = sc.makeRDD(stdList)
      val resRDD: RDD[(Student, Int)] = stdRDD.sortByKey()

      println(resRDD.collect().mkString(" -- "))
      /*
       * Expected output (a single line, elements joined by " -- "):
       *   (Student(bangzhang, 18),1)
       *   (Student(jingjing, 20),1)
       *   (Student(jingjing, 19),1)
       *   (Student(jingjing, 18),1)
       *   (Student(luoxiang, 18),1)
       */
    } finally {
      sc.stop()
    }
  }
}

/**
 * Sort key ordered by name ascending, then by age descending.
 *
 * @param name student name, the primary sort field
 * @param age  student age, used to break ties between equal names
 */
class Student(var name: String, var age: Int) extends Ordered[Student] with Serializable {

  /**
   * Compares this student with another one.
   *
   * @param that the student to compare against
   * @return a negative value if this student sorts before `that`, a positive
   *         value if it sorts after, and 0 if name and age are both equal
   */
  override def compare(that: Student): Int = {
    val byName: Int = this.name.compareTo(that.name)
    if (byName != 0) byName
    else {
      // Names are equal: the higher age sorts first (descending). Integer.compare
      // is used instead of subtraction, which overflows for ages that are far
      // apart and then yields the wrong sign. For ascending order, swap the
      // arguments: Integer.compare(this.age, that.age).
      Integer.compare(that.age, this.age)
    }
  }

  override def toString: String = s"Student($name, $age)"
}
