一、介绍

数据处理的方式角度

  • 流式数据处理
  • 批量数据处理

数据处理延迟的长短

  • 实时数据处理:毫秒级别
  • 离线数据处理:小时 or 天级别

SparkStreaming是一个准实时(秒,分钟),微批次(时间)的数据处理框架

Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语

如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

image.png

和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来讲,DStream 就是对 RDD 在实时数据处理场景的一种封装。

1、特点

  • 易用
  • 容错
  • 易整合到Spark体系

2、整体架构

image.png

SparkStreaming 架构
image.png

3、背压机制

Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值false,即不启用。

二、Dstream

添加Spark Streaming依赖

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>

1、永远的WordCount

使用netcat工具创建临时端口不断发送数据,

我们使用SparkStreaming需要创建一个SparkStreamingContext

  1. object WordCount {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")
  4. // 创建一个StreamingContext,第一个参数是SparkConf配置,第二个是采集周期(频率)
  5. // Duration是个时间,默认毫秒,是一个样例类,可以使用伴生对象
  6. // val context = new StreamingContext(conf, Duration(3000L))
  7. val ssc = new StreamingContext(conf, Seconds(3))
  8. // 3、建立socket端口连接
  9. val lineStreams: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
  10. // 4、切分成单词
  11. val words: DStream[String] = lineStreams.flatMap(_.split(" "))
  12. // 5、映射成次数
  13. val tuple: DStream[(String, Int)] = words.map((_, 1))
  14. // 6、统计
  15. val wordCount: DStream[(String, Int)] = tuple.reduceByKey(_ + _)
  16. // 7、打印
  17. wordCount.print()
  18. // 启动SparkStreamingContext
  19. ssc.start()
  20. // 由于处理,需要一直启动,不可以停止
  21. ssc.awaitTermination()
  22. }
  23. }

启动netcat服务器

  1. nc -lk 9999

在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据。

2、RDD队列

测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理。

  1. object RDD_Queue {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")
  4. // 创建一个StreamingContext,第一个参数是SparkConf配置,第二个是采集周期(频率)
  5. // Duration是个时间,默认毫秒,是一个样例类,可以使用伴生对象
  6. // val context = new StreamingContext(conf, Duration(3000L))
  7. val ssc = new StreamingContext(conf, Seconds(3))
  8. // 创建RDD队列
  9. val queue = new mutable.Queue[RDD[Int]]()
  10. // 这个onAtTime表示是否允许只有一个队列被一次获取中消费
  11. val stream: InputDStream[Int] = ssc.queueStream(queue, oneAtATime = false)
  12. val wordCount: DStream[(Int, Int)] = stream.map((_, 1)).reduceByKey(_ + _)
  13. // 7、打印
  14. wordCount.print()
  15. // 启动SparkStreamingContext
  16. ssc.start()
  17. // 启动之后,不断往queue中添加数据
  18. for(i <- 1 to 5){
  19. queue += ssc.sparkContext.makeRDD(1 to 300, 10)
  20. Thread.sleep(2000)
  21. }
  22. // 由于处理,需要一直启动,不可以停止
  23. ssc.awaitTermination()
  24. }
  25. }

3、自定义数据源

自定义数据采集器
1、继承Receiver,定义泛型
2、重写onStart()和onStop()

  1. object RDD_CustomDataSource {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")
  4. // 创建一个StreamingContext,第一个参数是SparkConf配置,第二个是采集周期(频率)
  5. // Duration是个时间,默认毫秒,是一个样例类,可以使用伴生对象
  6. // val context = new StreamingContext(conf, Duration(3000L))
  7. val ssc = new StreamingContext(conf, Seconds(3))
  8. val stream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)
  9. stream.print()
  10. // 启动SparkStreamingContext
  11. ssc.start()
  12. // 启动之后,不断往queue中添加数据
  13. // 由于处理,需要一直启动,不可以停止
  14. ssc.awaitTermination()
  15. }
  16. class MyReceiver extends Receiver[String](storageLevel = StorageLevel.MEMORY_ONLY) {
  17. private var flag = false
  18. // 启动,抓取数据
  19. override def onStart(): Unit = {
  20. new Thread(new Runnable {
  21. override def run(): Unit = {
  22. while (true) {
  23. val message = "采集的数据是:" + new Random().nextInt().toString
  24. // 保存消息
  25. store(message)
  26. Thread.sleep(500)
  27. }
  28. }
  29. }).start()
  30. }
  31. // 停止,接收
  32. override def onStop(): Unit = {
  33. flag = false
  34. }
  35. }
  36. }

源码解析:略

