作用

使用 func 先对数据进行处理，按照处理后的结果排序，默认为升序（正序）。

案例

对数字进行排序

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Transformation_sortBy2 {
  4. def main(args: Array[String]): Unit = {
  5. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  6. val sc: SparkContext = new SparkContext(conf)
  7. val numRDD: RDD[Int] = sc.makeRDD(List(1, 4, 3, 2))
  8. // Ascending sort (the default ordering)
  9. val sortedRDD1: RDD[Int] = numRDD.sortBy(num => num)
  10. println(sortedRDD1.collect().mkString(",")) // output: 1,2,3,4
  11. // Descending sort by negating the sort key
  12. val sortedRDD3: RDD[Int] = numRDD.sortBy(num => -num)
  13. println(sortedRDD3.collect().mkString(",")) // output: 4,3,2,1
  14. // Second parameter: false means descending; if omitted it defaults to true (ascending)
  15. val sortedRDD4: RDD[Int] = numRDD.sortBy(num => num, false)
  16. println(sortedRDD4.collect().mkString(",")) // output: 4,3,2,1
  17. sc.stop()
  18. }
  19. }

对字符串排序

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Transformation_sortBy {
  4. def main(args: Array[String]): Unit = {
  5. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  6. val sc: SparkContext = new SparkContext(conf)
  7. val strRDD: RDD[String] = sc.makeRDD(List("1", "4", "3", "22"))
  8. // Sort by the strings converted to numbers (numeric order)
  9. val sortedRDD: RDD[String] = strRDD.sortBy(x => x.toInt) // sortBy(x => x.toInt) can be shortened to sortBy(_.toInt)
  10. println(sortedRDD.collect().mkString(",")) // output: 1,3,4,22
  11. // Sort by the strings' lexicographic (dictionary) order
  12. val sortedRDD2: RDD[String] = strRDD.sortBy(elem => elem)
  13. println(sortedRDD2.collect().mkString(",")) // output: 1,22,3,4
  14. sc.stop()
  15. }
  16. }

根据实体类的某个字段进行排序

根据person的年龄进行排序

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. // Sorting values of a user-defined type
  4. class Person(val age: Int, val name: String) extends Serializable {
  5. override def toString: String = s"$age"
  6. }
  7. object SortBy2 {
  8. def main(args: Array[String]): Unit = {
  9. val conf: SparkConf = new SparkConf().setAppName("SortBy").setMaster("local[2]")
  10. val sc: SparkContext = new SparkContext(conf)
  11. val rdd1 = sc.parallelize(new Person(10, "lisi") :: new Person(20, "zs") :: new Person(15, "ww") :: Nil)
  12. implicit val ord: Ordering[Person] = new Ordering[Person] {
  13. override def compare(x: Person, y: Person): Int = x.age - y.age // NOTE(review): subtraction can overflow Int for extreme ages; Ordering.by(_.age) is safer
  14. }
  15. // For a case class or a tuple, the ClassTag does not need to be passed explicitly
  16. val rdd2: RDD[Person] = rdd1.sortBy(x => x)
  17. rdd2.collect.foreach(println)
  18. sc.stop()
  19. }
  20. }

根据字符串的长度进行排序

  1. import org.apache.spark.{SparkConf, SparkContext}
  2. object SortBy2 {
  3. def main(args: Array[String]): Unit = {
  4. val conf: SparkConf = new SparkConf().setAppName("SortBy").setMaster("local[2]")
  5. val sc: SparkContext = new SparkContext(conf)
  6. val list1 = List("aa", "ccc", "bdddd", "d", "b")
  7. val rdd1 = sc.parallelize(list1, 2)
  8. val rdd2 = rdd1.sortBy(_.length) // sort by string length, ascending
  9. println(rdd2.collect.mkString(",")) // output: d,b,aa,ccc,bdddd
  10. sc.stop()
  11. }
  12. }