4.1 概述

什么是 Spark Streaming

Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此 得名“离散化”)。

DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的 DStream 支持两种操作,一种是转化操作(transformation),会生成新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。

4. Streaming 应用解析 - 图1

4. Streaming 应用解析 - 图2

Spark 与 Storm 的对比

4. Streaming 应用解析 - 图3

4.2 运行 Spark Streaming

IDEA 编写程序

Pom.xml 加入以下依赖:

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming_2.11</artifactId>
  4. <version>${spark.version}</version>
  5. <scope>provided</scope>
  6. </dependency>

(1)编写程序

  1. object WorldCount {
  2. def main(args: Array[String]) {
  3. val conf = new SparkConf()
  4. .setMaster("local[2]").setAppName("NetworkWordCount")
  5. val ssc = new StreamingContext(conf, Seconds(1))
  6. // Create a DStream that will connect to hostname:port, like localhost:9999
  7. val lines = ssc.socketTextStream("master01", 9999)
  8. // Split each line into words
  9. val words = lines.flatMap(_.split(" "))
  10. // import org.apache.spark.streaming.StreamingContext._
  11. // not necessary since Spark 1.3
  12. // Count each word in each batch
  13. val pairs = words.map(word => (word, 1))
  14. val wordCounts = pairs.reduceByKey(_ + _)
  15. // Print the first ten elements of each RDD generated in this DStream to the console
  16. wordCounts.print()
  17. ssc.start()
  18. ssc.awaitTermination()
  19. }
  20. }

(2) 运行程序

按照Spark Core中的方式进行打包,并将程序上传到Spark机器上。并运行:

  1. [root]# bin/spark-submit --class com.test.streaming.WorldCount ~/wordcount-jar-with-dependencies.jar

通过Netcat发送数据:

  1. [root]# nc -lk 9999
  2. hello world

4.3 架构与抽象

Spark Streaming 使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming 从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。

在时间区间结束时,批次停止增长。时间区间的大小是由批次间隔这个参数决定的。批次间隔一般设在 500 毫秒到几秒之间,由应用开发者配置。每个输入批次都形成一个 RDD,以 Spark 作业的方式处理并生成其他的 RDD。 处理的结果可以以批处理的方式传给外部系统。

Spark Streaming 的编程抽象是离散化流,也就是 DStream。它是一个 RDD 序列,每个RDD代表数据流中一个时间片内的数据。

4. Streaming 应用解析 - 图4

4. Streaming 应用解析 - 图5

4. Streaming 应用解析 - 图6

如上图所示,Spark Streaming 为每个输入源启动对应的接收器。接收器以任务的形式运行在应用的执行器进程中,从输入源收集数据并保存为 RDD。它们收集到输入数据后会把数据复制到另一个执行器进程来保障容错性(默认行为)。数据保存在执行器进程的内存中,同缓存 RDD 方式。驱动器程序中的 StreamingContext 会周期性地运行 Spark 作业来处理这些数据,把数据与之前时间区间中的 RDD 进行整合。

4.4 Spark Streaming解析

4.4.1 初始化 StreamingContext

初始化:

  1. import org.apache.spark._
  2. import org.apache.spark.streaming._
  3. val conf = new SparkConf().setAppName(appName).setMaster(master)
  4. val ssc = new StreamingContext(conf, Seconds(1))
  5. // 可以通过ssc.sparkContext 来访问SparkContext
  6. // 或者通过已经存在的SparkContext来创建StreamingContext
  7. import org.apache.spark.streaming._
  8. val sc = ... // existing SparkContext
  9. val ssc = new StreamingContext(sc, Seconds(1))

初始化完Context之后:

  1. 定义消息输入源来创建 DStreams
  2. 定义 DStreams 的转化操作和输出操作。
  3. 通过 streamingContext.start() 来启动消息采集和处理。
  4. 等待程序终止,可以通过 **streamingContext.awaitTermination()** 来设置。
  5. 通过 streamingContext.stop() 来手动终止处理程序。

StreamingContext 和 SparkContext 的关系:

  1. import org.apache.spark.streaming._
  2. val sc = ... // existing SparkContext
  3. val ssc = new StreamingContext(sc, Seconds(1))

注意:

  • StreamingContext 一旦启动,对 DStreams 的操作就不能修改了。
  • 在同一时间一个 JVM 中只有一个 StreamingContext 可以启动。
  • stop() 方法将同时停止 SparkContext,可传入参数 stopSparkContext 用于只停止 StreamingContext。
  • Spark1.4 版本后可停止 SparkStreaming 而不丢失数据,通过设置 sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true") 即可。在 StreamingContext 的 start 方法中已经注册了 Hook 方法。

4.4.2 什么是 DStreams

Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据。对数据的操作也是按照 RDD 为单位来进行的。计算过程由 Spark engine 来完成。

4. Streaming 应用解析 - 图7

4. Streaming 应用解析 - 图8

4. Streaming 应用解析 - 图9

4.4.3 DStreams 输入

资源要求

Spark Streaming 原生支持一些不同的数据源。一些“核心”数据源已经被打包到Spark Streaming 的 Maven 工件中,而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取。每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用的 CPU 核心。此外,我们还需要有可用的 CPU 核心来处理数据。这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如,如果我们想要在流计算应用中运行 10 个接收器,那么至少需要为应用分配 11 个 CPU 核心。所以如果在本地模式运行,不要使用 local 或者 local[1]

基本数据源

文件数据源:Socket数据流前面例子已看过。读文件数据流,HDFS API 兼容的文件系统文件,通过 streamingContext.fileStream [ KeyClass, ValueClass, InputFormatClass ] (dataDirectory) 读。如 ssc.textFileStream("hdfs://master:9000/data/"),Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件,记住目前不支持嵌套目录。

注意:

  • 文件需要有相同的数据格式
  • 文件进入 dataDirectory 的方式需要通过移动或者重命名来实现。
  • 一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据。
  • 如果文件比较简单,可用 streamingContext.textFileStream(dataDirectory) 方法来读取文件。文件流不需要接收器,不需要单独分配 CPU 核

自定义数据源

通过继承 Receiver,并实现 onStartonStop 方法来自定义数据源采集。

自定义数据源:

  1. //模拟Spark内置的Socket链接
  2. class CustomReceiver (host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  3. override def onStart(): Unit = {
  4. // Start the thread that receives data over a connection
  5. new Thread("Socket Receiver") {
  6. override def run() { receive() }
  7. }.start()
  8. }
  9. override def onStop(): Unit = {
  10. // There is nothing much to do as the thread calling receive() is designed to stop by itself if isStopped() returns false
  11. }
  12. // Create a socket connection and receive data until receiver is stopped
  13. private def receive() {
  14. var socket: Socket = null
  15. var userInput: String = null
  16. try {
  17. // Connect to host:port
  18. socket = new Socket(host, port)
  19. // Until stopped or connection broken continue reading
  20. val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
  21. userInput = reader.readLine()
  22. while(!isStopped && userInput != null) {
  23. // 传送出来
  24. store(userInput)
  25. userInput = reader.readLine()
  26. }
  27. reader.close()
  28. socket.close()
  29. // Restart in an attempt to connect again when server is active again
  30. restart("Trying to connect again")
  31. } catch {
  32. case e: java.net.ConnectException =>
  33. // restart if could not connect to server
  34. restart("Error connecting to " + host + ":" + port, e)
  35. case t: Throwable =>
  36. // restart if there is any other error
  37. restart("Error receiving data", t)
  38. }
  39. }
  40. }
  41. object CustomReceiver {
  42. def main(args: Array[String]) {
  43. val conf = new SparkConf().setMaster("local[2]")
  44. .setAppName("NetworkWordCount")
  45. val ssc = new StreamingContext(conf, Seconds(1))
  46. // Create a DStream that will connect to hostname:port, like localhost:9999
  47. val lines = ssc.receiverStream(new CustomReceiver("master01", 9999))
  48. // Split each line into words
  49. val words = lines.flatMap(_.split(" "))
  50. // import org.apache.spark.streaming.StreamingContext._ not necessary since Spark 1.3
  51. // Count each word in each batch
  52. val pairs = words.map(word => (word, 1))
  53. val wordCounts = pairs.reduceByKey(_ + _)
  54. // Print the first ten elements of each RDD generated in this DStream to the console
  55. wordCounts.print()
  56. ssc.start() // Start the computation
  57. ssc.awaitTermination() // Wait for the computation to terminate
  58. //ssc.stop()
  59. }
  60. }

结果:

4. Streaming 应用解析 - 图10

RDD 队列

测试过程中,可以通过使用 streamingContext.queueStream(queueOfRDDs) 来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理。

例如:

  1. object QueueRdd {
  2. def main(args: Array[String]) {
  3. val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd")
  4. val ssc = new StreamingContext(conf, Seconds(1))
  5. // Create the queue through which RDDs can be pushed to
  6. // a QueueInputDStream
  7. //创建RDD队列
  8. val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
  9. // Create the QueueInputDStream and use it do some processing
  10. // 创建QueueInputDStream
  11. val inputStream = ssc.queueStream(rddQueue)
  12. //处理队列中的RDD数据
  13. val mappedStream = inputStream.map(x => (x % 10, 1))
  14. val reducedStream = mappedStream.reduceByKey(_ + _)
  15. //打印结果
  16. reducedStream.print()
  17. //启动计算
  18. ssc.start()
  19. // Create and push some RDDs into
  20. for (i <- 1 to 30) {
  21. rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
  22. Thread.sleep(2000)
  23. //通过程序停止StreamingContext的运行
  24. //ssc.stop()
  25. }
  26. }
  27. }