4、Kafka数据源

ReceiverAPI:需要一个专门的 Executor 去接收数据,然后发送给其他的 Executor 做计算。存在的问题,接收数据的 Executor 和计算的 Executor 速度会有所不同,特别在接收数据的 Executor速度大于计算的 Executor 速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用

DirectAPI:是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。

1、Kafka 0-8 Direct 模式(当前版本不适用)

依赖:

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  4. <version>2.4.5</version>
  5. </dependency>

自动维护offset

  1. object DirectAPIAuto02 {
  2. val getSSC1: () => StreamingContext = () => {
  3. val sparkConf: SparkConf = new
  4. SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
  5. val ssc = new StreamingContext(sparkConf, Seconds(3))
  6. ssc
  7. }
  8. def getSSC: StreamingContext = {
  9. //1.创建 SparkConf
  10. val sparkConf: SparkConf = new
  11. SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
  12. //2.创建 StreamingContext
  13. val ssc = new StreamingContext(sparkConf, Seconds(3))
  14. //设置 CK
  15. ssc.checkpoint("./ck2")
  16. //3.定义 Kafka参数
  17. val kafkaPara: Map[String, String] = Map[String, String](
  18. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
  19. "linux1:9092,linux2:9092,linux3:9092",
  20. ConsumerConfig.GROUP_ID_CONFIG -> "atguigu"
  21. )
  22. //4.读取 Kafka数据
  23. val kafkaDStream: InputDStream[(String, String)] =
  24. KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaPara,Set("atguigu"))
  25. //5.计算 WordCount
  26. kafkaDStream.map(_._2)
  27. .flatMap(_.split(" "))
  28. .map((_, 1))
  29. .reduceByKey(_ + _)
  30. .print()
  31. //6.返回数据
  32. ssc
  33. }
  34. def main(args: Array[String]): Unit = {
  35. //获取 SSC
  36. val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck2", () =>getSSC)
  37. //开启任务
  38. ssc.start()
  39. ssc.awaitTermination()
  40. }
  41. }

手动维护offset

  1. object DirectAPIHandler {
  2. def main(args: Array[String]): Unit = {
  3. //1.创建 SparkConf
  4. val sparkConf: SparkConf = new
  5. SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
  6. //2.创建 StreamingContext
  7. val ssc = new StreamingContext(sparkConf, Seconds(3))
  8. //3.Kafka参数
  9. val kafkaPara: Map[String, String] = Map[String, String](
  10. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
  11. "hadoop102:9092,hadoop103:9092,hadoop104:9092",
  12. ConsumerConfig.GROUP_ID_CONFIG -> "atguigu"
  13. )
  14. //4.获取上一次启动最后保留的 Offset=>getOffset(MySQL)
  15. val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition,
  16. Long](TopicAndPartition("atguigu", 0) -> 20)
  17. //5.读取 Kafka数据创建 DStream
  18. val kafkaDStream: InputDStream[String] = KafkaUtils
  19. .createDirectStream[String,String, StringDecoder
  20. , StringDecoder, String](ssc,(m: MessageAndMetadata[String, String]) => m.message())
  21. //6.创建一个数组用于存放当前消费数据的 offset信息
  22. var offsetRanges = Array.empty[OffsetRange]
  23. //7.获取当前消费数据的 offset信息
  24. val wordToCountDStream: DStream[(String, Int)] = kafkaDStream.transform { rdd =>
  25. offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  26. rdd
  27. }.flatMap(_.split(" "))
  28. .map((_, 1))
  29. .reduceByKey(_ + _)
  30. //8.打印 Offset信息
  31. wordToCountDStream.foreachRDD(rdd => {
  32. for (o <- offsetRanges) {
  33. println(s"${o.topic}:${o.partition}:${o.fromOffset}:${o.untilOffset}")
  34. }
  35. rdd.foreach(println)
  36. })
  37. //9.开启任务
  38. ssc.start()
  39. ssc.awaitTermination()
  40. }
  41. }

