一、SparkStreaming 概述

1. 是什么

image.png
image.png
Spark Streaming 用于流式数据处理,输入源支持 kafka、Flume、Twitter、ZeroMQ和简单的 TCP套接字,只是对数据的数据进行一系 Spark 基本的运算 比如 map、reduce、join。window 等,运算的结果可以保存在很多地方 ,比如 hdfs 数据库等
image.png
Spark Streaming 使用离散化流作为抽象表示,叫做 DStream,它随着时间的推移,不断收到数据。在内部,它把每个时间区间内收到的数据当做 RDD 存在,一系列 RDD 共同组成了 DStream。简单来说 DStream 也是对 RDD 在实时数据处理场景的一种封装。

2. Spark Streaming 特点

  • 易用 提供了一些简单的API去使用
  • 容错 任务出错自动进行处理
  • 易整合到 Spark 体系中

    3. 架构

  • 整体

image.png

  • SparkStreaming 架构图

image.png

4. 背压机制

在 Spark 1.5 版本之前,用户如果想限制 Receiver 数据的接受速度,可以通过设置静态配置参数“spark.streaming.receiver.maxRate”来实现,但是这样固定速率,如果太小,集群资源利用率就会下降,如果太高,内存容易溢出。为了解决这个问题,Spark 从1.5 版本开始,引入 背压机制(Spark Streaming Backpressure )动态控制数据接收速率,实际上是根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver的接受率。可以通过设置属性 性“spark.streaming.backpressure.enabled” 来控制能否使用 backpressure 机制,默认是 false 。

二、DStream 入门

1. 需求

使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数

2. 实操

  • 引入 依赖

    1. <dependency>
    2. <groupId>org.apache.spark</groupId>
    3. <artifactId>spark-streaming_2.12</artifactId>
    4. <version>3.0.0</version>
    5. </dependency>
  • 代码 ```xml import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamWorkCount { def main(args: Array[String]): Unit = { //1.初始化 Spark 配置信息 val sparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“StreamWordCount”)

  1. // 2. 初始化 StreamingContext
  2. val streamingContext = new StreamingContext(sparkConf, Seconds(3))
  3. // 3. 通过监控端口创建 DStream,读进来的数据为一行行
  4. val linStreams = streamingContext.socketTextStream("localhost", 9999)
  5. // 4. 将每一行数据做切分,形成一个个单词
  6. val wordStream : DStream[String] = linStreams.flatMap(_.split(" "))
  7. // 5. 将单词映射成元组(word,1)
  8. val wordCount: DStream[(String, Int)] = wordStream.map((_, 1))
  9. // 6. 将相同的单词次数做统计 打印
  10. val wordNums = wordCount.reduceByKey(_ + _)
  11. wordNums.print()
  12. // 7. 启动 SparkStreamingContext
  13. streamingContext.start()
  14. streamingContext.awaitTermination()

} }

  1. - 启动程序并通过 netcat 发送数据:
  2. ` nc -lp 9999 `
  3. <a name="VVP04"></a>
  4. ### 3. 分析
  5. Discretized Stream Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 源语操作后的结果数据流,内部其实是 一些列连续的 RDD ,每个 RDD 含有一段时间间隔内的数据<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1648459840846-aaa6761a-07e0-4cc9-a21b-938f9cba0a09.png#clientId=ubd091462-5cc7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=273&id=u792c6220&margin=%5Bobject%20Object%5D&name=image.png&originHeight=300&originWidth=1416&originalType=binary&ratio=1&rotation=0&showTitle=false&size=81999&status=done&style=none&taskId=u0ac2e641-4484-4ed6-a641-bdf1be4f1cd&title=&width=1287.272699371843)<br />对数据的操作也是以 RDD 为单位进行的<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1648459879740-08d47584-6d3e-4ec9-9301-3f5fd112b648.png#clientId=ubd091462-5cc7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=377&id=ua13543f2&margin=%5Bobject%20Object%5D&name=image.png&originHeight=415&originWidth=1443&originalType=binary&ratio=1&rotation=0&showTitle=false&size=126884&status=done&style=none&taskId=u4024fc19-b732-415f-9e08-09eb7483f6e&title=&width=1311.8181533852892)<br />计算过程 有Spark Engine 来完成<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1648459906492-57a94433-b578-49f0-b69a-fd816a8444b9.png#clientId=ubd091462-5cc7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=223&id=ua68d07a5&margin=%5Bobject%20Object%5D&name=image.png&originHeight=245&originWidth=1423&originalType=binary&ratio=1&rotation=0&showTitle=false&size=93960&status=done&style=none&taskId=u98325556-bede-4022-a0d0-77c196dbc0f&title=&width=1293.6363355975511)
  6. <a name="kj7bT"></a>
  7. ## 三、DStream 创建
  8. <a name="FcP81"></a>
  9. ### 1. RDD 队列
  10. - 需求: 循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算WordCount
  11. - 分析: 使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列的RDD 都会被当做 一个 DSsteam 来处理
  12. - 代码
  13. ```scala
  14. import org.apache.spark.SparkConf
  15. import org.apache.spark.rdd.RDD
  16. import org.apache.spark.streaming.{Seconds, StreamingContext}
  17. import scala.collection.mutable
  18. object RDDStream {
  19. def main(args: Array[String]) {
  20. //1.初始化 Spark 配置信息
  21. val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
  22. //2.初始化 SparkStreamingContext
  23. val ssc = new StreamingContext(conf, Seconds(4))
  24. //3.创建 RDD 队列
  25. val rddQueue = new mutable.Queue[RDD[Int]]()
  26. //4.创建 QueueInputDStream
  27. val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
  28. //5.处理队列中的 RDD 数据
  29. val mappedStream = inputStream.map((_,1))
  30. val reducedStream = mappedStream.reduceByKey(_ + _)
  31. //6.打印结果
  32. reducedStream.print()
  33. //7.启动任务
  34. ssc.start()
  35. //8.循环创建并向 RDD 队列中放入 RDD
  36. for (i <- 1 to 5) {
  37. rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
  38. Thread.sleep(2000)
  39. }
  40. ssc.awaitTermination()
  41. }
  42. }