执行结果:

  1. [root]# bin/spark-submit --class com.atguigu.streaming.QueueRdd ~/queueRdd-jar-with-dependencies.jar
  2. -------------------------------------------
  3. Time: 1504668485000 ms
  4. -------------------------------------------
  5. (4,30)
  6. (0,30)
  7. (6,30)
  8. (8,30)
  9. (2,30)
  10. (1,30)
  11. (3,30)
  12. (7,30)
  13. (9,30)
  14. (5,30)
  15. -------------------------------------------
  16. Time: 1504668486000 ms
  17. -------------------------------------------
  18. -------------------------------------------
  19. Time: 1504668487000 ms
  20. -------------------------------------------
  21. (4,30)
  22. (0,30)
  23. (6,30)
  24. (8,30)
  25. (2,30)
  26. (1,30)
  27. (3,30)
  28. (7,30)
  29. (9,30)
  30. (5,30)

高级数据源

除核心数据源外,还可以用附加数据源接收器来从一些知名数据获取系统中接收的数据,这些接收器都作为 Spark Streaming 的组件进行独立打包了。它们仍然是Spark的一部分,不过你需要在构建文件中添加额外的包才能使用它们。现有的接收器包括 Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及 ZeroMQ。可以通过添加与 Spark 版本匹配的 Maven 工件 spark-streaming-[projectname]_2.10 来引入这些附加接收器。

Apache Kafka

在工程中需要引入 Maven工件 spark-streaming-kafka_2.10 来使用它。包内提供的 KafkaUtils 对象可以在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息创建出 DStream。由于 KafkaUtils 可以订阅多个主题,因此它创建出的 DStream 由成对的主题和消息组成。要创建出一个流数据,需要使用 StreamingContext 实例、一个由逗号隔开的 ZooKeeper 主机列表字符串、消费者组的名字(唯一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用 createStream() 方法。

方法:

  1. // 创建一个从主题到接收器线程数的映射表
  2. scala>import org.apache.spark.streaming.kafka._
  3. scala>val topics = List(("pandas", 1), ("logs", 1)).toMap
  4. scala>val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics)
  5. scala>topicLines.map(_._2)

spark kafka 实例

演示 SparkStreaming 如何从 Kafka 读取消息,如果通过连接池方法把消息处理完成后再写回给 Kafka。

4. Streaming 应用解析 - 图11

(1)在原 Spark Streaming 项目基础上创建集成 Kafka 子项目并添加如下 Maven 依赖。

  1. <!-- 提供对象连接池的一种方式 -->
  2. <dependency>
  3. <groupId>org.apache.commons</groupId>
  4. <artifactId>commons-pool2</artifactId>
  5. <version>2.4.2</version>
  6. </dependency>
  7. <!-- 用来连接Kafka的工具类 -->
  8. <dependency>
  9. <groupId>org.apache.kafka</groupId>
  10. <artifactId>kafka-clients</artifactId>
  11. <version>0.10.2.1</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.spark</groupId>
  15. <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  16. <version>${spark.version}</version>
  17. </dependency>

(2)构建 Kafka 生产者连接池类。

  1. //单例对象
  2. object createKafkaProducerPool{
  3. //用于返回真正的对象池GenericObjectPool
  4. def apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = {
  5. val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic))
  6. val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory)
  7. //指定了你的kafka对象池的大小
  8. val poolConfig = {
  9. val c = new GenericObjectPoolConfig
  10. val maxNumProducers = 10
  11. c.setMaxTotal(maxNumProducers)
  12. c.setMaxIdle(maxNumProducers)
  13. c
  14. }
  15. //返回一个对象池
  16. new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig)
  17. }
  18. }

(3)Kafka 生产者代理对象。

  1. case class KafkaProducerProxy(brokerList: String, producerConfig: Properties = new Properties,defaultTopic: Option[String] = None, producer: Option[KafkaProducer [String, String]] = None) {
  2. type Key = String
  3. type Val = String
  4. // scala可以使用require进行函数参数限制
  5. require(brokerList == null || !brokerList.isEmpty, "Must set broker list")
  6. private val p = producer getOrElse {
  7. var props:Properties= new Properties();
  8. props.put("bootstrap.servers", brokerList);
  9. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  10. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  11. new KafkaProducer[String,String](props)
  12. }
  13. //把我的消息包装成了ProducerRecord
  14. private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {
  15. val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic")))
  16. require(!t.isEmpty, "Topic must not be empty")
  17. key match {
  18. case Some(k) => new ProducerRecord(t, k, value)
  19. case _ => new ProducerRecord(t, value)
  20. }
  21. }
  22. def send(key: Key, value: Val, topic: Option[String] = None) {
  23. //调用KafkaProducer他的send方法发送消息
  24. p.send(toMessage(value, Option(key), topic))
  25. }
  26. def send(value: Val, topic: Option[String]) {
  27. send(null, value, topic)
  28. }
  29. def send(value: Val, topic: String) {
  30. send(null, value, Option(topic))
  31. }
  32. def send(value: Val) {
  33. send(null, value, None)
  34. }
  35. def shutdown(): Unit = p.close()
  36. }
  37. abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable {
  38. def newInstance(): KafkaProducerProxy
  39. }
  40. class BaseKafkaProducerFactory(brokerList: String,config: Properties = new Properties,defaultTopic: Option[String] = None) extends KafkaProducerFactory (brokerList, config, defaultTopic) {
  41. override def newInstance() = new KafkaProducerProxy(brokerList, config, defaultTopic)
  42. }
  43. // 继承一个基础的连接池,需要提供池化的对象类型
  44. class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory) extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable {
  45. // 用于池来创建对象
  46. override def create(): KafkaProducerProxy = factory.newInstance()
  47. // 用于池来包装对象
  48. override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj)
  49. // 用于池来销毁对象
  50. override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = {
  51. p.getObject.shutdown()
  52. super.destroyObject(p)
  53. }
  54. }

(4)Spark Streaming 集成 Kafka 类

  1. object KafkaStreaming {
  2. def main(args: Array[String]) {
  3. //设置sparkconf
  4. val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming Kafka")
  5. //新建了streamingContext
  6. val ssc = new StreamingContext(conf, Seconds(1))
  7. //kafka的地址
  8. val brobrokers ="192.168.10.30:9092,192.168.10.31:9092,192.168.10.32:9092"
  9. //kafka的队列名称
  10. val sourcetopic="source";
  11. //kafka的队列名称
  12. val targettopic="target";
  13. //创建消费者组名
  14. var group="con-consumer-group"
  15. //kafka消费者配置
  16. val kafkaParam = Map(
  17. "bootstrap.servers" -> brobrokers,//用于初始化链接到集群的地址
  18. "key.deserializer" -> classOf[StringDeserializer],
  19. "value.deserializer" -> classOf[StringDeserializer],
  20. //用于标识这个消费者属于哪个消费团体
  21. "group.id" -> group,
  22. //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
  23. //可以使用这个配置,latest自动重置偏移量为最新的偏移量
  24. "auto.offset.reset" -> "latest",
  25. //如果是true,则这个消费者的偏移量会在后台自动提交
  26. "enable.auto.commit" -> (false: java.lang.Boolean)
  27. //ConsumerConfig.GROUP_ID_CONFIG
  28. );
  29. //创建DStream,返回接收到的输入数据
  30. val stream = KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(sourcetopic),kafkaParam))
  31. //定义偏移量数组
  32. var offsetRanges = Array[OffsetRange]()
  33. stream.transform { rdd =>
  34. //获取offset信息
  35. offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  36. rdd
  37. }.map(
  38. //每一个stream都是一个ConsumerRecord
  39. s =>("id:" + s.key(),">>>>:"+s.value()) ).foreachRDD(rdd => {
  40. //对于RDD的每一个分区执行一个操作
  41. rdd.foreachPartition(partitionOfRecords => {
  42. // kafka连接池
  43. val pool = createKafkaProducerPool(brobrokers, targettopic)
  44. //从连接池里面取出了一个Kafka的连接
  45. val p = pool.borrowObject()
  46. //发送当前分区里面每一个数据
  47. partitionOfRecords.foreach {
  48. message => System.out.println(message._2)
  49. p.send(message._2,Option(targettopic))
  50. }
  51. // 使用完了需要将kafka还回去
  52. pool.returnObject(p)
  53. for( o <- offsetRanges ){
  54. //打印offset信息...
  55. println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  56. }
  57. })
  58. })
  59. ssc.start()
  60. ssc.awaitTermination()
  61. }
  62. }

(5)程序部署

启动 zookeeper 和 kafka:

  1. [root]# bin/kafka-server-start.sh -deamon ./config/server.properties

创建两个 topic,一个为 source,一个为 target:

  1. [root]# bin/kafka-topics.sh --create --zookeeper 192.168.56.150:2181, 192.168.56.151:2181,192.168.56.152:2181 --replication-factor 2 --partitions 2 --topic source
  2. [root]# bin/kafka-topics.sh --create --zookeeper 172.16.148.150:2181, 172.16.148.151:2181,172.16.148.152:2181 --replication-factor 2 --partitions 2 --topic target

