Java 版本
自定义 SecondarySortKey
/*** 自定义的二次排序key* @author Administrator**/public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {private static final long serialVersionUID = -2366006422945129991L;// 首先在自定义key里面,定义需要进行排序的列private int first;private int second;public SecondarySortKey(int first, int second) {this.first = first;this.second = second;}@Overridepublic boolean $greater(SecondarySortKey other) {if(this.first > other.getFirst()) {return true;} else if(this.first == other.getFirst() &&this.second > other.getSecond()) {return true;}return false;}@Overridepublic boolean $greater$eq(SecondarySortKey other) {if(this.$greater(other)) {return true;} else if(this.first == other.getFirst() &&this.second == other.getSecond()) {return true;}return false;}@Overridepublic boolean $less(SecondarySortKey other) {if(this.first < other.getFirst()) {return true;} else if(this.first == other.getFirst() &&this.second < other.getSecond()) {return true;}return false;}@Overridepublic boolean $less$eq(SecondarySortKey other) {if(this.$less(other)) {return true;} else if(this.first == other.getFirst() &&this.second == other.getSecond()) {return true;}return false;}@Overridepublic int compare(SecondarySortKey other) {if(this.first - other.getFirst() != 0) {return this.first - other.getFirst();} else {return this.second - other.getSecond();}}@Overridepublic int compareTo(SecondarySortKey other) {if(this.first - other.getFirst() != 0) {return this.first - other.getFirst();} else {return this.second - other.getSecond();}}// 为要进行排序的多个列,提供getter和setter方法,以及hashcode和equals方法public int getFirst() {return first;}public void setFirst(int first) {this.first = first;}public int getSecond() {return second;}public void setSecond(int second) {this.second = second;}@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + first;result = prime * result + second;return result;}@Overridepublic boolean equals(Object obj) {if (this == obj)return true;if (obj == null)return 
false;if (getClass() != obj.getClass())return false;SecondarySortKey other = (SecondarySortKey) obj;if (first != other.first)return false;if (second != other.second)return false;return true;}
应用程序(Java)
/*** 二次排序* 1、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法* 2、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD* 3、使用sortByKey算子按照自定义的key进行排序* 4、再次映射,剔除自定义的key,只保留文本行* @author Administrator**/public class SecondarySort {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("SecondarySort").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt");JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<SecondarySortKey, String> call(String line) throws Exception {String[] lineSplited = line.split(" ");SecondarySortKey key = new SecondarySortKey(Integer.valueOf(lineSplited[0]),Integer.valueOf(lineSplited[1]));return new Tuple2<SecondarySortKey, String>(key, line);}});JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey();JavaRDD<String> sortedLines = sortedPairs.map(new Function<Tuple2<SecondarySortKey,String>, String>() {private static final long serialVersionUID = 1L;@Overridepublic String call(Tuple2<SecondarySortKey, String> v1) throws Exception {return v1._2;}});sortedLines.foreach(new VoidFunction<String>() {private static final long serialVersionUID = 1L;@Overridepublic void call(String t) throws Exception {System.out.println(t);}});sc.close();}
Scala 版本
自定义 SecondSortKey
/**
 * Key for secondary sorting: ordered by `first`, with ties broken by `second`.
 *
 * @param first  primary sort column
 * @param second secondary sort column, used only when `first` values are equal
 */
class SecondSortKey(val first: Int, val second: Int)
    extends Ordered[SecondSortKey] with Serializable {

  /**
   * Compares by `first`, then by `second`.
   *
   * Uses `Integer.compare` rather than subtraction: a difference such as
   * `Int.MinValue - 1` overflows and yields the wrong sign.
   *
   * @return a negative value, zero, or a positive value when this key is less than,
   *         equal to, or greater than `that`
   */
  def compare(that: SecondSortKey): Int = {
    val byFirst = java.lang.Integer.compare(this.first, that.first)
    if (byFirst != 0) byFirst
    else java.lang.Integer.compare(this.second, that.second)
  }
}
应用程序(Scala)
/**
 * Reads the input file, sorts its lines by the first two space-separated integer
 * fields using `SecondSortKey`, and prints the sorted lines.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("SecondSort").setMaster("local")
  val sc = new SparkContext(conf)

  // try/finally so the context is stopped even when the job fails.
  try {
    val lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt", 1)

    val pairs = lines.map { line =>
      // Split once and reuse the fields for both key columns.
      val fields = line.split(" ")
      (new SecondSortKey(fields(0).toInt, fields(1).toInt), line)
    }

    val sortedPairs = pairs.sortByKey()
    // Drop the key; only the original text line is needed from here on.
    val sortedLines = sortedPairs.map(sortedPair => sortedPair._2)

    sortedLines.foreach { sortedLine => println(sortedLine) }
  } finally {
    sc.stop()
  }
}
