学习链接:https://www.bilibili.com/video/BV11A411L7CK?p=185&spm_id_from=pageDriver&vd_source=b9e4f35102d61e6d02e0a5e1bbfea480


1 SparkStreaming 概述

1.1 Spark Streaming是什么

Spark 流使得构建可扩展的容错流应用程序变得更加容易。
Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语。如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
图片1.png
Spark Streaming准实时(秒、分钟)、微批次(时间)的数据处理框架。

  • 数据处理的方式角度

流式(Streaming)数据处理 批量(batch)数据处理

  • 数据处理延迟的长短

实时数据处理:毫秒级别 离线数据处理:小时 or 天级别

和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些RDD 所组成的序列(因此得名“离散化”)。所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。

1.2 Spark Streaming的特点

  1. 易用
  2. 容错
  3. 易整合到Spark体系

    1.3 Spark Streaming架构

    1.3.1 架构图

  • 整体架构图

图片2.png

  • Spark Streaming架构图

图片3.png

1.3.2 背压机制

Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值false,即不启用。

2 Dstream入门

2.1 WordCount案例实操

需求:使用netcat工具向9999端口不断发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

  1. 添加依赖

    1. <dependency>
    2. <groupId>org.apache.spark</groupId>
    3. <artifactId>spark-streaming_2.12</artifactId>
    4. <version>3.0.0</version>
    5. </dependency>
  2. 代码

    1. object SparkStreaming01_WordCount {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建环境对象
    4. // StreamingContext创建时,需要传递两个参数
    5. // 第一个参数表示环境配置
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    7. // 第二个参数表示批量处理的周期(采集周期)
    8. val ssc = new StreamingContext(sparkConf, Seconds(3))
    9. // TODO 逻辑处理
    10. // 获取端口数据
    11. val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    12. val words = lines.flatMap(_.split(" "))
    13. val wordToOne = words.map((_, 1))
    14. val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)
    15. wordToCount.print()
    16. // TODO 关闭环境
    17. // SparkStreaming采集器是长期执行的任务,所以不能直接关闭
    18. // 如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
    19. // ssc.stop()
    20. // 1. 启动采集器
    21. ssc.start()
    22. // 2. 等待采集器的关闭
    23. ssc.awaitTermination()
    24. }
    25. }
  3. 启动程序并通过netcat发送数据

QQ截图20220627104650.png

2.2 WordCount解析

Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据。
图片4.png
对数据的操作也是按照RDD 为单位来进行的
图片5.png
计算过程由 Spark Engine 来完成
图片6.png

3 DStream创建

3.1 RDD队列

3.1.1 用法及说明

测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理。