启动 kafka console producer 写入source topic:

  1. [root]# bin/kafka-console-producer.sh --broker-list 192.168.56.150:9092, 192.168.56.151:9092, 192.168.56.152:9092 --topic source

启动 kafka console consumer 监听 target topic:

  1. [root]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.150:9092, 192.168.56.151:9092, 192.168.56.152:9092 --topic source

启动 kafkaStreaming 程序:

  1. [root]# ./hadoop/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.KafkaStreaming ./kafkastreaming-jar-with-dependencies.jar

程序运行部分结果如下:

  1. [root]# bin/kafka-console-producer.sh --broker-list 192.168.10.30:9092, 192.168.10.31:9092,192.168.10.32:9092 --topic source
  2. >hello
  3. >hello spark
  4. >hello spark
  5. >haha
  6. >shuishui
  7. >ss
  8. >
  9. 1
  10. 2
  11. 3
  12. 4
  13. 5
  14. 6
  15. 7
  16. 8
  17. [root]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.30:9092, 192.168.10.31:9092,192.168.10.32:9092 --topic target
  18. >>>>:hello
  19. >>>>:hello spark
  20. >>>>:hello spark
  21. >>>>:haha
  22. >>>>:shuishui
  23. >>>>:ss
  24. 1
  25. 2
  26. 3
  27. 4
  28. 5
  29. 6
  30. 7
  31. #kafkaStreaming程序运行部分打印日志:
  32. >>>>:ss
  33. source 0 3 3
  34. source 1 2 3
  35. source 0 3 3
  36. source 1 3 3
  37. source 0 3 3
  38. source 1 3 3

4.4.4 Spark 对 Kafka 两种连接方式的对比

Receiver
4. Streaming 应用解析 - 图12

Receiver 是使用 Kafka 的 High-Level Consumer API 来实现的。Receiver 从 Kafka 中获取的数据都存储在 Spark Executor 的内存中的(如果数据暴增,数据大量堆积,容易出现 **oom **的问题),Spark Streaming 启动的 job 会去处理那些数据。

在默认的配置下,这种方式可能会因为底层的失败而丢失数据,如果要启用高可靠机制,让数据零丢失,就必须启用 Spark Streaming 的预写日志机制(Write Ahead Log, WAL),该机制会同步地将接收到的 Kafka 数据写入分布式文件系统(比如 HDFS, S3)上的预写日志中,所以当底层节点出现了失败,可以通过 WAL 中的数据进行恢复,但是效率会下降

使用时注意事项:

  • 操作简单,代码量少,不需要手动管理 offset,需要开启 wal 机制,可以保证数据不丢失,但效率会减低,并且为了保证数据不丢失,将一份数据存两份,浪费资源。
  • 无法保证数据只被处理一次,在写入外部存储的数据还未将 offset 更新到 zk 就挂掉,这些数据会被重复消费。
  • kafka 的 topic 的分区和 spark streaming 生成的 rdd 分区不相关,增加 topic 的分区数,只会增加 reciver 读取分区数据的线程数,并不会提高 spark 的处理数据的并行度。

Direct
4. Streaming 应用解析 - 图13

Direct 使用 Kafka 的 Low-Level Consumer api 读取 kafka 数据,来获得每个 topic+partition 的最新的 offset,从而定义每个 batch 的 offset 的范围。当处理数据的 job 启动时,就会使用 Kafka 的 Low-Level Consumer api 来获取 Kafka 指定 offset 范围的数据

使用时注意事项:

  • 当读取 topic 的数据时候,会自动对应 topic 的分区生成对应的 RDD 分区并行从 Kafka 中读取数据,在 Kafka partitionRDD partition 之间,有一对一的映射关系。
  • 不需要开启 WAL 机制,只要 Kafka 中作了数据的备份,那么就可以使用通过 Kafka 的副本进行恢复。
  • Spark 内部一定时同步的,所以可以自己跟踪 offset 并保存到 checkpoint 中,可以保证数据不会被重复消费。
  • 操作复杂,代码量大,并且需要自己对 offset 监控维护,增加用户开发成本。

Receiver 配合着 WAL 机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为 Spark 和 ZooKeeper 之间可能是不同步的。基于 direct 的方式,使用 kafka 的简单 api,SparkStreaming 自己就负责追踪消费的 offset,并保存在 checkpoint 中,Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次

区别
**

Receiver Direct
需要开启WAL 不需要开启WAL
使用高层次api 使用简单api
zk自动维护 手动维护offset
无法保证数据被处理一次 数据只被处理一次
代码简单,量少 代码复杂,量大
topic分区与rdd分区不是一对一的关系 topic分区与rdd分区是一对一的关系
由receiver拉取kafka数据 由rdd分区拉取对应分区的数据(kafka与rdd分区相等的情况)
..

例子
**
Maven依赖:

  1. <properties>
  2. <scala.version>2.11.8</scala.version>
  3. <spark.version>2.1.3</spark.version>
  4. <scala.binary.version>2.11</scala.binary.version>
  5. </properties>
  6. <dependencies>
  7. <dependency>
  8. <groupId>org.scala-lang</groupId>
  9. <artifactId>scala-library</artifactId>
  10. <version>${scala.version}</version>
  11. </dependency>
  12. <!-- spark-core -->
  13. <dependency>
  14. <groupId>org.apache.spark</groupId>
  15. <artifactId>spark-core_${scala.binary.version}</artifactId>
  16. <version>${spark.version}</version>
  17. </dependency>
  18. <!-- spark-streaming -->
  19. <dependency>
  20. <groupId>org.apache.spark</groupId>
  21. <artifactId>spark-streaming_${scala.binary.version}</artifactId>
  22. <version>${spark.version}</version>
  23. </dependency>
  24. <!-- spark-streaming kafka -->
  25. <dependency>
  26. <groupId>org.apache.spark</groupId>
  27. <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
  28. <version>${spark.version}</version>
  29. </dependency>
  30. </dependencies>

Receiver:

  1. import kafka.serializer.StringDecoder
  2. import org.apache.spark.storage.StorageLevel
  3. import org.apache.spark.streaming.kafka.KafkaUtils
  4. import org.apache.spark.streaming.{Seconds, StreamingContext}
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. object SparkKafkaReceiver {
  7. private val topics = "receiver-test"
  8. private val HDFS_PATH = "hdfs://node01:9000/kafka-ck"
  9. private val numThreads = 1
  10. def main(args: Array[String]): Unit = {
  11. //当应用程序停止的时候,会将当前批次的数据处理完成后在停止
  12. System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
  13. //1000*分区数*采样时间=拉取数据量
  14. System.setProperty("spark.streaming.kafka.maxRatePerPartition", "1000")
  15. val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
  16. //设置监控级别
  17. .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
  18. val sc = new SparkContext(conf)
  19. val ssc = new StreamingContext(sc, Seconds(5))
  20. ssc.checkpoint(HDFS_PATH)
  21. val kafkaParams = Map(
  22. "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
  23. "zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",
  24. "group.id" -> "receiver"
  25. )
  26. val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  27. val kafkaDStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  28. ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)
  29. // word count
  30. kafkaDStream
  31. .map(_._2) // 1是分区号,2是具体kafka中数
  32. .flatMap(_.split(" "))
  33. .map((_, 1))
  34. .reduceByKey(_ + _)
  35. .print(10) // 输出结果
  36. ssc.start()
  37. ssc.awaitTermination()
  38. }
  39. }