2、Kafka 0-10 Direct 模式

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.fasterxml.jackson.core</groupId>
  8. <artifactId>jackson-core</artifactId>
  9. <version>2.10.1</version>
  10. </dependency>
  1. object DirectAPI {
  2. def main(args: Array[String]): Unit = {
  3. //1.创建 SparkConf
  4. val sparkConf: SparkConf = new
  5. SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
  6. //2.创建 StreamingContext
  7. val ssc = new StreamingContext(sparkConf, Seconds(3))
  8. //3.定义 Kafka参数
  9. val kafkaPara: Map[String, Object] = Map[String, Object](
  10. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
  11. "linux1:9092,linux2:9092,linux3:9092",
  12. ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
  13. "key.deserializer" ->
  14. "org.apache.kafka.common.serialization.StringDeserializer",
  15. "value.deserializer" ->
  16. "org.apache.kafka.common.serialization.StringDeserializer"
  17. )
  18. //4.读取 Kafka数据创建 DStream
  19. val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
  20. KafkaUtils.createDirectStream[String, String](ssc,
  21. LocationStrategies.PreferConsistent,
  22. ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))
  23. //5.将每条消息的 KV取出
  24. val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
  25. //6.计算 WordCount
  26. valueDStream.flatMap(_.split(" "))
  27. .map((_, 1))
  28. .reduceByKey(_ + _)
  29. .print()
  30. //7.开启任务
  31. ssc.start()
  32. ssc.awaitTermination()
  33. }
  34. }

5、Dstream转换

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语

1、有状态转换操作

将数据保存到一个检查点目录中,每次加载这个缓存中的目录。updateStateByKey

  1. object StatefuleOption {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-streaming")
  4. val ssc = new StreamingContext(conf, Seconds(3))
  5. // 必须设置检查点作为缓存的目录
  6. ssc.checkpoint("cache")
  7. val linesStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999, storageLevel = StorageLevel.MEMORY_ONLY)
  8. val state: DStream[(String, Int)] = linesStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  9. // 根据key有状态更新
  10. // 第一个值表示相同的key的value数据,
  11. // 第二个值表示缓冲区的相同的key的value数据
  12. .updateStateByKey((seq: Seq[Int], buffer: Option[Int]) => {
  13. val newVal = buffer.getOrElse(0) + seq.sum
  14. Option(newVal)
  15. })
  16. state.print()
  17. ssc.start()
  18. ssc.awaitTermination()
  19. }
  20. }

1、window

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许
状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

  • 窗口时长:计算内容的时间范围;
  • 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期大小的整数倍。

WordCount 第三版:3 秒一个批次,窗口 12 秒,滑步 6 秒。

关于 Window 的操作还有如下方法:
(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式传入)。如前述函数,reduce 任务的数量通过可选参数来配置。

  1. object StatefuleOption_window {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-streaming")
  4. val ssc = new StreamingContext(conf, Seconds(3))
  5. val stream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
  6. // 每隔三秒采集,每次移动1秒
  7. val ds: DStream[String] = stream.window(Seconds(3), Seconds(1))
  8. ds.print()
  9. val dss: DStream[(String, Int)] = stream.flatMap(_.split(" ")).map((_, 1))
  10. // 根据key和windows聚合
  11. // 第一个是数据增加的函数
  12. // 第二个是数据减少的函数
  13. // 第三个是
  14. .reduceByKeyAndWindow(
  15. (x: Int, y: Int) => {
  16. x + y
  17. },
  18. (x: Int, y: Int) => {
  19. x - y
  20. },
  21. // 3:窗口间隔
  22. Seconds(6),
  23. // 4:滑动间隔
  24. Seconds(2)
  25. )
  26. dss.print()
  27. ssc.start()
  28. ssc.awaitTermination()
  29. }
  30. }

2、无状态转换操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。

1、transform

拿到最原始的RDD后进行操作
1、DStream功能不完善
2、需要代码周期性的执行

  1. object Statefuless_Transform {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-streaming")
  4. val ssc = new StreamingContext(conf, Seconds(3))
  5. // 必须设置检查点作为缓存的目录
  6. ssc.checkpoint("cache")
  7. val linesStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999, storageLevel = StorageLevel.MEMORY_ONLY)
  8. // 写代码的位置:Driver端
  9. val stream: DStream[String] = linesStream.transform(
  10. stream => stream.map(
  11. // 写代码的位置:Driver端
  12. // 流式数据,源源不断的rdd,代码可以周期性执行
  13. rdd => {
  14. // 写代码的位置:rdd内部:Executor端
  15. rdd
  16. }
  17. )
  18. )
  19. // 写代码的位置:Driver端
  20. val s: DStream[String] = stream.map(
  21. stream => {
  22. // 写代码的位置:rdd内部:Executor端
  23. stream
  24. }
  25. )
  26. s.print()
  27. ssc.start()
  28. ssc.awaitTermination()
  29. }
  30. }