2. 自定义数据源

需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

  • 需求 自定义数据源,实现监控某个端口号,获取该端口号内容。
  • 实现 ```scala

class CustomerReceiver(host: String, port: Int) extends ReceiverString { //最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Spark override def onStart(): Unit = { new Thread(“Socket Receiver”) { override def run() { receive() } }.start() }

//读数据并将数据发送给 Spark def receive(): Unit = { //创建一个 Socket var socket: Socket = new Socket(host, port) //定义一个变量,用来接收端口传过来的数据 var input: String = null //创建一个 BufferedReader 用于读取端口传来的数据 val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8)) //读取数据 input = reader.readLine() //当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Spark while (!isStopped() && input != null) { store(input) input = reader.readLine() } //跳出循环则关闭资源 reader.close() socket.close() //重启任务 restart(“restart”) }

override def onStop(): Unit = {} }

  1. - 使用
  2. ```scala
  3. object FileStream {
  4. def main(args: Array[String]): Unit = {
  5. //1.初始化 Spark 配置信息
  6. val sparkConf = new SparkConf().setMaster("local[*]")
  7. .setAppName("StreamWordCount")
  8. //2.初始化 SparkStreamingContext
  9. val ssc = new StreamingContext(sparkConf, Seconds(5))
  10. //3.创建自定义 receiver 的 Streaming
  11. val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))
  12. //4.将每一行数据做切分,形成一个个单词
  13. val wordStream = lineStream.flatMap(_.split("\t"))
  14. //5.将单词映射成元组(word,1)
  15. val wordAndOneStream = wordStream.map((_, 1))
  16. //6.将相同的单词次数做统计
  17. val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)
  18. //7.打印
  19. wordAndCountStream.print()
  20. //8.启动 SparkStreamingContext
  21. ssc.start()
  22. ssc.awaitTermination()
  23. }

3. kafka 数据源

