banner.webp

1.Spark Streaming

  1. Spark StreamingSpark核心API的一个扩展,可以实现高吞吐、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,从数据源获取数据之后,可以使用诸如mapreducejoinwindow等高级函数进行复杂语法的处理。最后还可以将处理结果存储到文件系统。
  2. Spark的各个子框架都是基于核心Spark的,Spark Streaming在内部的处理机制是,接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。
  3. 对应的批数据,在Spark内核对应一个RDD实例,因此,对应流数据的DStream可以看成是一组RDDs,即RDD的一个序列。通俗点理解的话就是,在流数据分成一批一批后,通过一个先进先出的队列,然后Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,然后进行处理,这是一个典型的生产者消费者模型,对应的就有生产者消费者模型的问题,即如何协调生产效率和消费速率。

2.模拟Spark Streaming

首先我们安装在服务器安装nc(netcat)

  1. yum install -y nc

新建一个Maven项目,导入如下依赖

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming_2.12</artifactId>
  4. <version>3.2.1</version>
  5. </dependency>

新建SparkStreaming.java类

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.streaming.Durations;
  3. import org.apache.spark.streaming.api.java.JavaDStream;
  4. import org.apache.spark.streaming.api.java.JavaPairDStream;
  5. import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  6. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  7. import scala.Tuple2;
  8. import java.util.Arrays;
  9. public class SparkStreaming {
  10. public static void main(String[] args) throws InterruptedException {
  11. // Create a local StreamingContext with two working thread and batch interval of 1 second
  12. SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
  13. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
  14. // Create a DStream that will connect to hostname:port, like localhost:9999
  15. JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
  16. // Split each line into words
  17. JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
  18. // Count each word in each batch
  19. JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
  20. JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
  21. // Print the first ten elements of each RDD generated in this DStream to the console
  22. wordCounts.print();
  23. jssc.start(); // Start the computation
  24. jssc.awaitTermination(); // Wait for the computation to terminate
  25. }
  26. }

打包上传到服务器,执行下面命令

  1. spark-submit --class SparkStreaming --master local[2] --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/usr/local/log4j.xml" /usr/local/spark_streaming-1.0.jar

log4j.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
  3. <log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/' >
  4. <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
  5. <param name="file" value="spark_streaming.log" />
  6. <param name="threshold" value="INFO"/>
  7. <param name="DatePattern" value="yyyyMMdd"/>
  8. <param name="append" value="true" />
  9. <layout class="org.apache.log4j.PatternLayout">
  10. <param name="ConversionPattern" value="%d [%t] %-5p %c(%L) - %m%n"/>
  11. </layout>
  12. </appender>
  13. <root>
  14. //指出日志级别
  15. <priority value ="INFO"/>
  16. <appender-ref ref="FILE"/>
  17. </root>
  18. </log4j:configuration>

然后我们启动nc

  1. nc -lk 9999

然后我们随便输入一个单词,比如abc,然后查看控制台
1.png
然后再输入abcdef
2.png
OK,到这里SparkStreaming的基本Demo就已完成,想让spark-submit后台执行只需要在命令前加nohup即可~