Sort by the first column in descending order; when first columns are equal, sort by the second column in descending order.
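
    For example, assuming a space-separated two-column integer file (hypothetical sample data, not from the original):

        Input:
        1 5
        2 3
        1 8
        2 7

        Expected output (both columns descending):
        2 7
        2 3
        1 8
        1 5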

    Steps:
    1) Mix in the Ordered and Serializable traits to implement a custom key used for sorting
    2) Load the file to be secondary-sorted and build an RDD of (key, value) pairs
    3) Use sortByKey() to perform the secondary sort based on the custom key
    4) Strip the sort key, keeping only the sorted result.

    Implementation:

    import org.apache.spark.{SparkConf, SparkContext}

    // Custom sort key: compare by the first column, then by the second.
    class SecondarySortKey(val first: Int, val second: Int)
      extends Ordered[SecondarySortKey] with Serializable {
      def compare(other: SecondarySortKey): Int = {
        if (this.first != other.first) {
          this.first.compareTo(other.first)  // compareTo avoids Int-subtraction overflow
        } else {
          this.second.compareTo(other.second)
        }
      }
    }

    object SecondarySortApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SecondarySort").setMaster("local")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("file:///aa.txt")
        // Wrap each line as (custom key, original line) so sortByKey can use the key.
        val pairWithSortKey = lines.map { line =>
          val cols = line.split(" ")
          (new SecondarySortKey(cols(0).toInt, cols(1).toInt), line)
        }
        // ascending = false gives descending order on both columns
        val sorted = pairWithSortKey.sortByKey(false)
        // Drop the sort key, keeping only the original lines.
        val sortedResult = sorted.map(sortedLine => sortedLine._2)
        sortedResult.collect().foreach(println)
      }
    }
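
    Since Scala already provides an element-wise Ordering for tuples, the same descending-descending sort can be written without the custom key via RDD.sortBy. A minimal alternative sketch, assuming the same space-separated input as above:

    val sortedAlt = lines
      .map(_.split(" "))
      // the implicit Ordering[(Int, Int)] compares element-wise;
      // ascending = false yields descending on both columns
      .sortBy(cols => (cols(0).toInt, cols(1).toInt), ascending = false)
      .map(_.mkString(" "))
    sortedAlt.collect().foreach(println)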
    package spark.rdd

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    object SecondarySortApp {
      // First column ascending, second column descending.
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SortByKey").setMaster("local[*]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
        val data = sc.textFile("/test/file/secondarySort.txt", 1)
        // Whole-line lexicographic sort (unused below, kept for comparison).
        val value: RDD[(String, String)] = data.coalesce(1, false).map(line => (line, line)).sortByKey(true)
        // Cleverly reuses List's built-in sorting: group lines by the first column,
        // sort the groups by key ascending, then sort each group's lines descending.
        val value1: RDD[(String, List[String])] = data
          .map(line => (line.split(",")(0), line))
          .groupByKey(1)
          .sortByKey(true)
          .map(line => (line._1, line._2.toList.sortWith(_.compareTo(_) > 0)))
        // Flatten the sorted groups back into individual lines and print.
        value1.flatMap(_._2).foreach(println)
      }
    }
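
    Note that groupByKey must materialize each group's entire value list in memory, which can fail on heavily skewed keys. A more scalable pattern sorts during the shuffle itself with repartitionAndSortWithinPartitions. The sketch below is a minimal illustration; the FirstColumnPartitioner class and the assumption of comma-separated integer columns are mine, not from the original:

    import org.apache.spark.Partitioner

    // Route each composite key to a partition based on the first column only,
    // so every row sharing a first column lands in the same partition.
    class FirstColumnPartitioner(partitions: Int) extends Partitioner {
      override def numPartitions: Int = partitions
      override def getPartition(key: Any): Int = {
        val first = key.asInstanceOf[(Int, Int)]._1
        ((first % numPartitions) + numPartitions) % numPartitions
      }
    }

    val pairs = data.map { line =>
      val cols = line.split(",")
      // Negating the second column makes the implicit ascending tuple Ordering
      // produce: first column ascending, second column descending.
      ((cols(0).toInt, -cols(1).toInt), line)
    }
    // Keys are sorted within each partition during the shuffle,
    // so no group ever has to be held in memory at once.
    pairs.repartitionAndSortWithinPartitions(new FirstColumnPartitioner(4))
      .values
      .foreach(println)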