1、对文本文件中的数字（每行一个整数），取出最大的前 3 个。
/*** 取最大的前3个数字* @author Administrator**/public class Top3 {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("Top3").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//top.txt");JavaPairRDD<Integer, String> pairs = lines.mapToPair(new PairFunction<String, Integer, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(String t) throws Exception {return new Tuple2<Integer, String>(Integer.valueOf(t), t);}});JavaPairRDD<Integer, String> sortedPairs = pairs.sortByKey(false);JavaRDD<Integer> sortedNumbers = sortedPairs.map(new Function<Tuple2<Integer,String>, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Integer call(Tuple2<Integer, String> v1) throws Exception {return v1._1;}});List<Integer> sortedNumberList = sortedNumbers.take(3);for(Integer num : sortedNumberList) {System.out.println(num);}sc.close();}}
2、对每个班级内的学生成绩（每行格式为“班级 成绩”），取出前 3 名。（分组取 Top N）
/*** 分组取top3* @author Administrator**/public class GroupTop3 {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("Top3").setMaster("local");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//score.txt");JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String line) throws Exception {String[] lineSplited = line.split(" ");return new Tuple2<String, Integer>(lineSplited[0],Integer.valueOf(lineSplited[1]));}});JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair(new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> classScores)throws Exception {Integer[] top3 = new Integer[3];String className = classScores._1;Iterator<Integer> scores = classScores._2.iterator();while(scores.hasNext()) {Integer score = scores.next();for(int i = 0; i < 3; i++) {if(top3[i] == null) {top3[i] = score;break;} else if(score > top3[i]) {for(int j = 2; j > i; j--) {top3[j] = top3[j - 1];}top3[i] = score;break;}}}return new Tuple2<String,Iterable<Integer>>(className, Arrays.asList(top3));}});top3Score.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Iterable<Integer>> t) throws Exception {System.out.println("class: " + t._1);Iterator<Integer> scoreIterator = t._2.iterator();while(scoreIterator.hasNext()) {Integer score = scoreIterator.next();System.out.println(score);}System.out.println("=======================================");}});sc.close();}
Scala 版本（对应第 1 题：取最大的前 3 个数字）
/**
 * Prints the 3 largest integers in a text file that holds one number per line.
 *
 * @param args optional; `args(0)` overrides the default input path
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("top3").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val inputPath = args.headOption.getOrElse("/Users/gaozhen/tmp/top.txt")
    val top3Numbers = sc.textFile(inputPath)
      .map(_.trim)
      .filter(_.nonEmpty) // blank lines would make toInt throw NumberFormatException
      .map(_.toInt)
      .top(3) // keeps 3 candidates per partition instead of a full sort; largest-first
    top3Numbers.foreach(println)
  } finally {
    // Release the context even when reading or parsing fails.
    sc.stop()
  }
}
