1. Word Count

Data file format (figure omitted: screenshots of the sample input file, not recoverable).
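Since the screenshots are gone, here is what the input presumably looks like: the code below splits each line on whitespace, so the file is just plain text with space-separated words. A hypothetical sample of `src/main/resources/demo/kdy.txt` (contents invented for illustration):

```text
hello spark hello world
spark streaming spark sql
hello kdy
```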

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public final class JavaWordCount {

    private static final Pattern SPACE = Pattern.compile("\\s+");

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf()
                .setAppName("javaSparkPi")
                .setMaster("local")
                .set("spark.driver.host", "localhost")
                .set("spark.testing.memory", "21474800000");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Read the input file line by line
        JavaRDD<String> lines = jsc.textFile("src/main/resources/demo/kdy.txt");
        // Split each line on whitespace into individual words
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        // Sum the counts per word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println("Output: " + tuple._1() + ": " + tuple._2());
        }
        jsc.stop();
    }
}
```
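With the hypothetical sample file above, the output would look something like the following (`collect()` returns the pairs in no guaranteed order):

```text
Output: hello: 3
Output: spark: 3
Output: world: 1
Output: streaming: 1
Output: sql: 1
Output: kdy: 1
```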

2. Calculating Pi

```java
// Estimate pi with a Monte Carlo simulation.
// Assumes sparkConf and jars are fields of the enclosing class, as in the original.
public Map<String, Object> calPi(int slices) {
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    if (jars != null) {
        javaSparkContext.addJar(jars);
    }
    Map<String, Object> result = new HashMap<>();

    // Generate n sample indices and spread them across the requested partitions
    int n = 100000 * slices;
    List<Integer> l = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
        l.add(i);
    }
    JavaRDD<Integer> dataSet = javaSparkContext.parallelize(l, slices);

    // For each sample, draw a random point in the square [-1, 1] x [-1, 1]
    // and count how many fall inside the unit circle
    int count = dataSet.map(integer -> {
        double x = Math.random() * 2 - 1;
        double y = Math.random() * 2 - 1;
        return (x * x + y * y <= 1) ? 1 : 0;
    }).reduce((integer, integer2) -> integer + integer2);

    System.out.println("Pi is roughly " + 4.0 * count / n);
    result.put("Pi is roughly:", 4.0 * count / n);

    javaSparkContext.stop();
    return result;
}
```
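The estimate works because the points are uniform on the square [-1, 1] × [-1, 1], so the chance of landing inside the unit circle is (circle area)/(square area) = π/4, and 4 · count / n recovers π; the Monte Carlo error shrinks roughly like 1/√n, so more slices give a tighter estimate. A minimal usage sketch, assuming the method lives in a wrapper class (here called SparkService, an invented name) whose sparkConf and jars fields are already set up:

```java
// Hypothetical caller; SparkService is an assumed wrapper class holding
// the sparkConf and jars fields that calPi() references.
SparkService service = new SparkService();
Map<String, Object> result = service.calPi(10);   // 10 slices -> 1,000,000 samples
System.out.println(result.get("Pi is roughly:")); // prints an estimate near 3.14
```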

3. Top-K

```java
// Compute the k most frequent words in the file at path.
// Assumes sparkConf and jars are fields of the enclosing class, as in the original.
public Map<String, Object> javaStreamCalculateTopTen(String path, int k) {
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    if (jars != null) {
        javaSparkContext.addJar(jars);
    }
    Map<String, Object> result = new HashMap<>();

    // Other ways to create the source RDD:
    // JavaRDD<String> data = javaSparkContext.textFile("hdfs://mini1:9000/3.txt"); // read from HDFS
    // JavaRDD<String> data = sc.textFile("C:\\3.txt"); // read a local file
    // JavaRDD<Object> data = sc.parallelize(new ArrayList<>()); // turn an in-memory list into an RDD (empty here; fill the list before converting)

    // textFile reads the local file line by line; cache it because it is reused
    JavaRDD<String> lines = javaSparkContext.textFile(path).cache();

    System.out.println();
    System.out.println("-------------------------------------------------------");

    // Line count
    System.out.println(lines.count());
    result.put("total line count", lines.count());

    JavaRDD<String> words = lines.flatMap(str -> Arrays.asList(str.split("\\s+")).iterator());

    System.out.println();
    System.out.println("-------------------------------------------------------");
    System.out.println(words);

    JavaPairRDD<String, Integer> ones = words.mapToPair(str -> new Tuple2<>(str, 1));
    JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

    System.out.println();
    System.out.println("-------------------------------------------------------");
    System.out.println(counts.countByValue());
    System.out.println(counts.countByKey());

    // Swap to (count, word) so the pairs can be sorted by frequency, then swap back
    JavaPairRDD<Integer, String> temp = counts.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
    JavaPairRDD<String, Integer> sorted = temp.sortByKey(false)
            .mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));

    System.out.println();
    System.out.println("-------------------------------------------------------");
    System.out.println(sorted.count());

    // Keep the k pairs with the highest counts
    List<Tuple2<String, Integer>> output = sorted.top(k, new MyComparator());
    for (Tuple2<String, Integer> tuple : output) {
        result.put(tuple._1(), tuple._2());
    }
    javaSparkContext.stop();
    return result;
}
```
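The MyComparator passed to sorted.top(k, ...) is never shown in this post. top() needs a comparator that is also Serializable (Spark ships it to the executors), so a minimal sketch consistent with how it is used, assuming it orders pairs by their count, would be:

```java
import java.io.Serializable;
import java.util.Comparator;

import scala.Tuple2;

// Hypothetical reconstruction: the original MyComparator is not shown.
// It must be Serializable because Spark sends it to executors.
public class MyComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
    @Override
    public int compare(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
        return Integer.compare(a._2(), b._2()); // order by word count
    }
}
```

Note that top(k, ...) already selects the k largest elements under this ordering, so the earlier sortByKey(false) is redundant for picking the top k; it only matters if the fully sorted RDD is needed elsewhere.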