功能说明
在一个(Key,Value)的RDD上调用,Key必须可比较(作用域内存在隐式的Ordering[Key];Int、String等内置类型已自带,自定义类型可通过混入Ordered特质获得),返回一个按照Key进行排序的(Key,Value)的RDD
案例
根据key升序或者降序排序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** Demonstrates `sortByKey` on an RDD keyed by Int, in ascending and descending order. */
object demo {
  /**
    * Creates a local SparkContext, sorts a small (Int, String) RDD by key in both
    * directions, and prints each collected result.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // Stop the context even when a job throws, so it is never left running.
    try {
      val rdd: RDD[(Int, String)] = sc.makeRDD(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
      // Sort the elements by key; ascending is the default.
      val newRDD1: RDD[(Int, String)] = rdd.sortByKey()
      println(newRDD1.collect().mkString(",")) // Output: (1,dd),(2,bb),(3,aa),(6,cc)
      println("----------")
      // Descending order.
      val newRDD2: RDD[(Int, String)] = rdd.sortByKey(ascending = false)
      println(newRDD2.collect().mkString(",")) // Output: (6,cc),(3,aa),(2,bb),(1,dd)
    } finally {
      sc.stop()
    }
  }
}
对对象进行排序:先根据名字升序排序,如果名字相同,再根据年龄降序排序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** Demonstrates `sortByKey` on an RDD whose key is the custom `Student` type. */
object demo {
  /**
    * Creates a local SparkContext, sorts a (Student, Int) RDD by its Student key,
    * and prints the collected result.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // Stop the context even when a job throws, so it is never left running.
    try {
      // A custom key type must be comparable; Student achieves this by mixing in Ordered.
      val stdList: List[(Student, Int)] = List(
        (new Student("jingjing", 18), 1),
        (new Student("bangzhang", 18), 1),
        (new Student("jingjing", 19), 1),
        (new Student("luoxiang", 18), 1),
        (new Student("jingjing", 20), 1)
      )
      val stdRDD: RDD[(Student, Int)] = sc.makeRDD(stdList)
      val resRDD: RDD[(Student, Int)] = stdRDD.sortByKey()
      println(resRDD.collect().mkString(" -- "))
      /* Output (name ascending, then age descending):
      (Student(bangzhang, 18),1)
      -- (Student(jingjing, 20),1)
      -- (Student(jingjing, 19),1)
      -- (Student(jingjing, 18),1)
      -- (Student(luoxiang, 18),1)
      */
    } finally {
      sc.stop()
    }
  }
}
/**
  * A student that can be used as a sortable key.
  *
  * Natural ordering: by `name` ascending; students with the same name are ordered
  * by `age` descending.
  *
  * NOTE(review): Serializable is mixed in, presumably so instances can be shipped
  * between Spark tasks — confirm against usage.
  *
  * @param name the student's name (primary sort key, ascending)
  * @param age  the student's age (secondary sort key, descending)
  */
class Student(var name: String, var age: Int) extends Ordered[Student] with Serializable {
  /**
    * Compares this student with another according to the ordering described on the class.
    *
    * @param that the student to compare against
    * @return a negative value if this sorts before `that`, zero if they are equal in
    *         ordering, a positive value if this sorts after `that`
    */
  override def compare(that: Student): Int = {
    val byName: Int = this.name.compareTo(that.name)
    if (byName != 0) byName
    // Same name: older student first. Integer.compare avoids the Int overflow that
    // subtracting the ages can produce for extreme values.
    else Integer.compare(that.age, this.age)
  }

  /** Renders the student as `Student(name, age)`. */
  override def toString: String = s"Student($name, $age)"
}