1. 1Socket:之前的wordcount例子,已经演示过了,StreamingContext.socketTextStream()
    2. 2HDFS文件
    3. 基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。
    4. streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory)
    5. streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
    6. Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此不会占用一个cpu core

    java

    1. /**
    2. * 基于HDFS文件的实时wordcount程序
    3. * @author Administrator
    4. *
    5. */
    6. public class HDFSWordCount {
    7. public static void main(String[] args) {
    8. SparkConf conf = new SparkConf()
    9. .setMaster("local[2]")
    10. .setAppName("HDFSWordCount");
    11. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
    12. // 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流
    13. JavaDStream<String> lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");
    14. // 执行wordcount操作
    15. JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    16. private static final long serialVersionUID = 1L;
    17. @Override
    18. public Iterable<String> call(String line) throws Exception {
    19. return Arrays.asList(line.split(" "));
    20. }
    21. });
    22. JavaPairDStream<String, Integer> pairs = words.mapToPair(
    23. new PairFunction<String, String, Integer>() {
    24. private static final long serialVersionUID = 1L;
    25. @Override
    26. public Tuple2<String, Integer> call(String word)
    27. throws Exception {
    28. return new Tuple2<String, Integer>(word, 1);
    29. }
    30. });
    31. JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
    32. new Function2<Integer, Integer, Integer>() {
    33. private static final long serialVersionUID = 1L;
    34. @Override
    35. public Integer call(Integer v1, Integer v2) throws Exception {
    36. return v1 + v2;
    37. }
    38. });
    39. wordCounts.print();
    40. jssc.start();
    41. jssc.awaitTermination();
    42. jssc.close();
    43. }
    44. }