Java 版本
    自定义 SecondarySortKey

    1. /**
    2. * 自定义的二次排序key
    3. * @author Administrator
    4. *
    5. */
    6. public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    7. private static final long serialVersionUID = -2366006422945129991L;
    8. // 首先在自定义key里面,定义需要进行排序的列
    9. private int first;
    10. private int second;
    11. public SecondarySortKey(int first, int second) {
    12. this.first = first;
    13. this.second = second;
    14. }
    15. @Override
    16. public boolean $greater(SecondarySortKey other) {
    17. if(this.first > other.getFirst()) {
    18. return true;
    19. } else if(this.first == other.getFirst() &&
    20. this.second > other.getSecond()) {
    21. return true;
    22. }
    23. return false;
    24. }
    25. @Override
    26. public boolean $greater$eq(SecondarySortKey other) {
    27. if(this.$greater(other)) {
    28. return true;
    29. } else if(this.first == other.getFirst() &&
    30. this.second == other.getSecond()) {
    31. return true;
    32. }
    33. return false;
    34. }
    35. @Override
    36. public boolean $less(SecondarySortKey other) {
    37. if(this.first < other.getFirst()) {
    38. return true;
    39. } else if(this.first == other.getFirst() &&
    40. this.second < other.getSecond()) {
    41. return true;
    42. }
    43. return false;
    44. }
    45. @Override
    46. public boolean $less$eq(SecondarySortKey other) {
    47. if(this.$less(other)) {
    48. return true;
    49. } else if(this.first == other.getFirst() &&
    50. this.second == other.getSecond()) {
    51. return true;
    52. }
    53. return false;
    54. }
    55. @Override
    56. public int compare(SecondarySortKey other) {
    57. if(this.first - other.getFirst() != 0) {
    58. return this.first - other.getFirst();
    59. } else {
    60. return this.second - other.getSecond();
    61. }
    62. }
    63. @Override
    64. public int compareTo(SecondarySortKey other) {
    65. if(this.first - other.getFirst() != 0) {
    66. return this.first - other.getFirst();
    67. } else {
    68. return this.second - other.getSecond();
    69. }
    70. }
    71. // 为要进行排序的多个列,提供getter和setter方法,以及hashcode和equals方法
    72. public int getFirst() {
    73. return first;
    74. }
    75. public void setFirst(int first) {
    76. this.first = first;
    77. }
    78. public int getSecond() {
    79. return second;
    80. }
    81. public void setSecond(int second) {
    82. this.second = second;
    83. }
    84. @Override
    85. public int hashCode() {
    86. final int prime = 31;
    87. int result = 1;
    88. result = prime * result + first;
    89. result = prime * result + second;
    90. return result;
    91. }
    92. @Override
    93. public boolean equals(Object obj) {
    94. if (this == obj)
    95. return true;
    96. if (obj == null)
    97. return false;
    98. if (getClass() != obj.getClass())
    99. return false;
    100. SecondarySortKey other = (SecondarySortKey) obj;
    101. if (first != other.first)
    102. return false;
    103. if (second != other.second)
    104. return false;
    105. return true;
    106. }

    应用程序

    1. /**
    2. * 二次排序
    3. * 1、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法
    4. * 2、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD
    5. * 3、使用sortByKey算子按照自定义的key进行排序
    6. * 4、再次映射,剔除自定义的key,只保留文本行
    7. * @author Administrator
    8. *
    9. */
    10. public class SecondarySort {
    11. public static void main(String[] args) {
    12. SparkConf conf = new SparkConf()
    13. .setAppName("SecondarySort")
    14. .setMaster("local");
    15. JavaSparkContext sc = new JavaSparkContext(conf);
    16. JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//sort.txt");
    17. JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(
    18. new PairFunction<String, SecondarySortKey, String>() {
    19. private static final long serialVersionUID = 1L;
    20. @Override
    21. public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
    22. String[] lineSplited = line.split(" ");
    23. SecondarySortKey key = new SecondarySortKey(
    24. Integer.valueOf(lineSplited[0]),
    25. Integer.valueOf(lineSplited[1]));
    26. return new Tuple2<SecondarySortKey, String>(key, line);
    27. }
    28. });
    29. JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey();
    30. JavaRDD<String> sortedLines = sortedPairs.map(
    31. new Function<Tuple2<SecondarySortKey,String>, String>() {
    32. private static final long serialVersionUID = 1L;
    33. @Override
    34. public String call(Tuple2<SecondarySortKey, String> v1) throws Exception {
    35. return v1._2;
    36. }
    37. });
    38. sortedLines.foreach(new VoidFunction<String>() {
    39. private static final long serialVersionUID = 1L;
    40. @Override
    41. public void call(String t) throws Exception {
    42. System.out.println(t);
    43. }
    44. });
    45. sc.close();
    46. }

    Scala 版本
    自定义 SecondSortKey

    /**
     * Composite key for secondary sort: ordered by `first`, then by `second`,
     * both ascending.
     *
     * Declared as a case class so that `equals`/`hashCode` agree with `compare`:
     * two keys compare as 0 exactly when both fields are equal.
     * `new SecondSortKey(a, b)`, `.first` and `.second` work as before.
     *
     * @param first  primary sort column
     * @param second tie-breaking sort column
     */
    case class SecondSortKey(first: Int, second: Int)
      extends Ordered[SecondSortKey] with Serializable {

      /** Negative, zero or positive as `this` sorts before, equal to, or after `that`. */
      def compare(that: SecondSortKey): Int = {
        // Integer.compare instead of subtraction: `a - b` overflows when the operands
        // have large opposite signs (e.g. Int.MinValue - 1), which inverts the order.
        val byFirst = java.lang.Integer.compare(first, that.first)
        if (byFirst != 0) byFirst else java.lang.Integer.compare(second, that.second)
      }
    }

    应用程序

    /**
     * Secondary-sort driver: reads lines whose first two whitespace-separated
     * fields are integers, sorts them by (first, second) using `SecondSortKey`,
     * and prints them in sorted order.
     *
     * @param args optional; `args(0)` overrides the input path
     */
    def main(args: Array[String]): Unit = {
      val inputPath = args.headOption.getOrElse("C://Users//Administrator//Desktop//sort.txt")
      val conf = new SparkConf()
        .setAppName("SecondSort")
        .setMaster("local")
      val sc = new SparkContext(conf)
      try {
        val lines = sc.textFile(inputPath, 1)
        val pairs = lines.map { line =>
          // Split each line once, tolerating extra whitespace between fields.
          val fields = line.trim.split("\\s+")
          (new SecondSortKey(fields(0).toInt, fields(1).toInt), line)
        }
        val sortedLines = pairs.sortByKey().map(_._2)
        // collect() returns the globally sorted lines to the driver. foreach would
        // print on the executors, ordered only within each partition.
        // Assumes the result is small enough to fit in driver memory.
        sortedLines.collect().foreach(println)
      } finally {
        // Release the context even if reading, parsing or sorting fails.
        sc.stop()
      }
    }