作用
先使用 func 对每个元素进行处理,再按照处理后的结果排序;默认为升序(ascending = true),将第二个参数 ascending 设为 false 则为降序。
案例
对数字进行排序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates `RDD.sortBy` on an RDD of integers: ascending order,
 * descending order via a negated key, and descending order via the
 * `ascending` flag.
 */
object Transformation_sortBy2 {

  /**
   * Starts a local Spark context, prints three sorted variants of
   * `List(1, 4, 3, 2)` and stops the context.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // try/finally so the context is stopped even when one of the jobs throws.
    try {
      val numRDD: RDD[Int] = sc.makeRDD(List(1, 4, 3, 2))

      // Ascending sort (the default direction).
      val sortedRDD1: RDD[Int] = numRDD.sortBy(num => num)
      println(sortedRDD1.collect().mkString(",")) // prints: 1,2,3,4

      // Descending sort by negating the key.
      // Caveat: -Int.MinValue overflows back to Int.MinValue, so this trick is
      // only safe when the data cannot contain Int.MinValue.
      val sortedRDD3: RDD[Int] = numRDD.sortBy(num => -num)
      println(sortedRDD3.collect().mkString(",")) // prints: 4,3,2,1

      // Descending sort via the second parameter: ascending = false.
      // When omitted it defaults to true (ascending).
      val sortedRDD4: RDD[Int] = numRDD.sortBy(num => num, ascending = false)
      println(sortedRDD4.collect().mkString(",")) // prints: 4,3,2,1
    } finally {
      sc.stop()
    }
  }
}
对字符串排序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates `RDD.sortBy` on an RDD of strings: once by numeric value
 * and once by lexicographic (dictionary) order.
 */
object Transformation_sortBy {

  /**
   * Starts a local Spark context, prints `List("1", "4", "3", "22")` sorted
   * numerically and lexicographically, and stops the context.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // try/finally so the context is stopped even when one of the jobs throws.
    try {
      val strRDD: RDD[String] = sc.makeRDD(List("1", "4", "3", "22"))

      // Sort by the numeric value of each string.
      // sortBy(x => x.toInt) can be abbreviated to sortBy(_.toInt).
      // Note: toInt throws NumberFormatException for non-numeric strings; the
      // input here is a fixed list of numeric literals.
      val sortedRDD: RDD[String] = strRDD.sortBy(x => x.toInt)
      println(sortedRDD.collect().mkString(",")) // prints: 1,3,4,22

      // Sort by the strings themselves, i.e. dictionary order.
      val sortedRDD2: RDD[String] = strRDD.sortBy(elem => elem)
      println(sortedRDD2.collect().mkString(",")) // prints: 1,22,3,4
    } finally {
      sc.stop()
    }
  }
}
根据实体类的某个字段进行排序
根据person的年龄进行排序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//自定义类型的排序
/**
 * Simple serializable record holding a person's age and name.
 *
 * @param age  the person's age
 * @param name the person's name
 */
class Person(val age: Int, val name: String) extends Serializable {

  /** Renders only the age as decimal text; the name is not included. */
  override def toString: String = age.toString
}
/**
 * Demonstrates sorting an RDD of a custom type (`Person`) by supplying an
 * implicit `Ordering` for that type.
 */
object SortBy2 {

  /**
   * Starts a local Spark context, sorts three `Person` instances by age in
   * ascending order, prints each one, and stops the context.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SortBy").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    // try/finally so the context is stopped even when the job throws.
    try {
      val people: RDD[Person] =
        sc.parallelize(List(new Person(10, "lisi"), new Person(20, "zs"), new Person(15, "ww")))

      // sortBy needs an implicit Ordering for the sort key. The key here is the
      // Person itself, which has no built-in Ordering, so one is supplied.
      // Ordering.by delegates to Ordering[Int]; unlike the subtraction idiom
      // (x.age - y.age) it cannot overflow and report a wrong sign.
      implicit val byAge: Ordering[Person] = Ordering.by[Person, Int](_.age)

      val sortedByAge: RDD[Person] = people.sortBy(person => person)
      sortedByAge.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
根据字符串的长度进行排序
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates `RDD.sortBy` with a derived key: strings are ordered by
 * their length.
 */
object SortBy2 {

  /**
   * Starts a local Spark context, sorts a fixed list of strings by length in
   * ascending order, prints the result, and stops the context.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SortBy").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    // try/finally so the context is stopped even when the job throws.
    try {
      val words = List("aa", "ccc", "bdddd", "d", "b")
      // Distribute the list over 2 partitions.
      val wordsRDD = sc.parallelize(words, 2)

      // Sort key is the string length.
      // NOTE(review): "d" and "b" share the same key; their relative order is
      // presumably not guaranteed by sortBy — confirm before relying on it.
      val byLength = wordsRDD.sortBy(_.length)
      println(byLength.collect().mkString(",")) // prints: d,b,aa,ccc,bdddd
    } finally {
      sc.stop()
    }
  }
}