Direct:

  1. import kafka.common.TopicAndPartition
  2. import kafka.message.MessageAndMetadata
  3. import kafka.serializer.StringDecoder
  4. import org.I0Itec.zkclient.ZkClient
  5. import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
  6. import org.apache.log4j.Logger
  7. import org.apache.spark.rdd.RDD
  8. import org.apache.spark.streaming.dstream.InputDStream
  9. import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
  10. import org.apache.spark.streaming.{Seconds, StreamingContext}
  11. import org.apache.spark.{SparkConf, SparkContext}
  12. import org.apache.zookeeper.data.Stat
  13. object SparkKafkaDirect {
  14. private val zkHosts = "node01:2181,node02:2181,node03:2181"
  15. private val logger = Logger.getLogger("SparkKafkaDirect")
  16. private val zkPath = "/kafka-direct-test"
  17. private val topic = Set("direct-test")
  18. private val HDFS_PATH="hdfs://node01:9000/kafka-ck"
  19. def main(args: Array[String]): Unit = {
  20. val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
  21. val sc = new SparkContext(conf)
  22. val ssc = new StreamingContext(sc, Seconds(5))
  23. val ssc = new StreamingContext(sc, Seconds(5))
  24. val kafkaParams = Map(
  25. "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
  26. "group.id" -> "direct"
  27. )
  28. val zkClient: ZkClient = new ZkClient(zkHosts)
  29. // 读取 offset
  30. val offsets: Option[Map[TopicAndPartition, Long]] = readOffset(zkClient)
  31. // 获取到kafka数据
  32. val kafkaDStream: InputDStream[(String, String)] = offsets match {
  33. // 使用 direct方式消费kafka数据
  34. case None =>
  35. print("start from scratch")
  36. KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
  37. case Some(offset) =>
  38. print("start with the offset")
  39. val messageHeader = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
  40. KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, offset, messageHeader)
  41. }
  42. // word count
  43. kafkaDStream.map(_._2) // 1是分区号,2是具体kafka中数
  44. .flatMap(_.split(" "))
  45. .map((_, 1))
  46. .reduceByKey(_ + _)
  47. .foreachRDD(print(_)) // 输出结果
  48. // 保存偏移量到zk中 , 也可自定义到其他存储介质
  49. kafkaDStream.foreachRDD(rdd =>
  50. saveOffset(zkClient, zkHosts, zkPath, rdd)
  51. )
  52. ssc.start()
  53. ssc.awaitTermination()
  54. }
  55. // 保存 offset
  56. def saveOffset(zkClient: ZkClient, zkHost: String, zkPath: String, rdd: RDD[_]): Unit = {
  57. logger.info("save offsets to Zookeeper")
  58. val stopwatch = new Stopwatch()
  59. val offsetsRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  60. offsetsRanges.foreach(offsetRange => logger.debug(s" Using $offsetRange"))
  61. val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
  62. .mkString(",")
  63. logger.info("writing offsets to Zookeeper zkClient=" + zkClient + " zkHosts=" + zkHosts + "zkPath=" + zkPath + " offsetsRangesStr:" + offsetsRangesStr)
  64. updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
  65. logger.info("done updating offsets in zookeeper. took " + stopwatch)
  66. }
  67. // 读取 offset
  68. def readOffset(zkClient: ZkClient): Option[Map[TopicAndPartition, Long]] = {
  69. val stopwatch = new Stopwatch()
  70. val stat = new Stat()
  71. val dataAndStat: (Option[String], Stat) = try {
  72. (Some(zkClient.readData(zkPath, stat)), stat)
  73. } catch {
  74. case _ => (None, stat)
  75. case e2: Throwable => throw e2
  76. }
  77. // 获取offset
  78. dataAndStat._1 match {
  79. case Some(offsetsRangeStr) =>
  80. logger.info(s" Read offset ranges: $offsetsRangeStr")
  81. val offset: Map[TopicAndPartition, Long] = offsetsRangeStr.split(",")
  82. .map(str => str.split(":"))
  83. .map {
  84. case Array(partitions, offset) =>
  85. TopicAndPartition(topic.last, partitions.toInt) -> offset.toLong
  86. }.toMap
  87. logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
  88. Some(offset)
  89. case None =>
  90. logger.info(" No offsets found in Zookeeper. Took " + stopwatch)
  91. None
  92. }
  93. }
  94. // 更新 zk中的 offset
  95. def updatePersistentPath(zkClient: ZkClient, zkPath: String, offsetsRangesStr: String): Unit = {
  96. try {
  97. zkClient.writeData(zkPath, offsetsRangesStr)
  98. } catch {
  99. // 如果失败了 ==> 没有此目录,则创建目录
  100. case _: ZkNoNodeException =>
  101. createParentPath(zkClient, zkPath)
  102. try {
  103. // 创建一个持久的节点 ==> 即 目录
  104. // 在offset写入到 该节点
  105. zkClient.createPersistent(zkPath, offsetsRangesStr)
  106. } catch {
  107. case _: ZkNodeExistsException =>
  108. zkClient.writeData(zkPath, offsetsRangesStr)
  109. case e2: Throwable => throw e2
  110. }
  111. case e2: Throwable => throw e2
  112. }
  113. }
  114. // 如果path不存在,则创建
  115. def createParentPath(zkClient: ZkClient, zkPath: String): Unit = {
  116. val parentDir = zkPath.substring(0, zkPath.lastIndexOf('/'))
  117. if (parentDir.length != 0)
  118. zkClient.createPersistent(parentDir, true)
  119. }
  120. // 过程时间
  121. class Stopwatch {
  122. private val start = System.currentTimeMillis()
  123. override def toString: String = (System.currentTimeMillis() - start) + " ms"
  124. }
  125. }

4.4.5 Flume-ng

Spark 提供两个不同的接收器来使用 Apache Flume。

两个接收器简介如下:

  • 推式接收器:以 Avro 数据池的方式工作,由 Flume 向其中推数据。
  • 拉式接收器:可从自定义的中间数据池中拉数据,其他进程可用Flume把数据推进该中间数据池。

4. Streaming 应用解析 - 图14

(1) 推式接收器的方法

缺点是没有事务支持。这会增加运行接收器的工作节点发生错误 时丢失少量数据的几率。不仅如此,如果运行接收器的工作节点发生故障,系统会尝试从另一个位置启动接收器,这时需要重新配置 spark-flumeng.conf 才能将数据发给新的工作节点。

4. Streaming 应用解析 - 图15

(2) 拉式接收器的方法

它设置了一个专用的 Flume 数据池供 Spark Streaming 读取,并让接收器主动从数据池中拉取数据。这种方式的优点在于弹性较好,Spark Streaming 通过事务从数据池中读取并复制数据。在收到事务完成的通知前,这 些数据还保留在数据池中。

当你把自定义 Flume 数据池添加到一个节点上之后,就需要配置 spark-flumeng.conf 来把数据推送到这个数据池中,配置如下:

  1. a1.sinks = spark
  2. a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
  3. a1.sinks.spark.hostname = receiver-hostname
  4. a1.sinks.spark.port = port-used-for-sync-not-spark-port
  5. a1.sinks.spark.channel = memoryChannel

4. Streaming 应用解析 - 图16

4.4.6 DStreams 转换

概述

DStream 上的原语与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()transform() 以及各种 Window 相关的原语。DStream 的转化操作可以分为无状态(stateless)和有状态(stateful)两种:

在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。常见 RDD 转化操作,例如 map()filter()reduceByKey() 等,都是无状态转化操作。

相对地,有状态转化操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。

4. Streaming 应用解析 - 图17

**

无状态转化操作

就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._ 才能在 Scala 中使用。

4. Streaming 应用解析 - 图18

有状态转化操作

(1)追踪状态变化 UpdateStateByKey

以DStream 中的数据进行按 key 做 reduce 操作,然后对各个批次的数据进行累加。在有新的数据信息进入或更新时,可以让用户保持想要的任何状。

使用这个功能需要完成两步:

  1. 定义状态:可以是任意数据类型。
  2. 定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。对于有状态操作,要不断的把当前和历史的时间切片的 RDD 累加计算,随着时间的流失,计算的数据规模会变得越来越大。

更新版的 wordcount:

  1. object updateStateByKeyPro {
  2. def main(args: Array[String]): Unit = {
  3. val conf=new SparkConf().setAppName("updateStateByKeyPro")
  4. .setMaster("local[2]")
  5. val ssc=new StreamingContext(conf,Seconds(10))
  6. //开启checkpoint
  7. ssc.checkpoint("hdfs://tgmaster:9000/in/ch")
  8. //连接nc(netcat)服务,接收数据源,产生Dtream 对象
  9. val lines=ssc.socketTextStream("localhost",9999)
  10. //分隔单词,并将分隔后的每个单词出现次数记录为1
  11. val pairs=lines.flatMap(_.split(" ")).map(word=>(word,1))
  12. //调用updateStateByKey算子,统计单词在全局中出现的次数
  13. val result=pairs.updateStateByKey((values:Seq[Int], state:Option[Int])=>{
  14. //创建一个变量,用于记录单词出现次数
  15. var newValue=state.getOrElse(0) //getOrElse相当于if....else.....
  16. for(value <- values){
  17. newValue +=value //将单词出现次数累计相加
  18. }
  19. Option(newValue)
  20. })
  21. //直接输出结果
  22. result.print()
  23. ssc.start() //开启实时计算
  24. ssc.awaitTermination() //等待应用停止
  25. }
  26. }

(2)Window Operations(窗口操作)

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次)为一个 DStream,就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果,就应该把滑动步长设置为 20 秒。

4. Streaming 应用解析 - 图19

4. Streaming 应用解析 - 图20

4. Streaming 应用解析 - 图21

reduceByWindow()reduceByKeyAndWindow() 让我们可以对每个窗口更高效地进行归约操作。它们接收一个归约函数,在整个窗口上执行,比如 +。除此以外,它们还有一种特殊形式,通过只考虑新进入窗口的数据和离开窗口的数据,让 Spark 增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数,比如 + 对应的逆函数为 -。对于较大的窗口,提供逆函数可以大大提高执行效率。

4. Streaming 应用解析 - 图22

例如:

  1. val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
  2. val ipCountDStream = ipDStream.reduceByKeyAndWindow(
  3. {(x, y) => x + y}, // 加上新进入窗口的批次中的元素
  4. {(x, y) => x - y}, // 移除离开窗口的老批次中的元素
  5. Seconds(30), // 窗口时长
  6. Seconds(10)) // 滑动步长

countByWindow()countByValueAndWindow() 作为对数据进行计数操作的简写。countByWindow() 返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow() 返回的 DStream 则包含窗口中每个值的个数。

例如:

  1. val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
  2. val ipAddressRequestCount=ipDStream.countByValueAndWindow(Seconds(30),Seconds(10))
  3. val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

重要操作

Transform 操作:Transform 原语允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。 该函数每一批次调度一次。比如下面的例子,在进行单词统计的时候,想要过滤掉 spam 的信息。其实也就是对 DStream 中的 RDD 应用转换。

