Secondary sort: order rows by the first column in descending order, then by the second column in descending order; when two rows have equal first columns, the second column (descending) breaks the tie.
Steps:
1) Define a custom sort key class that mixes in the Ordered and Serializable traits.
2) Load the file to be secondary-sorted into an RDD of (key, value) pairs, using the custom key.
3) Call sortByKey() to perform the secondary sort on the custom key.
4) Strip off the sort key, keeping only the sorted values.
Implementation:
import org.apache.spark.{SparkConf, SparkContext}

// Custom sort key: compares by the first column, then by the second.
class SecondarySortKey(val first: Int, val second: Int)
    extends Ordered[SecondarySortKey] with Serializable {
  def compare(other: SecondarySortKey): Int = {
    if (this.first - other.first != 0) {
      this.first - other.first
    } else {
      this.second - other.second
    }
  }
}

object SecondarySortApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySort").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///aa.txt")
    // Build (SecondarySortKey, originalLine) pairs from the two space-separated columns.
    val pairWithSortKey = lines.map(line =>
      (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line))
    // false => sort descending on the custom key.
    val sorted = pairWithSortKey.sortByKey(false)
    // Drop the sort key, keeping only the original lines.
    val sortedResult = sorted.map(_._2)
    sortedResult.collect().foreach(println)
  }
}
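For illustration, suppose a hypothetical aa.txt contains the space-separated pairs:

3 9
1 5
3 2
2 8
1 7

The job above then prints:

3 9
3 2
2 8
1 7
1 5

The two rows sharing first column 3 are tie-broken by the second column in descending order, exactly as the compare() method combined with sortByKey(false) dictates.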
package spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SecondarySortApp {
  // First column ascending, second column descending.
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SortByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val data = sc.textFile("/test/file/secondarySort.txt", 1)
    // Variant kept from the original listing: sort whole lines lexicographically (unused below).
    val value: RDD[(String, String)] =
      data.coalesce(1, false).map(line => (line, line)).sortByKey(true)
    // Group by the first column, sort the groups ascending by key, then make clever use of
    // List's sortWith to order each group's lines in descending order.
    val value1: RDD[(String, List[String])] = data
      .map(line => (line.split(",")(0), line))
      .groupByKey(1)
      .sortByKey(true)
      .map(line => (line._1, line._2.toList.sortWith(_.compareTo(_) > 0)))
    // Flatten the per-group lists back into individual lines and print them.
    value1.map(_._2).flatMap(_.mkString("@").split("@")).foreach(println)
  }
}
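The groupByKey variant materializes every group in executor memory, which can fail on skewed keys. A more scalable pattern is repartitionAndSortWithinPartitions, which pushes the sorting into Spark's shuffle machinery. Below is a minimal sketch under assumptions not in the original: the same comma-separated input path, an illustrative FirstColumnPartitioner, and an arbitrary 4 partitions. Records come out fully sorted only within each partition; the hash partitioner does not yield one globally sorted output.

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object ShuffleSortApp {
  // Illustrative partitioner: all records sharing a first column land in one partition.
  class FirstColumnPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int =
      math.abs(key.asInstanceOf[(String, String)]._1.hashCode) % partitions
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ShuffleSort").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val data = sc.textFile("/test/file/secondarySort.txt")

    // Composite-key ordering: first column ascending, whole line descending,
    // mirroring the groupByKey version above.
    implicit val ord: Ordering[(String, String)] = new Ordering[(String, String)] {
      def compare(a: (String, String), b: (String, String)): Int = {
        val c = a._1.compareTo(b._1)
        if (c != 0) c else b._2.compareTo(a._2)
      }
    }

    data
      .map(line => ((line.split(",")(0), line), null))
      .repartitionAndSortWithinPartitions(new FirstColumnPartitioner(4))
      .map(_._1._2) // drop the composite key, keep the original line
      .foreach(println)
  }
}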