1. 版本选择

  • ReceiverAPI : 使用一个专门的 Executor 去接受数据,然后将数据发送给其他 Executor 去执行。这样存在一个大的弊端,接受的 Executor 和执行的 Executor 速度可能会不一样,如果 接受数据的速度 大于计算的速度,那么数据会积压,最终导致内存溢出。早期版本使用的是这个,现在不用了
  • DirectAPI : 直接有计算节点去接受kafka 的数据进行主动消费,这样接受的速度取决于自身。

    2. Kafka 0-10 Direct 模式

  • 需求 : 通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

  • 导入依赖
    1. <dependency>
    2. <groupId>org.apache.spark</groupId>
    3. <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    4. <version>3.0.0</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>com.fasterxml.jackson.core</groupId>
    8. <artifactId>jackson-core</artifactId>
    9. <version>2.10.1</version>
    10. </dependency>

    3. 编写代码

    ```scala import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils,
    1. LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext} object DirectAPI { def main(args: Array[String]): Unit = { //1.创建 SparkConf val sparkConf: SparkConf = new SparkConf().setAppName(“ReceiverWordCount”).setMaster(“local[*]”) //2.创建 StreamingContext val ssc = new StreamingContext(sparkConf, Seconds(3)) //3.定义 Kafka 参数 val kafkaPara: Map[String, Object] = Map[String, Object](
    1. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
    2. "linux1:9092,linux2:9092,linux3:9092",
    3. ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
    4. "key.deserializer" ->
    5. "org.apache.kafka.common.serialization.StringDeserializer",
    6. "value.deserializer" ->
    7. "org.apache.kafka.common.serialization.StringDeserializer"
    ) //4.读取 Kafka 数据创建 DStream val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
    1. LocationStrategies.PreferConsistent,
    2. ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))
    //5.将每条消息的 KV 取出 val valueDStream: DStream[String] = kafkaDStream.map(record => record.value()) //6.计算 WordCount valueDStream.flatMap(.split(“ “)) .map((, 1)) .reduceByKey( + ) .print() //7.开启任务 ssc.start() ssc.awaitTermination() } }
  1. <a name="n6mpc"></a>
  2. ## 四、DStream 转换
  3. DStream 上的操作 与RDD类似,分为 Transformations(转换) 和 output operations(输出),此外还有一些比较特殊的源语 : updateStateByKey()、transform()以及各种 Window 相关的原语。
  4. <a name="HXnHH"></a>
  5. ### 1. 无状态转化操作
  6. 无状态转化操作 就是简单把 RDD的 转化操作应用到每个批次上,并不会保存之前的数据。下面是常见的无状态转化操作。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/23124036/1648462398410-47c519fd-d3fe-47b0-8bd0-cb668f58866a.png#clientId=ubd091462-5cc7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=522&id=u51adbc61&margin=%5Bobject%20Object%5D&name=image.png&originHeight=574&originWidth=1166&originalType=binary&ratio=1&rotation=0&showTitle=false&size=151317&status=done&style=none&taskId=u6e0ef463-150a-4596-a3c9-fc98a479207&title=&width=1059.9999770251193)<br />注意,针对键值对的 DStream 转化操作(比如 reduceByKey()) 要添加 import StreamingContext._才能在 Scala 中使用。<br />这些函数虽然看起来像是作用到整个流上一样,其实是作用在 DSteam 内部的每一个 RDD上,并且是无状态的,比如 reduceBykey,会聚合每个 RDD 内部是数据,但是不会聚合 RDD 之间的数据。
  7. <a name="FISaD"></a>
  8. #### 1. TransForm
  9. Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。默认就是
  10. ```scala
  11. import org.apache.spark.SparkConf
  12. import org.apache.spark.rdd.RDD
  13. import org.apache.spark.streaming.dstream.DStream
  14. import org.apache.spark.streaming.{Seconds, StreamingContext}
  15. object StreamWorkCount {
  16. def main(args: Array[String]): Unit = {
  17. //1.初始化 Spark 配置信息
  18. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
  19. // 2. 初始化 StreamingContext
  20. val streamingContext = new StreamingContext(sparkConf, Seconds(3))
  21. // 3. 通过监控端口创建 DStream,读进来的数据为一行行
  22. val linStreams = streamingContext.socketTextStream("localhost", 9999)
  23. val wordAndCountDStream: DStream[(String, Int)] = linStreams.transform(rdd =>
  24. {
  25. val words: RDD[String] = rdd.flatMap(_.split(" "))
  26. val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
  27. val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
  28. value
  29. })
  30. wordAndCountDStream.print()
  31. // 7. 启动 SparkStreamingContext
  32. streamingContext.start()
  33. streamingContext.awaitTermination()
  34. }
  35. }

2. join

只有两个流之间的 join 批次大小一致才能同时触发计算,计算过程就是对当前批次的两个流中各自的 RDD 进行 join,跟Spark core 中两个 RDD join 效果相同

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.dstream.DStream
  3. import org.apache.spark.streaming.{Seconds, StreamingContext}
  4. object SparkStreaming06_State_Join {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  7. val ssc = new StreamingContext(sparkConf, Seconds(5))
  8. val data9999 = ssc.socketTextStream("localhost", 9999)
  9. val data8888 = ssc.socketTextStream("localhost", 8888)
  10. val map9999: DStream[(String, Int)] = data9999.map((_,9))
  11. val map8888: DStream[(String, Int)] = data8888.map((_,8))
  12. // 所谓的DStream的Join操作,其实就是两个RDD的join
  13. val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
  14. joinDS.print()
  15. ssc.start()
  16. ssc.awaitTermination()
  17. }
  18. }

2.有状态转化操作

1. UpdateStateByKey