3.1.2 案例实操

  • 需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建DStream,计算WordCount

    1. object SparkStreaming02_Queue {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建环境对象
    4. // StreamingContext创建时,需要传递两个参数
    5. // 第一个参数表示环境配置
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    7. // 第二个参数表示批量处理的周期(采集周期)
    8. val ssc = new StreamingContext(sparkConf, Seconds(3))
    9. // 创建RDD队列
    10. val rddQueue = new mutable.Queue[RDD[Int]]()
    11. // 创建QueueInputDStream
    12. val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
    13. val mappedStream = inputStream.map((_, 1))
    14. val reducedStream = mappedStream.reduceByKey(_ + _)
    15. reducedStream.print()
    16. // TODO 关闭环境
    17. // SparkStreaming采集器是长期执行的任务,所以不能直接关闭
    18. // 如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
    19. // ssc.stop()
    20. // 1. 启动采集器
    21. ssc.start()
    22. for (i <- 1 to 5) {
    23. rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
    24. Thread.sleep(2000)
    25. }
    26. // 2. 等待采集器的关闭
    27. ssc.awaitTermination()
    28. }
    29. }

    QQ截图20220627120750.png

    3.2 自定义数据源

    3.2.1 用法及说明

    需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

    3.2.2 案例实操

    1. object SparkStreaming03_DIY {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建环境对象
    4. // StreamingContext创建时,需要传递两个参数
    5. // 第一个参数表示环境配置
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    7. // 第二个参数表示批量处理的周期(采集周期)
    8. val ssc = new StreamingContext(sparkConf, Seconds(3))
    9. val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    10. messageDS.print()
    11. ssc.start()
    12. ssc.awaitTermination()
    13. }
    14. /*
    15. 自定义数据采集器
    16. 1. 继承Receiver,定义泛型,传递参数存储级别
    17. 2. 重写方法
    18. */
    19. class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    20. private var flag = true
    21. override def onStart(): Unit = {
    22. new Thread(new Runnable {
    23. override def run(): Unit = {
    24. while (flag) {
    25. val message = "采集的数据为:" + new Random().nextInt(10).toString
    26. store(message)
    27. Thread.sleep(500)
    28. }
    29. }
    30. }).start()
    31. }
    32. override def onStop(): Unit = {
    33. flag = false
    34. }
    35. }
    36. }

    QQ截图20220627182444.png

    3.3 Kafka(暂时跳过)

    4 DStream转换

    DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。

    4.1 无状态转化操作

    无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。
    每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。
    图片1.png

    4.1.1 Transform

    Transform 允许 DStream 上执行任意的RDD-to-RDD 函数。即使这些函数并没有在DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也就是对 DStream 中的 RDD 应用转换。

    1. object SparkStreaming05_State_Transform {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建环境对象
    4. // StreamingContext创建时,需要传递两个参数
    5. // 第一个参数表示环境配置
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    7. // 第二个参数表示批量处理的周期(采集周期)
    8. val ssc = new StreamingContext(sparkConf, Seconds(3))
    9. val lines = ssc.socketTextStream("localhost", 9999)
    10. // transform方法可以将底层RDD获取到后进行操作
    11. // 1. DStream功能不完善
    12. // 2. 需要代码周期性执行
    13. // Code: Driver端
    14. val newDS: DStream[String] = lines.transform(
    15. rdd => {
    16. rdd.map(
    17. // Code:Driver端,周期性执行
    18. str => {
    19. // Code:Executor端
    20. str
    21. }
    22. )
    23. }
    24. )
    25. // Code: Driver端
    26. val newDS1: DStream[String] = lines.map(
    27. data => {
    28. // Code: Executor端
    29. data
    30. }
    31. )
    32. ssc.start()
    33. ssc.awaitTermination()
    34. }
    35. }

    4.1.2 join

    两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD进行 join,与两个 RDD的 join效果相同。

    1. object SparkStreaming06_State_Join {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建环境对象
    4. // StreamingContext创建时,需要传递两个参数
    5. // 第一个参数表示环境配置
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    7. // 第二个参数表示批量处理的周期(采集周期)
    8. val ssc = new StreamingContext(sparkConf, Seconds(3))
    9. val data9999 = ssc.socketTextStream("localhost", 9999)
    10. val data8888 = ssc.socketTextStream("localhost", 8888)
    11. val map9999: DStream[(String, Int)] = data9999.map((_, 9))
    12. val map8888: DStream[(String, Int)] = data8888.map((_, 8))
    13. // DStream的Join操作,其实就是两个RDD的join
    14. val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
    15. joinDS.print()
    16. ssc.start()
    17. ssc.awaitTermination()
    18. }
    19. }

    4.2 有状态转化操作

    4.2.1 UpdateStateByKey

    UpdateStateByKey原语用于记录历史记录,有时需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
    updateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。
    updateStateByKey操作使得可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

  1. 定义状态,状态可以是一个任意的数据类型。
  2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。

  1. object SparkStreaming04_State {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 创建环境对象
  4. // StreamingContext创建时,需要传递两个参数
  5. // 第一个参数表示环境配置
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  7. // 第二个参数表示批量处理的周期(采集周期)
  8. val ssc = new StreamingContext(sparkConf, Seconds(3))
  9. ssc.checkpoint("cp")
  10. // 无状态数据操作,只对当前的采集周期内的数据进行处理
  11. // 在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
  12. // 有状态操作时,需要设定检查点路径
  13. val datas = ssc.socketTextStream("localhost", 9999)
  14. val wordToOne = datas.map((_, 1))
  15. // val wordToCount = wordToOne.reduceByKey(_ + _)
  16. // updateStateByKey:根据数据的状态进行更新
  17. // 传递的参数有两个值
  18. // 第一个值表示相同的key的value数据
  19. // 第二个数据表示缓冲区相同key的value值
  20. val state = wordToOne.updateStateByKey((seq: Seq[Int], buff: Option[Int]) => {
  21. val newCount = buff.getOrElse(0) + seq.sum
  22. Option(newCount)
  23. })
  24. state.print()
  25. ssc.start()
  26. ssc.awaitTermination()
  27. }
  28. }

