1、对文本文件内的数字,取最大的前3个。

  1. /**
  2. * 取最大的前3个数字
  3. * @author Administrator
  4. *
  5. */
  6. public class Top3 {
  7. public static void main(String[] args) {
  8. SparkConf conf = new SparkConf()
  9. .setAppName("Top3")
  10. .setMaster("local");
  11. JavaSparkContext sc = new JavaSparkContext(conf);
  12. JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//top.txt");
  13. JavaPairRDD<Integer, String> pairs = lines.mapToPair(
  14. new PairFunction<String, Integer, String>() {
  15. private static final long serialVersionUID = 1L;
  16. @Override
  17. public Tuple2<Integer, String> call(String t) throws Exception {
  18. return new Tuple2<Integer, String>(Integer.valueOf(t), t);
  19. }
  20. });
  21. JavaPairRDD<Integer, String> sortedPairs = pairs.sortByKey(false);
  22. JavaRDD<Integer> sortedNumbers = sortedPairs.map(
  23. new Function<Tuple2<Integer,String>, Integer>() {
  24. private static final long serialVersionUID = 1L;
  25. @Override
  26. public Integer call(Tuple2<Integer, String> v1) throws Exception {
  27. return v1._1;
  28. }
  29. });
  30. List<Integer> sortedNumberList = sortedNumbers.take(3);
  31. for(Integer num : sortedNumberList) {
  32. System.out.println(num);
  33. }
  34. sc.close();
  35. }
  36. }

2、对每个班级内的学生成绩,取出前3名。(分组取topn)

  1. /**
  2. * 分组取top3
  3. * @author Administrator
  4. *
  5. */
  6. public class GroupTop3 {
  7. public static void main(String[] args) {
  8. SparkConf conf = new SparkConf()
  9. .setAppName("Top3")
  10. .setMaster("local");
  11. JavaSparkContext sc = new JavaSparkContext(conf);
  12. JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//score.txt");
  13. JavaPairRDD<String, Integer> pairs = lines.mapToPair(
  14. new PairFunction<String, String, Integer>() {
  15. private static final long serialVersionUID = 1L;
  16. @Override
  17. public Tuple2<String, Integer> call(String line) throws Exception {
  18. String[] lineSplited = line.split(" ");
  19. return new Tuple2<String, Integer>(lineSplited[0],
  20. Integer.valueOf(lineSplited[1]));
  21. }
  22. });
  23. JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
  24. JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair(
  25. new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() {
  26. private static final long serialVersionUID = 1L;
  27. @Override
  28. public Tuple2<String, Iterable<Integer>> call(
  29. Tuple2<String, Iterable<Integer>> classScores)
  30. throws Exception {
  31. Integer[] top3 = new Integer[3];
  32. String className = classScores._1;
  33. Iterator<Integer> scores = classScores._2.iterator();
  34. while(scores.hasNext()) {
  35. Integer score = scores.next();
  36. for(int i = 0; i < 3; i++) {
  37. if(top3[i] == null) {
  38. top3[i] = score;
  39. break;
  40. } else if(score > top3[i]) {
  41. for(int j = 2; j > i; j--) {
  42. top3[j] = top3[j - 1];
  43. }
  44. top3[i] = score;
  45. break;
  46. }
  47. }
  48. }
  49. return new Tuple2<String,
  50. Iterable<Integer>>(className, Arrays.asList(top3));
  51. }
  52. });
  53. top3Score.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
  54. private static final long serialVersionUID = 1L;
  55. @Override
  56. public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
  57. System.out.println("class: " + t._1);
  58. Iterator<Integer> scoreIterator = t._2.iterator();
  59. while(scoreIterator.hasNext()) {
  60. Integer score = scoreIterator.next();
  61. System.out.println(score);
  62. }
  63. System.out.println("=======================================");
  64. }
  65. });
  66. sc.close();
  67. }

Scala 版本(对应第 1 题:取最大的前 3 个数字)

  /**
   * Prints the three largest integers found in a text file, one per line,
   * in descending order.
   *
   * Every non-blank input line is expected to contain exactly one integer;
   * blank lines are skipped and surrounding whitespace is ignored.
   *
   * @param args optional; `args(0)`, when present, replaces the default input path
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse("/Users/gaozhen/tmp/top.txt")
    val conf = new SparkConf().setAppName("top3").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val top3Number = sc.textFile(inputPath)
        .map(_.trim)
        .filter(_.nonEmpty) // blank lines would make toInt throw
        .map(_.toInt)
        .top(3) // three largest, descending, without sorting the whole RDD
      top3Number.foreach(num => println(num))
    } finally {
      // Stop the context even when the job fails.
      sc.stop()
    }
  }