例如:

  1. val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...)
  2. val cleanedDStream = wordCounts.transform { rdd =>
  3. rdd.join(spamInfoRDD).filter(...)
  4. ...
  5. }

Join 操作:连接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin 也可以),可以连接。

例如:

  1. Stream-Streamwindows-stream to windows-streamstream-dataset
  2. //Stream-Stream Joins
  3. val stream1: DStream[String, String] = ...
  4. val stream2: DStream[String, String] = ...
  5. val joinedStream = stream1.join(stream2)
  6. val windowedStream1 = stream1.window(Seconds(20))
  7. val windowedStream2 = stream2.window(Minutes(1))
  8. val joinedStream = windowedStream1.join(windowedStream2)
  9. //Stream-dataset joins
  10. val dataset: RDD[String, String] = ...
  11. val windowedStream = stream.window(Seconds(20))...
  12. val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

4.4.7 DStreams 输出

与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。

通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意 RDD。在 foreachRDD() 中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。

需要注意的:

  • 连接不能写在 driver 层面
  • 如果写在 foreach 则每个 RDD 都创建,得不偿失。
  • 增加 foreachPartition,在分区创建。
  • 可以考虑使用连接池优化。

例如:

  1. dstream.foreachRDD { rdd =>
  2. // error val connection = createNewConnection() //executed at the driver序列化错误
  3. rdd.foreachPartition { partitionOfRecords =>
  4. // ConnectionPool is a static, lazily initialized pool of connections
  5. val connection = ConnectionPool.getConnection()
  6. // executed at the worker
  7. partitionOfRecords.foreach(record => connection.send(record))
  8. // return to the pool for future reuse
  9. ConnectionPool.returnConnection(connection)
  10. }
  11. }

4. Streaming 应用解析 - 图23

4.4.8 累加器和广播变量

累加器(Accumulators)和广播变量(Broadcast variables)不能从 Spark Streaming 的检查点中恢复。如果你启用检查并也使用了累加器和广播变量,那么你必须创建累加器和广播变量的延迟单实例从而在驱动因失效重启后他们可以被重新实例化。

例如:

  1. object WordBlacklist {
  2. @volatile private var instance: Broadcast[Seq[String]] = null
  3. def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
  4. if (instance == null) {
  5. synchronized {
  6. if (instance == null) {
  7. val wordBlacklist = Seq("a", "b", "c")
  8. instance = sc.broadcast(wordBlacklist)
  9. }
  10. }
  11. }
  12. instance
  13. }
  14. }
  15. object DroppedWordsCounter {
  16. @volatile private var instance: LongAccumulator = null
  17. def getInstance(sc: SparkContext): LongAccumulator = {
  18. if (instance == null) {
  19. synchronized {
  20. if (instance == null) {
  21. instance = sc.longAccumulator("WordsInBlacklistCounter")
  22. }
  23. }
  24. }
  25. instance
  26. }
  27. }
  28. wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  29. // Get or register the blacklist Broadcast
  30. val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  31. // Get or register the droppedWordsCounter Accumulator
  32. val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  33. // Use blacklist to drop words and use droppedWordsCounter to count them
  34. val counts = rdd.filter { case (word, count) =>
  35. if (blacklist.value.contains(word)) {
  36. droppedWordsCounter.add(count)
  37. false
  38. } else {
  39. true
  40. }
  41. }.collect().mkString("[", ", ", "]")
  42. val output = "Counts at time " + time + " " + counts
  43. }

4.4.9 DataFrame ans SQL Operations

你可以很容易地在流数据上使用 DataFrames SQL。必须使用 SparkContext 来创建 StreamingContext 要用的 SQLContext。此外,这一过程可以在驱动失效后重启。我们通过创建一个实例化的 SQLContext 单实例来实现这个工作。如下例所示。我们对前例 word count 进行修改从而使用 DataFrames 和 SQL 来产生 word counts。每个 RDD 被转换为 DataFrame,以临时表格配置并用 SQL 进行查询。

例如:

  1. val words: DStream[String] = ...
  2. words.foreachRDD { rdd =>
  3. // Get the singleton instance of SparkSession
  4. val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  5. import spark.implicits._
  6. // Convert RDD[String] to DataFrame
  7. val wordsDataFrame = rdd.toDF("word")
  8. // Create a temporary view
  9. wordsDataFrame.createOrReplaceTempView("words")
  10. // Do word count on DataFrame using SQL and print it
  11. val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
  12. wordCountsDataFrame.show()
  13. }

你也可以从不同的线程在定义于流数据的表上运行 SQL 查询(也就是说,异步运行 StreamingContext)。仅确定你设置 StreamingContext 记住了足够数量的流数据以使得查询操作可以运行。否则 StreamingContext 不会意识到任何异步的 SQL 查询操作,那么其就会在查询完成之后删除旧的数据。例如,如果你要查询最后一批次,但是你的查询会运行 5 分钟,那么你需要调用 streamingContext.remember(Minutes(5))(in Scala, 或者其他语言的等价操作)。

4.4.10 Caching / Persistence

和 RDDs 类似,DStreams 同样允许开发者将流数据保存在内存中。也就是说,在 DStream 上使用 persist() 方法将会自动把 DStreams 中每个 RDD 保存在内存中。当 DStream 中的数据要被多次计算时,这个很有用(如在同样数据上的多次操作)。对于像 reduceByWindowreduceByKeyAndWindow 以及基于状态的 updateStateByKey 这种操作,保存是隐含默认的。因此,即使开发者没有调用 persist(),由基于窗操作产生的 DStreams 会自动保存在内存中。

4.4.11 不间断运行

检查点机制

检查点机制是我们在 Spark Streaming 中用来保障容错性的主要机制。它可以使 Spark Streaming 阶段性地把应用数据存储到诸如 HDFS 或 Amazon S3 这样的可靠存储系统中, 以供恢复时使用。具体来说,检查点机制主要为以下两个目的服务

  • 控制发生失败时需要重算的状态数。SparkStreaming 可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。
  • 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序 并让驱动器程序从检查点恢复,这样 Spark Streaming 就可以读取之前运行的程序处理数据的进度,并从那里继续。

为了实现这个,Spark Streaming 需要为容错存储系统 checkpoint 足够的信息从而使得其可以从失败中恢复过来。有两种类型的数据设置检查点:

  • Metadata checkpointing:将定义流计算的信息存入容错的系统如 HDFS,元数据包括:
    • 配置 – 用于创建流应用的配置。
    • DStreams 操作 – 定义流应用的 DStreams 操作集合。
    • 不完整批次 – 批次的工作已进行排队但是并未完成。
  • Data checkpointing将产生的 RDDs 存入可靠的存储空间。对于在多批次间合并数据的状态转换,这个很有必要。在这样的转换中,RDDs 的产生基于之前批次的 RDDs,这样依赖链长度随着时间递增。为了避免在恢复期这种无限的时间增长(和链长度成比例),状态转换中间的 RDDs 周期性写入可靠地存储空间(如HDFS)从而切短依赖链。

总而言之,元数据检查点在由驱动失效中恢复是首要需要的。而数据或者 RDD 检查点甚至在使用了状态转换的基础函数中也是必要的。

可以通过向 ssc.checkpoint() 方法传递一个路径参数(HDFS、S3 或者本地路径均可)来配置检查点机制,同时你的应用应该能够使用检查点的数据:

  • 当程序首次启动,其将创建一个新的 StreamingContext,设置所有的流并调用 start()
  • 当程序在失效后重启,其将依据检查点目录的检查点数据重新创建一个 StreamingContext。 通过使用 StraemingContext.getOrCreate 很容易获得这个性能。

RDDs 的检查点引起存入可靠内存的开销。在 RDDs 需要检查点的批次里,处理的时间会因此而延长,检查点的间隔需要很仔细地设置。可以通过 dstream.checkpoint(checkpointInterval)。通常,检查点设置间隔是 5-10 个 DStream 的滑动间隔

检查点用例:

  1. ssc.checkpoint("hdfs://...")
  2. //创建和设置一个新的StreamingContext
  3. def functionToCreateContext():
  4. sc = SparkContext(...) # new context
  5. ssc = new StreamingContext(...)
  6. lines = ssc.socketTextStream(...) # create DStreams
  7. ...
  8. ssc.checkpoint(checkpointDirectory) # 设置检查点目录
  9. return ssc
  10. //从检查点数据中获取StreamingContext或者重新创建一个
  11. context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
  12. //在需要完成的context上做额外的配置
  13. //无论其有没有启动
  14. context ...
  15. //启动context
  16. context.start()
  17. contaxt.awaitTermination()

WAL 预写日志

WAL 即 write ahead log(预写日志),作用就是,将数据通过日志的方式写到可靠的存储,比如 HDFS、s3,在 driver 或 worker failure 时可以从在可靠存储上的日志文件恢复数据。

