1.Spark Streaming
Spark Streaming是Spark核心API的一个扩展,可以实现高吞吐、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂语法的处理。最后还可以将处理结果存储到文件系统。
Spark的各个子框架都是基于核心Spark的,Spark Streaming在内部的处理机制是,接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。
对应的批数据,在Spark内核对应一个RDD实例,因此,对应流数据的DStream可以看成是一组RDDs,即RDD的一个序列。通俗点理解的话就是,在流数据分成一批一批后,通过一个先进先出的队列,然后Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,然后进行处理,这是一个典型的生产者消费者模型,对应的就有生产者消费者模型的问题,即如何协调生产效率和消费速率。
2.模拟Spark Streaming
首先我们安装在服务器安装nc(netcat)
yum install -y nc
新建一个Maven项目,导入如下依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.2.1</version>
</dependency>
新建SparkStreaming.java类
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
public class SparkStreaming {
public static void main(String[] args) throws InterruptedException {
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
}
}
打包上传到服务器,执行下面命令
spark-submit --class SparkStreaming --master local[2] --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/usr/local/log4j.xml" /usr/local/spark_streaming-1.0.jar
log4j.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/' >
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="file" value="spark_streaming.log" />
<param name="threshold" value="INFO"/>
<param name="DatePattern" value="yyyyMMdd"/>
<param name="append" value="true" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d [%t] %-5p %c(%L) - %m%n"/>
</layout>
</appender>
<root>
//指出日志级别
<priority value ="INFO"/>
<appender-ref ref="FILE"/>
</root>
</log4j:configuration>
然后我们启动nc
nc -lk 9999
然后我们随便输入一个单词,比如abc,然后查看控制台
然后再输入abcdef
OK,到这里SparkStreaming的基本Demo就已完成,想让spark-submit后台执行只需要在命令前加nohup即可~