2、join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

  1. object Statefuless_Join {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-streaming")
  4. val ssc = new StreamingContext(conf, Seconds(3))
  5. ssc.checkpoint("cp")
  6. val data9999: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999, storageLevel = StorageLevel.MEMORY_ONLY)
  7. val data8888: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888, storageLevel = StorageLevel.MEMORY_ONLY)
  8. val map9999: DStream[(String, Int)] = data9999.map((_, 9))
  9. val map8888: DStream[(String, Int)] = data8888.map((_, 8))
  10. // 所谓ds的join操作,其实就是两个RDD的join
  11. val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
  12. joinDS.print()
  13. ssc.start()
  14. ssc.awaitTermination()
  15. }
  16. }

6、DStream输出

如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。
如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。

常见的输出操作:

  • print()
  • saveAsTextFiles()
  • saveAsObjectFiles()
  • saveAsHadoopFiles()
  • foreachRDD(func)

注意:
1) 连接不能写在 driver 层面(序列化)
2) 如果写在 foreach 则每个 RDD 中的每一条数据都创建,得不偿失;
3) 增加 foreachPartition,在分区创建(获取)。

7、优雅停机

处理完当前数据之后关闭

  1. /**
  2. * Stop the execution of the streams, with option of ensuring all received data
  3. * has been processed.
  4. *
  5. * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
  6. * will be stopped regardless of whether this StreamingContext has been
  7. * started.
  8. * @param stopGracefully if true, stops gracefully by waiting for the processing of all
  9. * received data to be completed
  10. */
  11. def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
  12. var shutdownHookRefToRemove: AnyRef = null
  13. if (LiveListenerBus.withinListenerThread.value) {
  14. throw new SparkException(s"Cannot stop StreamingContext within listener bus thread.")
  15. }
  16. synchronized {
  17. // The state should always be Stopped after calling `stop()`, even if we haven't started yet
  18. state match {
  19. case INITIALIZED =>
  20. logWarning("StreamingContext has not been started yet")
  21. state = STOPPED
  22. case STOPPED =>
  23. logWarning("StreamingContext has already been stopped")
  24. state = STOPPED
  25. case ACTIVE =>
  26. // It's important that we don't set state = STOPPED until the very end of this case,
  27. // since we need to ensure that we're still able to call `stop()` to recover from
  28. // a partially-stopped StreamingContext which resulted from this `stop()` call being
  29. // interrupted. See SPARK-12001 for more details. Because the body of this case can be
  30. // executed twice in the case of a partial stop, all methods called here need to be
  31. // idempotent.
  32. Utils.tryLogNonFatalError {
  33. scheduler.stop(stopGracefully)
  34. }
  35. // Removing the streamingSource to de-register the metrics on stop()
  36. Utils.tryLogNonFatalError {
  37. env.metricsSystem.removeSource(streamingSource)
  38. }
  39. Utils.tryLogNonFatalError {
  40. uiTab.foreach(_.detach())
  41. }
  42. Utils.tryLogNonFatalError {
  43. unregisterProgressListener()
  44. }
  45. StreamingContext.setActiveContext(null)
  46. Utils.tryLogNonFatalError {
  47. waiter.notifyStop()
  48. }
  49. if (shutdownHookRef != null) {
  50. shutdownHookRefToRemove = shutdownHookRef
  51. shutdownHookRef = null
  52. }
  53. logInfo("StreamingContext stopped successfully")
  54. state = STOPPED
  55. }
  56. }
  57. if (shutdownHookRefToRemove != null) {
  58. ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
  59. }
  60. // Even if we have already stopped, we still need to attempt to stop the SparkContext because
  61. // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
  62. if (stopSparkContext) sc.stop()
  63. }
  • 新建一个线程用作优雅停机
  • 在第三方存储中设置中断标志
  • 重启时从检查点恢复数据
  1. object SparkStreaming08_Close {
  2. def main(args: Array[String]): Unit = {
  3. /*
  4. 线程的关闭:
  5. val thread = new Thread()
  6. thread.start()
  7. thread.stop(); // 强制关闭
  8. */
  9. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  10. val ssc = new StreamingContext(sparkConf, Seconds(3))
  11. val lines = ssc.socketTextStream("localhost", 9999)
  12. val wordToOne = lines.map((_,1))
  13. wordToOne.print()
  14. ssc.start()
  15. // 如果想要关闭采集器,那么需要创建新的线程
  16. // 而且需要在第三方程序中增加关闭状态
  17. new Thread(
  18. new Runnable {
  19. override def run(): Unit = {
  20. // 优雅地关闭
  21. // 计算节点不在接收新的数据,而是将现有的数据处理完毕,然后关闭
  22. // Mysql : Table(stopSpark) => Row => data
  23. // Redis : Data(K-V)
  24. // ZK : /stopSpark
  25. // HDFS : /stopSpark
  26. /*
  27. while ( true ) {
  28. if (true) {
  29. // 获取SparkStreaming状态
  30. val state: StreamingContextState = ssc.getState()
  31. if ( state == StreamingContextState.ACTIVE ) {
  32. ssc.stop(true, true)
  33. }
  34. }
  35. Thread.sleep(5000)
  36. }
  37. */
  38. Thread.sleep(5000)
  39. val state: StreamingContextState = ssc.getState()
  40. if ( state == StreamingContextState.ACTIVE ) {
  41. ssc.stop(true, true)
  42. }
  43. System.exit(0)
  44. }
  45. }
  46. ).start()
  47. ssc.awaitTermination() // block 阻塞main线程
  48. }
  49. }