WAL 在 driver 端和 executor 端都有应用:

  • WAL 在 driver 端的应用:在 StreamingContext 中的 JobScheduler 中的 ReceiverTracker 的 ReceivedBlockTracker 构造函数中被创建,ReceivedBlockTracker 用于管理已接收到的 blocks 信息。需要注意的是,这里只需要启用 checkpoint 就可以创建该 driver 端的 WAL 管理实例,无需将 spark.streaming.receiver.writeAheadLog.enable 设置为 true。首选需要明确的是,ReceivedBlockTracker 通过 WAL 写入 log 文件的内容是3种事件(当然,会进行序列化),即 BlockAdditionEventBatchAllocationEvent BatchCleanupEvent,对于信息、分配和清理事件。
  • WAL 在 executor 端的应用:Receiver 接收到的数据会源源不断的传递给 ReceiverSupervisor,是否启用 WAL 机制(即 writeAheadLog 设置为 true)会影响 ReceiverSupervisor 在存储 block 时的行为:
    • 不启用 WAL:即可以接受一定的数据丢失,则不需要启用 WAL,对性能影响较大。你设置的 Storage-Level 是什么,就怎么存储。比如 MEMORY_ONLY 只会在内存中存一份,MEMORY_AND_DISK 会在内存和磁盘上各存一份等。
    • 启用 WAL:即若完全不能接受数据丢失,那就需要同时启用 checkpointWAL,checkpoint 保存着执行进度(比如已生成但未完成的 jobs),WAL 中保存着 blocks blocks 元数据(比如保存着未完成的 jobs 对应的 blocks 信息及 block 文件),这种情况需注意 exactly once 语义。在 Storage-Level 指定的存储的基础上,写一份到 WAL 中。存储一份在 WAL 上,更不容易丢数据但性能损失也比较大。

开启预写日志功能时用户传输数据的流程:

  1. 接受数据:接收器将数据分成一系列小块,存储到 Executor 内存或者磁盘中,如果启动了预写日志,数据同时还写入到容错文件系统的预写日志文件中。
  2. 通知 StreamContext接受块的元数据(Meatdata)被发送到 Driver 的 StreamingContext。这个元数据包括:定位其在 executor 内存或者磁盘中数据位置的块信息、块数据在日志文件中的偏移信息。如果启动了预写日志,数据同时还写入到容错文件系统的预写日志文件中。
  3. 处理数据:每批数据的间隔,流上下文使用块信息产生弹性分布式数据集 RDD 和他们的作业 Job,StreamingContext 通过运行任务处理 Executor 内存或者磁盘中的数据块执行作业。
  4. 周期性的设置检查点:为了恢复的需要,流计算(即StreamingContext)提供来的DStream)周期性的设置检查点,并保存到同一个容错文件系统的另外一组文件中。

4. Streaming 应用解析 - 图24
用户传输数据的生命周期如下图所示

当一个失败的Driver端重启的时候,会进行如下处理:

  1. 恢复计算:使用检查点信息重启 Driver,重新构造上下文重启接收器。
  2. 恢复元数据:为了保证能够继续下去所必备的全部元数据块都被恢复。
  3. 未完成作业的重新生成:由于失败而没有处理完成的批处理,将使用会的元数据再次产生 RDD 和对应的作业。
  4. 读取保存在日志中的块数据:在这些作业执行时,块数据之间从预写日志中读出,这将恢复在日志中可靠地保存所有必要的数据。
  5. 重发尚未确认的数据:失败时没有保存到日志中的缓存数据将由数据源再次发送。

image.png

背压机制**

默认情况下,Spark Streaming 通过 Receiver 以生产者生产数据的速率接收数据,计算过程中会出现 batch processing time > batch interval 的情况,其中 batch processing time 为实际计算一个批次花费时间, batch interval 为 Streaming 应用设置的批处理间隔。

意味着 Spark Streaming 的数据接收速率高于 Spark 从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间,会造成数据在内存中堆积,导致 Receiver 所在 Executor 内存溢出等问题(如果设置 StorageLevel 包含 disk, 则内存存放不下的数据会溢写至 disk, 加大延迟)。

spark v1.5 开始引入反压机制(backpressure),通过动态控制数据接收速率来适配集群数据处理能力

Spark Streaming Backpressure: 根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。通过属性 spark.streaming.backpressure.enabled 来控制是否启用 backpressure 机制,默认值 false,即不启用。

4. Streaming 应用解析 - 图26

4. Streaming 应用解析 - 图27

4. Streaming 应用解析 - 图28

在原架构的基础上加上一个新的组件 RateController,这个组件负责监听“OnBatchCompleted”事件,然后从中抽取 processingDelayschedulingDelay 信息。Estimator 依据这些信息估算出最大处理速度 rate,并基于 Receiver InputStream 将 rate 通过 ReceiverTracker ReceiverSupervisorImpl 转发给 BlockGenerator(继承自 RateLimiter),这样接收器就知道下一步应该接收多少数据了。

流量控制点:当 Receiver 开始接收数据时,会通过 supervisor.pushSingle() 方法将接收的数据存入 currentBuffer 等待 BlockGenerator 定时将数据取走,包装成 block。再将数据存放入 currentBuffer 之时,要获取许可(令牌)。如果获取到许可就可以将数据存入 buffer, 否则将被阻塞,进而阻塞 Receiver 从数据源拉取数据。

4. Streaming 应用解析 - 图29

令牌桶机制:大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。

驱动器程序容错

驱动器程序的容错需以特殊的方式创建 StreamingContext。需把检查点目录提供给 StreamingContext。与直接调用 new StreamingContext 不同,应该使用 StreamingContext.getOrCreate() 函数。

配置过程如下:

(1)启动 Driver 自动重启功能

说明:

  • standalone: 提交任务时添加 —supervise 参数。
  • yarn: 设置 yarn.resourcemanager.am.max-attempts 或者 spark.yarn.maxAppAttempts
  • mesos: 提交任务时添加 —supervise 参数。

(2)设置 checkpoint

  1. StreamingContext.setCheckpoint(hdfsDirectory)

(3)支持从 checkpoint 中重启配置

  1. def createContext(checkpointDirectory: String): StreamingContext = {
  2. val ssc = new StreamingContext
  3. ssc.checkpoint(checkpointDirectory)
  4. ssc
  5. }
  6. val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))

工作节点容错

为了应对工作节点失败的问题,Spark Streaming 使用与 Spark 的容错机制相同的方法。所有从外部数据源中收到的数据都在多个工作节点上备份。所有从备份数据转化操作的过程中创建出来的 RDD 都能容忍一个工作节点的失败,根据 RDD 谱系图,系统可以把丢失的数据从幸存的输入数据备份中重算出来。对于 reduceByKey 等 Stateful 操作重做的 lineage 较长的,强制启动 checkpoint,减少重做几率。

接收器容错

运行接收器的工作节点的容错也是很重要的。如果这样的节点发生错误,Spark Streaming 会在集群中别的节点上重启失败的接收器。然而,这种情况会不会导致数据的丢失取决于数据源的行为(数据源是否会重发数据)以及接收器的实现(接收器是否会向数据源确认收到数据)。

一般主要是通过将接收到数据后先写日志(WAL)到可靠文件系统中,后才写入实际的 RDD。如果后续处理失败则成功写入 WAL 的数据通过 WAL 进行恢复,未成功写入 WAL 的数据通过可回溯的 Source 进行重放。

接收器提供以下保证:

  • 所有从可靠文件系统中读取的数据(比如通过 StreamingContext.hadoopFiles 读取的) 都是可靠的。
  • 对于像 Kafka、推式 Flume、Twitter 这样的不可靠数据源,Spark 1.2 后,收到的数据被记录到诸如 HDFS 这样的可靠的文件系统中,这样即使驱动器程序重启也不会导致数据丢失。

操作过程如下:

启用 checkpoint

  1. scala>ssc.setCheckpoint(checkpointDir)

启用 WAL:

  1. scala>sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

对 Receiver 使用可靠性存储:

  1. StoreageLevel.MEMORY_AND_DISK_SER or StoreageLevel.MEMORY_AND_DISK_SER2

处理保证

Spark Streaming可以为所有的转化操作提供 “精确一次”执行的语义。可以使用事务操作来写入外部系统(即原子化地将一个 RDD 分区一次写入),或者设计幂等的更新操作(即多次运行同一个更新操作仍生成相同的结果)。比如 Spark Streaming 的 saveAs…File 操作会在一个文件写完时自动将其原子化地移动到最终位置上,以此确保每个输出文件只存在一份。

性能考量

最常见的问题是 Spark Streaming 可以使用的最小批次间隔是多少。总的来说,500 毫秒已经被证实为对许多应用而言是比较好的最小批次大小。寻找最小批次大小的最佳实践是从一个比较大的批次大小(10 秒左右)开始,不断使用更小的批次大小。

相似地,对于窗口操作,计算结果的间隔(也就是滑动步长)对于性能也有巨大的影响。 当计算代价巨大并成为系统瓶颈时,就应该考虑提高滑动步长了。

减少批处理所消耗时间的常见方式还有提高并行度

  • 增加接收器数。有时如果记录太多导致单台机器来不及读入并分发的话,接收器会成为系统瓶颈。这时你就需要通过创建多个输入 DStream(这样会创建多个接收器)来增加接收器数目,然后使用 union 来把数据合并为一个数据源。
  • 将收到的数据显式地重新分区。如果接收器数目无法再增加,你可以通过使用 DStream.repartition 来显式重新分区输入流(或者合并多个流得到的数据流)来重新分配收到的数据。
  • 提高聚合计算的并行度。对于像 reduceByKey() 这样的操作,你可以在第二个参数中指定并行度,我们在介绍 RDD 时提到过类似的手段。

