1、对文本文件中的数字（每行一个整数），取最大的前3个。
/**
 * Top-N over a flat list: reads a text file with one integer per line and
 * prints the three largest values, largest first.
 *
 * <p>Usage: {@code Top3 [inputPath]}. When no path argument is given, the
 * original hard-coded desktop path is used.
 *
 * @author Administrator
 */
public class Top3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Top3")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Always release the context, even when parsing or the job fails.
        try {
            String inputPath = args.length > 0
                    ? args[0]
                    : "C://Users//Administrator//Desktop//top.txt";
            JavaRDD<String> lines = sc.textFile(inputPath);
            // Skip blank lines: Integer.valueOf("") would abort the whole job
            // with a NumberFormatException.
            JavaRDD<String> nonBlankLines = lines.filter(
                    new Function<String, Boolean>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Boolean call(String line) throws Exception {
                            return !line.trim().isEmpty();
                        }
                    });
            JavaRDD<Integer> numbers = nonBlankLines.map(
                    new Function<String, Integer>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Integer call(String line) throws Exception {
                            return Integer.valueOf(line.trim());
                        }
                    });
            // top(n) returns the n largest elements in descending order using
            // a bounded heap per partition, instead of shuffling and fully
            // sorting the data as sortByKey + take(n) does.
            List<Integer> top3Numbers = numbers.top(3);
            for (Integer num : top3Numbers) {
                System.out.println(num);
            }
        } finally {
            sc.close();
        }
    }
}
2、对每个班级内的学生成绩，取出前3名（分组取 TopN）。
/**
* 分组取top3
* @author Administrator
*
*/
public class GroupTop3 {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Top3")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//score.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String line) throws Exception {
String[] lineSplited = line.split(" ");
return new Tuple2<String, Integer>(lineSplited[0],
Integer.valueOf(lineSplited[1]));
}
});
JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair(
new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Iterable<Integer>> call(
Tuple2<String, Iterable<Integer>> classScores)
throws Exception {
Integer[] top3 = new Integer[3];
String className = classScores._1;
Iterator<Integer> scores = classScores._2.iterator();
while(scores.hasNext()) {
Integer score = scores.next();
for(int i = 0; i < 3; i++) {
if(top3[i] == null) {
top3[i] = score;
break;
} else if(score > top3[i]) {
for(int j = 2; j > i; j--) {
top3[j] = top3[j - 1];
}
top3[i] = score;
break;
}
}
}
return new Tuple2<String,
Iterable<Integer>>(className, Arrays.asList(top3));
}
});
top3Score.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
System.out.println("class: " + t._1);
Iterator<Integer> scoreIterator = t._2.iterator();
while(scoreIterator.hasNext()) {
Integer score = scoreIterator.next();
System.out.println(score);
}
System.out.println("=======================================");
}
});
sc.close();
}
第1题（取最大的前3个数字）的 Scala 版本：
/** Scala version of Top3: prints the three largest integers from a text file
  * containing one integer per line, largest first.
  *
  * NOTE: to be runnable this `main` must be placed inside an `object`.
  *
  * @param args optional; `args(0)` overrides the default input path
  */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("top3").setMaster("local")
  val sc = new SparkContext(conf)
  // Stop the context even if the job fails, so its resources are released.
  try {
    val inputPath = args.headOption.getOrElse("/Users/gaozhen/tmp/top.txt")
    val top3Numbers = sc.textFile(inputPath)
      .map(_.trim)
      .filter(_.nonEmpty) // a blank line would otherwise fail in toInt
      .map(_.toInt)
      // top(n) returns the n largest elements in descending order using a
      // bounded per-partition heap: no shuffle or full sort as with sortByKey.
      .top(3)
    top3Numbers.foreach(println)
  } finally {
    sc.stop()
  }
}