UpdateStateByKey 原语 用于记录历史记录,它提供了一个对状态变量的访问,用于键值形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。返回结果是有个新的 DStream,内部的RDD 序列是每个时间区间对应的(键、状态)组成

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.streaming.dstream.DStream
  3. import org.apache.spark.streaming.{Seconds, StreamingContext}
  4. object SparkStreaming06_State_Join {
  5. def main(args: Array[String]): Unit = {
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  7. val ssc = new StreamingContext(sparkConf, Seconds(3))
  8. ssc.checkpoint("cp")
  9. // 无状态数据操作,只对当前的采集周期内的数据进行处理
  10. // 在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
  11. // 使用有状态操作时,需要设定检查点路径
  12. val datas = ssc.socketTextStream("localhost", 9999)
  13. val wordToOne = datas.map((_,1))
  14. //val wordToCount = wordToOne.reduceByKey(_+_)
  15. // updateStateByKey:根据key对数据的状态进行更新
  16. // 传递的参数中含有两个值
  17. // 第一个值表示相同的key的value数据
  18. // 第二个值表示缓存区相同key的value数据
  19. val state = wordToOne.updateStateByKey(
  20. ( seq:Seq[Int], buff:Option[Int] ) => {
  21. val newCount = buff.getOrElse(0) + seq.sum
  22. Option(newCount)
  23. }
  24. )
  25. state.print()
  26. ssc.start()
  27. ssc.awaitTermination()
  28. }
  29. }
  • 发送数据

    nc -lp 999
    hello spark
    hello spark
    hello allen
    
  • 结果

image.png

2. WindowOperations

WindowOperations 可以设置 窗口的大小和滑动窗口的间隔来动态获取当前 Streaming 允许的状态。需要两个参数,窗口时长和滑动步长, 注意:这两者都必须为采集周期大小的整数倍。

  • 窗口时长 : 计算内容的时间范围
  • 滑动步长:间隔多久执行一次

      def main(args: Array[String]): Unit = {
    
          val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
          val ssc = new StreamingContext(sparkConf, Seconds(3))
    
          val lines = ssc.socketTextStream("localhost", 9999)
          val wordToOne = lines.map((_,1))
    
          // 窗口的范围应该是采集周期的整数倍
          // 窗口可以滑动的,但是默认情况下,一个采集周期进行滑动
          // 这样的话,可能会出现重复数据的计算,为了避免这种情况,可以改变滑动的滑动(步长)
          val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))
    
          val wordToCount = windowDS.reduceByKey(_+_)
    
          wordToCount.print()
    
          ssc.start()
          ssc.awaitTermination()
      }
    
  • 常用方法

    • window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;
    • countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
    • reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
    • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。
    • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式传入)。如前述函数,reduce 任务的数量通过可选参数来配置。
    • countByWindow()和 countByValueAndWindow()作为对数据进行计数操作的简写。countByWindow()返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow()返回的 DStream 则包含窗口中每个值的个数。

image.png

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_,1))

    // reduceByKeyAndWindow : 当窗口范围比较大,但是滑动幅度比较小,那么可以采用增加数据和删除数据的方式
    //参数意义:加上新进入窗口的批次中的元素 移除离开窗口的老批次中的元素 窗口时长  滑动步长
    // 无需重复计算,提升性能。
    val windowDS: DStream[(String, Int)] =
    wordToOne.reduceByKeyAndWindow(
      (x:Int, y:Int) => { x + y},
      (x:Int, y:Int) => {x - y},
      Seconds(9), Seconds(3))

    windowDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

五、DStream

输出操作是将处理转化之后的数据进行输出,DStream 也是一个惰性操作,只有在执行输出操作后才会执行。常见的输出操作有:

  • print(): 打印每一批次数据的最开始前10个数据,一般只用于开发和调试

  • saveAsTextFiles(prefix, [suffix]):以text 文件的形式将每一批次的数据 存储 DStream 的内容。

文件命名格式 : prefix-Time_IN_MS[.suffix]

  • saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中每一批次的数据保存为 SequenceFiles .文件命名格式 : prefix-Time_IN_MS[.suffix] ,目前 python 中不可用

  • saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的每一批次的数据保存为 Hadoop files.

文件命名格式 : prefix-Time_IN_MS[.suffix] Python API 中目前不可用。

  • foreachRDD(func):这是最通用的输出操作,参数 func 作用于每一个 rdd,一般涌入将RDD 存入文件或者写入数据库

注意:

  • 连接不能写在 driver 层面(序列化)
  • 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;
  • 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;

    六、关闭

    流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内部程序关闭。
                   val state: StreamingContextState = ssc.getState()
                   if ( state == StreamingContextState.ACTIVE ) {
                      ssc.stop(stopSparkContext = true, stopGracefully = true)
                   }
                   System.exit(0)