4.2.2 WindowOperations

· Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

  • 窗口时长:计算内容的时间范围
  • 滑动步长:隔多久触发一次计算

    1. object SparkStreaming06_State_Window {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建环境对象
    4. // StreamingContext创建时,需要传递两个参数
    5. // 第一个参数表示环境配置
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    7. // 第二个参数表示批量处理的周期(采集周期)
    8. val ssc = new StreamingContext(sparkConf, Seconds(3))
    9. val lines = ssc.socketTextStream("localhost", 9999)
    10. val wordToOne = lines.map((_, 1))
    11. // 窗口的范围应该是采集周期的整数倍
    12. // 窗口是可以滑动的,但默认情况下,一个采集周期进行滑动
    13. // 可能会出现重复数据的计算,设置步长避免
    14. val windowsDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))
    15. val wordToCount = windowsDS.reduceByKey(_ + _)
    16. wordToCount.print()
    17. ssc.start()
    18. ssc.awaitTermination()
    19. }
    20. }
    1. object SparkStreaming06_State_Window1 {
    2. def main(args: Array[String]): Unit = {
    3. // TODO 创建环境对象
    4. // StreamingContext创建时,需要传递两个参数
    5. // 第一个参数表示环境配置
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    7. // 第二个参数表示批量处理的周期(采集周期)
    8. val ssc = new StreamingContext(sparkConf, Seconds(3))
    9. ssc.checkpoint("cp")
    10. val lines = ssc.socketTextStream("localhost", 9999)
    11. val wordToOne = lines.map((_, 1))
    12. // reduceByKeyAndWindow:当窗口范围比较大,但是滑动幅度比较小,采用增加数据和删除数据的方式
    13. // 无需重复计算,提升性能
    14. val windowsDS: DStream[(String, Int)] =
    15. wordToOne.reduceByKeyAndWindow(
    16. (x: Int, y: Int) => {
    17. x + y
    18. },
    19. (x: Int, y: Int) => {
    20. x - y
    21. },
    22. Seconds(9), Seconds(3)
    23. )
    24. val wordToCount = windowsDS.reduceByKey(_ + _)
    25. wordToCount.print()
    26. ssc.start()
    27. ssc.awaitTermination()
    28. }
    29. }

    关于Window的操作还有如下方法:
    (1)window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的Dstream;
    (2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
    (3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
    (4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。
    (5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。
    图片1.png
    countByWindow()和countByValueAndWindow()作为对数据进行计数操作的简写。countByWindow()返回一个表示每个窗口中元素个数的DStream,而countByValueAndWindow()返回的DStream则包含窗口中每个值的个数。

    5 DStream输出

    输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。
    输出操作如下:

  • print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。

  • saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”
  • saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”. Python中目前不可用。
  • saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”。Python API 中目前不可用。
  • foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。

通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。
注意:
1) 连接不能写在driver层面(序列化)
2) 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
3) 增加foreachPartition,在分区创建(获取)。

  1. object SparkStreaming07_Output {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 创建环境对象
  4. // StreamingContext创建时,需要传递两个参数
  5. // 第一个参数表示环境配置
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  7. // 第二个参数表示批量处理的周期(采集周期)
  8. val ssc = new StreamingContext(sparkConf, Seconds(3))
  9. ssc.checkpoint("cp")
  10. val lines = ssc.socketTextStream("localhost", 9999)
  11. val wordToOne = lines.map((_, 1))
  12. // reduceByKeyAndWindow:当窗口范围比较大,但是滑动幅度比较小,采用增加数据和删除数据的方式
  13. // 无需重复计算,提升性能
  14. val windowsDS: DStream[(String, Int)] =
  15. wordToOne.reduceByKeyAndWindow(
  16. (x: Int, y: Int) => {
  17. x + y
  18. },
  19. (x: Int, y: Int) => {
  20. x - y
  21. },
  22. Seconds(9), Seconds(3)
  23. )
  24. val wordToCount = windowsDS.reduceByKey(_ + _)
  25. // SparkStreaming没有输出操作,报错
  26. // wordToCount.print()
  27. ssc.start()
  28. ssc.awaitTermination()
  29. }
  30. }
  1. object SparkStreaming07_Output1 {
  2. def main(args: Array[String]): Unit = {
  3. // TODO 创建环境对象
  4. // StreamingContext创建时,需要传递两个参数
  5. // 第一个参数表示环境配置
  6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  7. // 第二个参数表示批量处理的周期(采集周期)
  8. val ssc = new StreamingContext(sparkConf, Seconds(3))
  9. ssc.checkpoint("cp")
  10. val lines = ssc.socketTextStream("localhost", 9999)
  11. val wordToOne = lines.map((_, 1))
  12. // reduceByKeyAndWindow:当窗口范围比较大,但是滑动幅度比较小,采用增加数据和删除数据的方式
  13. // 无需重复计算,提升性能
  14. val windowsDS: DStream[(String, Int)] =
  15. wordToOne.reduceByKeyAndWindow(
  16. (x: Int, y: Int) => {
  17. x + y
  18. },
  19. (x: Int, y: Int) => {
  20. x - y
  21. },
  22. Seconds(9), Seconds(3)
  23. )
  24. // foreachRDD 不会出现时间戳
  25. windowsDS.foreachRDD(
  26. rdd => {
  27. }
  28. )
  29. ssc.start()
  30. ssc.awaitTermination()
  31. }
  32. }

