Goal: sort by the first column in descending order; when first-column values are equal, sort by the second column in descending order.
Steps:
1) Mix in the Ordered and Serializable traits to implement a custom key used for sorting.
2) Load the file to be secondary-sorted into an RDD of (key, value) pairs.
3) Call sortByKey() to perform the secondary sort based on the custom key.
4) Strip off the sort key, keeping only the sorted values (a hypothetical input/output example follows).
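For example, given a hypothetical input file aa.txt with two space-separated integer columns:

3 1
1 2
3 2
1 1

the expected output, with both columns descending, would be:

3 2
3 1
1 2
1 1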
Implementation:
import org.apache.spark.{SparkConf, SparkContext}

// Custom composite key: compare() defines ascending order on the first
// column, then the second; sortByKey(false) below reverses it to descending.
class SecondarySortKey(val first: Int, val second: Int)
    extends Ordered[SecondarySortKey] with Serializable {
  override def compare(other: SecondarySortKey): Int = {
    if (this.first != other.first) {
      // Integer.compare avoids the overflow that plain subtraction risks
      Integer.compare(this.first, other.first)
    } else {
      Integer.compare(this.second, other.second)
    }
  }
}
object SecondarySortApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySort").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("file:///aa.txt")
    // Pair every line with its composite sort key
    val pairWithSortKey = lines.map { line =>
      val cols = line.split(" ")
      (new SecondarySortKey(cols(0).toInt, cols(1).toInt), line)
    }
    // ascending = false: both columns end up in descending order
    val sorted = pairWithSortKey.sortByKey(false)
    // Drop the key, keep only the original lines
    val sortedResult = sorted.map(_._2)
    sortedResult.collect().foreach(println)
    sc.stop()
  }
}
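As a minimal alternative sketch (not from the original notes), Scala's implicit lexicographic Ordering for tuples can replace the custom key class entirely; the object name TupleKeySortApp and the input path are hypothetical, assuming the same space-separated two-integer format:

import org.apache.spark.{SparkConf, SparkContext}

object TupleKeySortApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TupleKeySort").setMaster("local"))
    // The (Int, Int) tuple itself serves as the sort key: tuples are ordered
    // lexicographically, and ascending = false makes both columns descending
    sc.textFile("file:///aa.txt")
      .map { line =>
        val cols = line.split(" ")
        ((cols(0).toInt, cols(1).toInt), line)
      }
      .sortByKey(ascending = false)
      .map(_._2)
      .collect()
      .foreach(println)
    sc.stop()
  }
}

The second implementation below takes a different approach: it keeps the first column ascending and sorts only the second column descending, using groupByKey and an in-memory List sort.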
package spark.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SecondarySortApp {
// First column ascending, second column descending
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SortByKey").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
    val data = sc.textFile("/test/file/secondarySort.txt", 1)
    // (Unused: sorting whole lines as keys would order both columns ascending)
    val value: RDD[(String, String)] = data.coalesce(1, false).map(line => (line, line)).sortByKey(true)
    // Cleverly reuse List's built-in sorting: group by the first column, sort
    // the keys ascending, then sort each group's lines in descending order
    val value1: RDD[(String, List[String])] = data.map(line => (line.split(",")(0), line))
      .groupByKey(1).sortByKey(true)
      .map { case (key, group) => (key, group.toList.sortWith(_.compareTo(_) > 0)) }
    // Flatten the per-group lists back into individual lines and print
    value1.flatMap(_._2).foreach(println)
}
}
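Under the same assumptions, the first-ascending / second-descending ordering can also be written without groupByKey by packing a negated numeric second column into a composite tuple key. This is a hypothetical sketch (the object name CompositeKeySortApp is invented; it assumes the second column parses as Int and is never Int.MinValue):

package spark.rdd

import org.apache.spark.{SparkConf, SparkContext}

object CompositeKeySortApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CompositeKeySort").setMaster("local[*]"))
    sc.setLogLevel("ERROR")
    sc.textFile("/test/file/secondarySort.txt", 1)
      .map { line =>
        val cols = line.split(",")
        // Tuple ordering is lexicographic: String ascending first; negating
        // the Int flips the second column to descending
        ((cols(0), -cols(1).toInt), line)
      }
      .sortByKey(true)
      .map(_._2)
      .collect()
      .foreach(println)
    sc.stop()
  }
}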