4.5 高级解析

4.5.1 DStreamGraph 对象解析

在 Spark Streaming 中,DStreamGraph 是一个非常重要的组件,主要用来:

  • 通过成员 inputStreams 持有 Spark Streaming 输入源及接收数据的方式。
  • 通过成员 outputStreams 持有 Streaming app 的 output 操作,并记录 DStream 依赖关系
  • 生成每个 batch 对应的 jobs

DStreamGraph 实例的创建

源码上 StreamingContext 包含了 DStreamGraph 类型的成员 graph,当 new StreamingContext 的时候, DStreamGraph 会在 StreamingContext 主构造函数中被创建。若当前 checkpoint 可用,会优先从 checkpoint 恢复 graph,否则新建一个。还可以从这里知道的一点是:graph 是运行在 driver 上的

DStreamGraph 记录输入源及如何接收数据

DStreamGraph 类成员成员 inputStreams 为 InputDStream 类型的数组,InputDStream 是所有 input streams(数据输入流) 的虚基类。该类提供了 start()stop() 方法供 streaming 系统来开始和停止接收数据。那些只需要在 driver 端接收数据并转成 RDD 的 input streams 可以直接继承 InputDStream,例如 FileInputDStream 是 InputDStream 的子类,它监控一个 HDFS 目录并将新文件转成 RDDs。而那些需要在 workers 上运行 receiver 来接收数据的 InputDStream,需要继承 ReceiverInputDStream,比如 KafkaReceiver。

4. Streaming 应用解析 - 图30

从上面的调用流程图我们可以知道:

  • ssc.textFileStream 会触发新建一个 FileInputDStream。FileInputDStream 继承于 InputDStream,其 start() 方法定义了数据源及如何接收数据。
  • FileInputDStream 构造函数中,会调用 ssc.graph.addInputStream(this),将自身添加到 DStreamGraph 的 inputStreams: ArrayBuffer[InputDStream[_]] 中,这样 DStreamGraph 就知道了这个 Streaming App 的输入源及如何接收数据。可能你会奇怪为什么 inputStreams 是数组类型,举个例子,这里再来一个val lines1 = ssc.textFileStream(args(0)),那么又将生成一个 FileInputStream 实例添加到 inputStreams,所以这里需要集合类型
  • 生成 FileInputDStream 调用其 map 方法,将以 FileInputDStream 本身作为 partent 来构造新的 MappedDStream。对于 DStream 的 transform 操作,都将生成一个新的 DStream,和 RDD transform 生成新的 RDD 类似。
  • 与 MappedDStream 不同,所有继承了 InputDStream 的定义了输入源及接收数据方式的 sreams 都没有 parent,因为它们就是最初的 streams

DStream 的依赖链

每个 DStream 的子类都会继承 def dependencies: List[DStream[_]] = List() 方法,该方法用来返回自己的依赖的父 DStream 列表。比如没有父 DStream 的 InputDStream 的 dependencies 方法返回 List()。

构造函数参数列表中的 parent 即在 ssc.textFileStream 中 new 的定义了输入源及数据接收方式的最初的 FileInputDStream 实例,这里的 dependencies 方法将返回该 FileInputDStream 实例,这就构成了第一条依赖。可用如下图表示,这里特地将 input streams 用蓝色表示,以强调其与普通由 transform 产生的 DStream 的不同。每一个 transform 操作都将创建一个新的 DStreamMap 会创建 MappedDStreamflatMap 也会创建一个 FlatMappedDStream