6 优雅关闭

流式任务需要7*24小时执行,但是有时涉及到 升级代码 需要 主动停止程序 ,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内部程序关闭。

  1. object SparkStreaming08_Close {
  2. def main(args: Array[String]): Unit = {
  3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  4. val ssc = new StreamingContext(sparkConf, Seconds(3))
  5. val lines = ssc.socketTextStream("localhost", 9999)
  6. val wordToOne = lines.map((_, 1))
  7. wordToOne.print()
  8. ssc.start()
  9. // 如果想要关闭采集器,需要创建新的线程
  10. new Thread(
  11. new Runnable {
  12. override def run(): Unit = {
  13. // 优雅地关闭
  14. // 计算节点不再接收新的数据,而是将现有的数据处理完毕,然后关闭
  15. while (true) {
  16. if(true) {
  17. // 获取SparkStreaming状态
  18. val state: StreamingContextState = ssc.getState()
  19. if (state == StreamingContextState.ACTIVE) {
  20. ssc.stop(true, true)
  21. }
  22. System.exit(0)
  23. }
  24. Thread.sleep(5000)
  25. }
  26. }
  27. }
  28. ).start()
  29. ssc.awaitTermination() // 阻塞main线程
  30. }
  31. }
  1. object SparkStreaming09_Resume {
  2. def main(args: Array[String]): Unit = {
  3. val ssc = StreamingContext.getActiveOrCreate("cp", ()=> {
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  5. val ssc = new StreamingContext(sparkConf, Seconds(3))
  6. val lines = ssc.socketTextStream("localhost", 9999)
  7. val wordToOne = lines.map((_, 1))
  8. wordToOne.print()
  9. ssc
  10. })
  11. ssc.checkpoint("cp")
  12. ssc.start()
  13. ssc.awaitTermination()
  14. }
  15. }