恢复数据

  1. object SparkStreaming09_Resume {
  2. def main(args: Array[String]): Unit = {
  3. val ssc = StreamingContext.getActiveOrCreate("cp", ()=>{
  4. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
  5. val ssc = new StreamingContext(sparkConf, Seconds(3))
  6. val lines = ssc.socketTextStream("localhost", 9999)
  7. val wordToOne = lines.map((_,1))
  8. wordToOne.print()
  9. ssc
  10. })
  11. ssc.checkpoint("cp")
  12. ssc.start()
  13. ssc.awaitTermination() // block 阻塞main线程
  14. }
  15. }

三、案例实操

1、环境准备

添加依赖

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-core_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.spark</groupId>
  8. <artifactId>spark-streaming_2.12</artifactId>
  9. <version>3.0.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.spark</groupId>
  13. <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  14. <version>3.0.0</version>
  15. </dependency>
  16. <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
  17. <dependency>
  18. <groupId>com.alibaba</groupId>
  19. <artifactId>druid</artifactId>
  20. <version>1.1.10</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>mysql</groupId>
  24. <artifactId>mysql-connector-java</artifactId>
  25. <version>5.1.27</version>
  26. </dependency>
  27. <dependency>
  28. <groupId>com.fasterxml.jackson.core</groupId>
  29. <artifactId>jackson-core</artifactId>
  30. <version>2.10.1</version>
  31. </dependency>
  • 实时数据生成 ```scala object MockData { def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“mock-data”) // new StreamingContext(conf,S)

    // 生成模拟数据 // 格式 timestamp area city userId adId // 含义:时间戳 区域 城市 用户 广告

    val kafkaProperties = new Properties() // 添加配置 kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,”192.168.29.128:9092”) // kafka集群地址 kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,”org.apache.kafka.common.serialization.StringSerializer”) // key的序列化配置 kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,”org.apache.kafka.common.serialization.StringSerializer”) // value的序列化配置 val producer = new KafkaProducerString, String // 将数据通过Application => Kafka => SparkStreaming => Analysis

    while(true) {

    1. mockData().foreach(
    2. data => {
    3. // 向Kafka中生成数据

    // producer.send(new ProducerRecordString,String)

    1. println(data)
    2. }
    3. )
    4. Thread.sleep(2000)

    }

} def mockData(): ListBuffer[String] = {

  1. val list = ListBuffer[String]()
  2. val areaList = ListBuffer[String]("华北","华东","华南")
  3. val cityList = ListBuffer[String]("北京","上海","深圳")
  4. for (i <- 1 to 30){
  5. val area = areaList(new Random().nextInt(3)) + 1
  6. val city = areaList(new Random().nextInt(3)) + 1
  7. val userId = new Random().nextInt(6) + 1
  8. val adId = new Random().nextInt(6) + 1
  9. list.append(s"${System.currentTimeMillis()} ${area} ${city} ${userId} ${adId}")
  10. }
  11. list

}

}

  1. - JdbcUtils
  2. ```scala
  3. object JdbcUtil {
  4. private var dataSource = init()
  5. def init(): DataSource = {
  6. val prop = new Properties()
  7. prop.setProperty("url","jdbc:mysql://localhost:3306/spark-streaming")
  8. prop.setProperty("driverClassName","com.mysql.jdbc.Driver")
  9. prop.setProperty("user","root")
  10. prop.setProperty("password","root")
  11. prop.setProperty("maxActive","50")
  12. DruidDataSourceFactory.createDataSource(prop)
  13. }
  14. def getCon:Connection = {
  15. dataSource.getConnection()
  16. }
  17. }

2、需求一:广告黑名单

实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。
注:黑名单保存到 MySQL 中
**

思路:
1、判断用户是否在白名单(目前存在mysql表)
2、判断点击是否超过阈值
3、更新点击次数