MappedDStream 的实现如下:

  1. class MappedDStream[T: ClassTag, U: ClassTag] (
  2. parent: DStream[T],
  3. mapFunc: T => U
  4. ) extends DStream[U](parent.ssc) {
  5. //依赖的关键
  6. override def dependencies: List[DStream[_]] = List(parent)

4. Streaming 应用解析 - 图31

4. Streaming 应用解析 - 图32

4. Streaming 应用解析 - 图33

在 DStream 中,与 transofrm 相对应的是 output 操作,包括 print, saveAsTextFiles, foreachRDD,saveAsObjectFiles, saveAsHadoopFiles。output 操作中,会创建 ForEachDStream 实例并调用 register 方法将自身添加到 DStreamGraph.outputStreams 成员中,该 ForEachDStream 实例也会持有是调用哪个 output 操作。与 DStream transform 操作返回一个新的 DStream 不同,output 操作不会返回任何东西,只会创建一个 ForEachDStream 作为依赖链的终结

4. Streaming 应用解析 - 图34

4. Streaming 应用解析 - 图35

通过以上分析,我们总结一下:

  • DStream 逻辑上通过 transformation 来形成 DAG,但在物理上却是通过与 transformation 反向的依赖(dependency)来构成表示的。
  • 当某个节点调用了 output 操作时,就产生一个新的 ForEachDStream,这个新的 ForEachDStream 记录了具体的 output 操作是什么。
  • 在每个 batch 动态生成 RDD 实例时,就对新生成的 DStream 进行 BFS 遍历。由 output 操作新生成的 DStream 称为 output stream。

最后,我们给出:

  • Spark Streaming 记录整个 DStream DAG 的方式,就是通过一个 DStreamGraph 实例记录了到所有的 output stream 节点的引用。通过对所有 output stream 节点进行遍历,就可以得到所有上游依赖的 DStream,不能被遍历到的 DStream 节点 ——如g和h(如下图),虽然出现在了逻辑的 DAG 中,但是并不属于物理的 DStreamGraph,也将在 Spark Streaming 的实际运行过程中不产生任何作用。
  • DStreamGraph 实例同时也记录了到所有 input stream 节点的引用,DStreamGraph 时常需要遍历没有上游依赖的 DStream 节点 —— 称为 input stream—— 记录一下就可以避免每次为查找 input stream 而对 output steam 进行 BFS 的消耗。

4. Streaming 应用解析 - 图36

4. Streaming 应用解析 - 图37

4.5.2 ReceiverTracker 与数据导入

有容乃大,兼容众多数据源

InputDStream 是所有 input streams(数据输入流) 的虚基类,继承 ReceiverInputDStream 并定义相应的 receiver,就是 Spark Streaming 能兼容众多数据源的原因。

为每个 batch 的 RDD 提供输入数据

在 StreamingContext 中,有一个重要的组件叫做 ReceiverTracker,它是 Spark Streaming 作业调度器 JobScheduler 的成员,负责启动、管理各个 receiver 及管理各个 receiver 接收到的数据。

确定 receiver 要分发到哪些 executors 上执行

(1)创建 ReceiverTracker 实例

StreamingContext#start() 会调用 JobScheduler#start() 方法,在 JobScheduler#start() 中,会创建一个新的 ReceiverTracker 实例 receiverTracker,并调用其 start() 方法。

4. Streaming 应用解析 - 图38

(2)ReceiverTracker#start()

继续跟进 ReceiverTracker#start(),它主要做了:初始化一个 endpoint: ReceiverTrackerEndpoint,用来接收和处理来自 ReceiverTracker 和 receivers 发送的消息;调用 launchTasks 来自将各个 receivers 分发到 executors 上。

4. Streaming 应用解析 - 图39

(3)ReceiverTracker#launchTasks()

继续跟进 launchTasks,它也主要做了:获取 DStreamGraph.inputStreams 中继承了 ReceiverInputDStream 的 input streams 的 receivers,即数据接收器;给消息接收处理器 endpoint 发送 StartAllReceivers
(receivers)消息,直接返回,不等待消息被处理。

4. Streaming 应用解析 - 图40

(4)处理 StartAllReceivers 消息

endpoint 在接收到消息后,会先判断消息类型,对不同的消息做不同处理。对于 StartAllReceivers 消息,处理流程如下:计算每个 receiver 要分发的目的 executors。遵循两条原则:将 receiver 分布的尽量均匀。如果 receiver 的 preferredLocation 本身不均匀,以 preferredLocation 为准。遍历每个 receiver,根据第1步中得到的目的 executors 调用 startReceiver 方法。到这里,已经确定了每个 receiver 要分发到哪些 executors 上。

4. Streaming 应用解析 - 图41

(5)启动 receivers

接上,通过 ReceiverTracker#startReceiver(receiver: Receiver[_],scheduledExecutors: Seq[String])启动 receivers。分发和启动 receiver 的方式不可谓不精彩。其中 startReceiverFunc 函数主要实现如下:

  1. val supervisor = new ReceiverSupervisorImpl(
  2. receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
  3. supervisor.start()
  4. supervisor.awaitTermination()

supervisor.start() 中会调用 receiver#onStart 后立即返回。receiver#onStart 一般自行新建线程或线程池来接收数据,比如在 KafkaReceiver 中,就新建了线程池,在线程池中接收 topics 的数据 supervisor.start() 返回后,由 supervisor.awaitTermination() 阻塞住线程,以让这个 task 一直不退出,从而可以源源不断接收数据。

4. Streaming 应用解析 - 图42

数据流转

(1)Receiver -> ReceiverSupervisor

Receiver 将接收到的数据源源不断地传给 ReceiverSupervisor。Receiver 调用其 store(...) 方法,store 方法中继续调用 supervisor.pushSinglesupervisor.pushArrayBuffer 等方法来传递数据。

Receiver#store 有多重形式, ReceiverSupervisor 也有不同的 store 方法:

  • pushSingle:对应单条小数据。
  • pushArrayBuffer:对应数组形式的数据。
  • pushIterator:对应 iterator 形式数据。
  • pushBytes:对应 ByteBuffer 形式的块数据。

对于细小的数据,存储时需要 BlockGenerator 聚集多条数据成一块,然后再成块存储;反之就不用聚集,直接成块存储。

(2)ReceiverSupervisor -> BlockManager -> disk/memory

主要将从 receiver 收到的数据以 block(数据块)的形式存储。存储 block 的是 receivedBlockHandler: ReceivedBlockHandler,根据参数 spark.streaming.receiver.writeAheadLog.enable 配置的不同,默认为 false,receivedBlockHandler 对象对应的类也不同。启动 WAL 的好处就是在 application 挂掉之后,可以恢复数据。

(3)ReceiverSupervisor -> ReceiverTracker

将 block 存储之后,获得 block 描述信息 blockInfo: ReceivedBlockInfo,这里面包含:streamId、数据位置、数据条数、数据 size 等信息。之后,封装以 block 作为参数的 AddBlock(blockInfo) 消息并发送给 ReceiverTracker 以通知其有新增 block 数据块。

(4)ReceiverTracker -> ReceivedBlockTracker

ReceiverTracker 收到 ReceiverSupervisor 发来的 AddBlock(blockInfo) 消息后,直接调用 addBlock 把 block 信息传给 ReceivedBlockTracker。

4. Streaming 应用解析 - 图43

4.5.3 动态生成 job

JobScheduler 有两个重要成员,一是上文介绍的 ReceiverTracker,负责分发 receivers 及源源不断地接收数据;二是本文将要介绍的 JobGenerator,负责定时的生成 jobs 并 checkpoint。

定时逻辑

在 JobScheduler 的主构造函数中,会创建 JobGenerator 对象。在 JobGenerator 的主构造函数中,会创建一个定时器RecurringTimer()。该定时器每隔 ssc.graph.batchDuration.milliseconds 会执行一次 eventLoop.post(GenerateJobs(new Time(longTime))) 向 eventLoop 发送 GenerateJobs(new Time(longTime)) 消息,eventLoop 收到消息后会进行这个 batch 对应的 jobs 的生成及提交执行,eventLoop 是一个消息接收处理器。需要注意的是,timer 在创建后不会马上启动,将在 StreamingContext#start() 启动 Streaming Application 时间接调用到 timer.start(restartTime.milliseconds) 才启动。

为 batch 生成 jobs

eventLoop 在接收到 GenerateJobs(new Time(longTime)) 消息后的主要处理流程有三步:首先,将已接收到的 blocks 分配给 batch;其次,生成该 batch 对应的 jobs;最后,将 jobs 封装成 JobSet 并提交执行

4. Streaming 应用解析 - 图44

(1)将已接受到的 blocks 分配给 batch

各个 ReceiverInputDStream 对应的 receivers 接收并保存的 blocks 信息会保存在 ReceivedBlockTracker#streamIdToUnallocatedBlockQueues,该成员 key 为 streamId,value 为该 streamId 对应的 InputDStream 已接收保存但尚未分配的 blocks 信息。所以获取某 InputDStream 未分配的 blocks 只要以该 InputDStream 的 streamId 来从 streamIdToUnallocatedBlockQueues 来 get 就好。获取之后,会清楚该 streamId 对应的value,以保证 block 不会被重复分配。在实际调用中,为 batchTime 分配 blocks时,会从streamIdToUnallocatedBlockQueues取出未分配的blocks塞进 timeToAllocatedBlocks: mutable.HashMap[Time, AllocatedBlocks] 中,以在之后作为该 batchTime 对应的 RDD 的输入数据。通过以上步骤,就可以为 batch 的所有 InputDStream 分配 blocks。也就是为 batch 分配了 blocks。

4. Streaming 应用解析 - 图45

(2)生成该 batch 对应的 jobs

DStreamGraph#generateJobs(time: Time) 中,对于 DStreamGraph 成员 ArrayBuffer[DStream[_]] 的每一项,调用 DStream#generateJob(time: Time) 来生成这个 outputStream 在该 batchTime 的 job。返回 Seq[Job],而不是单个 job。这是因为,在一个 batch 内,可能会有多个 OutputStream 执行了多次 output 操作,每次 output 操作都将产生一个 Job,最终就会产生多个 Jobs。该生成过程主要有三步:首先,获取该 outputStream 在该 batchTime 对应的 RDD;其次,根据第一步中得到的 RDD 生成最终 job 要执行的函数 jobFunc;最后,根据 Step2 中得到的 jobFunc 生成最终要执行的 Job 并返回。

4. Streaming 应用解析 - 图46

4.5.4 job 的提交与执行

在 JobScheduler 生成某个 batch 对应的 Seq[Job] 之后,会将 batch Seq[Job] 封装成一个 JobSet 对象,JobSet 持有某个 batch 内所有的 jobs,并记录各个 job 的运行状态。之后,调用 JobScheduler#submitJobSet(jobSet: JobSet) 来提交 jobs,在该函数中,除了一些状态更新,主要任务就是执行 jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))),即对于jobSet中的每一个 job,执行 jobExecutor.execute(newJobHandler(job)),要搞懂这行代码干了什么需了解 JobHandler jobExecutor

JobHandler

JobHandler 继承了 Runnable,为了说明与 job 的关系,其精简后的实现如下:

  1. private class JobHandler(job: Job) extends Runnable with Logging {
  2. import JobScheduler._
  3. def run() {
  4. _eventLoop.post(JobStarted(job))
  5. PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
  6. job.run()
  7. }
  8. _eventLoop = eventLoop
  9. if (_eventLoop != null) {
  10. _eventLoop.post(JobCompleted(job))
  11. }
  12. }
  13. }

JobHandler#run 方法主要执行了 job.run(),该方法最终将调用到-动态生成 job 中的 jobFunc,jonFunc 将提交对应 RDD DAG 定义的 job。

JobExecutor

知道了 JobHandler 是用来执行 job 的,那么 JobHandler 将在哪里执行 job 呢?答案是 jobExecutor,jobExecutor 为 JobScheduler 成员,是一个线程池,在 JobScheduler 主构造函数中创建,如下:

  1. private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  2. private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

JobHandler 将最终在线程池 jobExecutor 的线程中被调用,jobExecutor 的线程数可通过 spark.streaming.concurrentJobs 配置,默认为1。若配置多个线程,就能让多个 job 同时运行,若只有一个线程,那么同一时刻只能有一个 job 运行。以上,即 jobs 被执行的逻辑。

4.5.5 Block 的生成与存储

ReceiverSupervisorImpl 共提供了4个将从 receiver 传递过来的数据转换成 block 并存储的方法,分别是:

  • pushSingle: 处理单条数据。
  • pushArrayBuffer: 处理数组形式数据。
  • pushIterator: 处理 iterator 形式处理。
  • pushBytes: 处理 ByteBuffer 形式数据。

其中,pushArrayBuffer、pushIterator、pushBytes 最终调用 pushAndReportBlock;而 pushSingle 将调用 defaultBlockGenerator.addData(data)

pushAndReportBlock

首先获取一个新的 blockId,之后调用 receivedBlockHandler.storeBlock。receivedBlockHandler 在 ReceiverSupervisorImpl 构造函数中初始化。当启用了 checkpoint 且 spark.streaming.receiver.writeAheadLog.enable 为真,receivedBlockHandler 被初始化为 WriteAheadLogBasedBlockHandler 类型;否则将初始化为 BlockManagerBasedBlockHandler 类型。WriteAheadLogBasedBlockHandler#storeBlock 将 ArrayBuffer, iterator, bytes 类型的数据序列化后得到的 serializedBlock 首先交由 BlockManager 根据设置的 StorageLevel 存入 executor 的内存或磁盘中,然后通过 WAL 再存储一份。

BlockManagerBasedBlockHandler#storeBlock 将 ArrayBuffer, iterator, bytes 类型的数据交由 BlockManager 根据设置的 StorageLevel 存入 executor 的内存或磁盘中,并不再通过 WAL 存储一份。

pushSingle

pushSingle 将调用 BlockGenerator#addData(data: Any) 通过积攒的方式来存储数据。对 BlockGenerator 是如何积攒一条一条数据最后写入 block 的逻辑进行分析。

4. Streaming 应用解析 - 图47

说明:

  • currentBuffer:变长数组,当 receiver 接收的一条一条的数据将会添加到该变长数组的尾部。
  • blockIntervalTimer & blockIntervalMs:分别是定时器和时间间隔。blockIntervalTimer 中有一个线程,每隔 blockIntervalMs 会执行操作:将 currentBuffer 赋值给 newBlockBuffer;将 currentBuffer 指向新的空的 ArrayBuffer 对象;将 newBlockBuffer 封装成 newBlock;将 newBlock 添加到 blocksForPushing 队列中 blockIntervalMs 由 spark.streaming.blockInterval 控制,默认是 200ms。
  • blockPushingThread & blocksForPushing & blockQueueSize:blocksForPushing 是一个定长数组,长度由 blockQueueSize 决定,默认为10,可通过 spark.streaming.blockQueueSize 改变。

更多高级解析关注:https://www.jianshu.com/u/001d44710e2e