Spark Streaming

第一:Spark Streaming基本原理

Spark Streaming的核心是一种可扩展、容错的数据流系统,它采用RDD批量模式(即批量处理数据)并加快处理速度。Spark Streaming可以以小批量或批次间隔(从500毫秒到更大的的间隔窗口)运行。

10-Spark-Streaming - 图1如上图所示,Spark Streaming接收输入数据流,并在内部将数据流分为多个较小的batch(batch大小取决于batch的间隔)。Spark引擎将这些输入数据的batch处理后,生成处理过数据的batch结果集。

Spark Streaming的主要抽象是离散流(DStream),它代表了前面提到的构成数据流的那些小批量。DSteam建立在RDD之上,允许Spark开发人员在RDD和batch的相同上下文中工作,现在只将其应用于一系列流问题当中。另外一个重要的方面是,由于你使用的是Apache Spark,Spark Streaming与ML、SQL、DataFrame和GraphFrames都做了集成。

10-Spark-Streaming - 图2Spark Streaming的基本组件:
Spark Streaming是Spark API核心的扩展,支持可扩展,高吞吐量,实时数据流的容错流处理。 数据可以从Kafka,Flume,Kinesis或TCP套接字等许多来源获取,并且可以使用复杂的算法进行处理,这些算法用map,reduce,join和window等高级函数表示。 最后,处理的数据可以推送到文件系统,数据库和实时仪表板。 事实上,您可以将Spark的机器学习和图形处理算法应用于数据流。

其中,Kafaka+Spark Streaming最常用。
Apache Kafka将发布 - 订阅消息传递重新视为分布式的,分区的,复制的提交日志服务。 在使用Spark开始集成之前,请仔细阅读Kafka文档。
Kafka项目在0.8和0.10版本之间引入了一个新的 consumer API,因此有两个独立的相应的Spark Streaming软件包可用。 请为您的brokers和desired features 选择正确的版本。 注意0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容。
Kafka 0.8版本

第二:为什么需要Spark Streaming?

随着在线交易和社交媒体以及传感器和设备的普及,很多公司正在以更快的速度产生和处理更多的数据。开发有规模的,实时的可实现的可预测的能力,为这些企业提供了竞争优势,流分析在数据科学家和数据工程师的工具箱中变得日益重要。

Spark Streaming正在被迅速采用,原因在于Apache Spark在同一框架内统一了所有这些不同的数据处理范例(ML的机器学习,Spark SQL和Streaming)。用户可以从培训机器学习模型(ML)到使用模型(Streaming)评测模型,并使用BI工具(SQL)执行分析与可视化展示,所有这些都在同一框架内完成。

Spark Streaming目前的应用场景!
(1)流ETL:将数据推入下游系统之前对其进行持续的清洗和聚合,可以减少最终数据存储中的数据量。
(2)触发器(Triggers):实时检测行为或异常事件,及时触发下游动作。
(3)数据浓缩:实时数据与其他数据集连接,可以进行更为丰富的分析。
(4)复杂会话和持续学习。

第三:Spark Streaming应用程序数据流是什么

10-Spark-Streaming - 图3如上图所示,提供了Spark Driver/Workers/Streaming源与目标间的数据流:Spark Streaming Context的ssc.start()是入口点。
目前,Spark Streaming有很多应用程序需要不断优化和配置。 Spark Streaming的文档在Scala中更完整,所以,因为您正在使用Python API,您可能有时需要请参考文档的Scala版本。

第四: 使用DStream简化Streaming应用程序

Spark Streaming的基本原理是将实时输入数据流以时间片(通常在0.5~2秒之间)为单位进行拆分,然后采用Spark引擎以类似批处理的方式处理每个时间片数据。

Spark Streaming最主要的抽象是离散化数据流(Discretized Stream,DStream),表示连续不断的数据流。在内部实现上,Spark Streaming的输入数据按照时间片(如1秒)分成一段一段,每一段数据转换为Spark中的RDD,并且对DStream的操作都最终被转变为对相应的RDD的操作。例如,如图7-10所示,在进行单词的词频统计时,一个又一个句子会像流水一样源源不断到达,Spark Streaming会把数据流切分成一段一段,每段形成一个RDD,即RDD @ time1、RDD @ time 2、RDD@ time 3和RDD @ time 4等,每个RDD里面都包含了一些句子,这些RDD就构成了一个DStream (名称为lines)。对这个DStream执行flatMap操作时,实际上会被转换成针对每个RDD的flatMap操作,转换得到的每个新的RDD中都包含了一些单词,这些新的RDD(即RDD @ result 1、RDD @result 2、RDD @ result 3、RDD @ result 4等)又构成了一个新的DStream(名称为words)。整个流式计算可根据业务的需求对这些中间的结果进一步处理,或者存储到外部设备中。
image.png

4.1 DStream操作概述

Spark Streaming工作机制

如图7-13所示,在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的任务(Task)运行在一个Executor上,每个Receiver都会负责一个DStream输入流(如从文件中读取数据的文件流、套接字流或者从Kafka中读取的一个输入流等)。Receiver组件接收到数据源发来的数据后,会提交给Spark Streaming程序进行处理。处理后的结果,可以交给可视化组件进行可视化展示,也可以写入到HDFS、HBase中。

image.png

编写Spark Streaming程序的基本步骤

编写Spark Streaming程序的基本步骤如下:

• 通过创建输入 DStream(Input Dstream)来定义输入源。流计算处理的数据对象是来自输入源的数据,这些输入源会源源不断产生数据,并发送给Spark Streaming,由Receiver组件接收到以后,交给用户自定义的Spark Streaming程序进行处理;

• 通过对DStream应用转换操作和输出操作来定义流计算。流计算过程通常是由用户自定义实现的,需要调用各种DStream操作实现用户处理逻辑;

• 调用StreamingContext对象的start()方法来开始接收数据和处理流程;

• 通过调用StreamingContext对象的awaitTermination()方法来等待流计算进程结束,或者可以通过调用StreamingContext对象的stop()方法来手动结束流计算进程。

创建StreamingContext对象

在RDD编程中需要生成一个SparkContext对象,在Spark SQL编程中需要生成一个SparkSession对象,同理,如果要运行一个Spark Streaming程序,就需要首先生成一个StreamingContext对象,它是Spark Streaming程序的主入口。

可以从一个SparkConf对象创建一个StreamingContext对象。登录Linux系统后,启动spark-shell。进入spark-shell以后,就已经获得了一个默认的SparkConext对象,也就是sc。因此,可以采用如下方式来创建StreamingContext对象:

  1. scala> import org.apache.spark.streaming._
  2. scala> val ssc = new StreamingContext(sc, Seconds(1))

new StreamingContext(sc, Seconds(1))的两个参数中,sc表示SparkContext对象,Seconds(1)表示在对 Spark Streaming 的数据流进行分段时,每1秒切成一个分段。可以调整分段大小,比如使用Seconds(5)就表示每5秒切成一个分段。但是,该系统无法实现毫秒级别的分段,因此,Spark Streaming无法实现毫秒级别的流计算。

如果是编写一个独立的Spark Streaming程序,而不是在spark-shell中运行,则需要在代码文件中通过如下方式创建StreamingContext对象:

maven:

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3. <spark.version>2.2.1</spark.version>
  4. <maven.compiler.source>1.8</maven.compiler.source>
  5. <maven.compiler.target>1.8</maven.compiler.target>
  6. </properties>
  7. <dependencies>
  8. <dependency>
  9. <groupId>junit</groupId>
  10. <artifactId>junit</artifactId>
  11. <version>4.11</version>
  12. <scope>test</scope>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.apache.spark</groupId>
  16. <artifactId>spark-core_2.11</artifactId>
  17. <version>${spark.version}</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.apache.spark</groupId>
  21. <artifactId>spark-streaming_2.11</artifactId>
  22. <version>${spark.version}</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.apache.spark</groupId>
  26. <artifactId>spark-streaming-kafka_2.11</artifactId>
  27. <version>1.6.2</version>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.scala-lang</groupId>
  31. <artifactId>scala-library</artifactId>
  32. <version>2.11.8</version>
  33. </dependency>
  34. </dependencies>
  1. package com.hx.test
  2. import org.apache.spark._
  3. import org.apache.spark.streaming._
  4. /**
  5. * fileName : Test11StreamingWordCount
  6. * Created by 970655147 on 2016-02-12 13:21.
  7. */
  8. object Test11StreamingWordCount {
  9. // 基于sparkStreaming的wordCount
  10. // 环境windows7 + spark1.2 + jdk1.7 + scala2.10.4
  11. // 1\. 启动netcat [nc -l -p 9999]
  12. // 2\. 启动当前程序
  13. // 3\. netcat命令行中输入数据
  14. // 4\. 回到console, 查看结果[10s 之内]
  15. // *******************************************
  16. // 每一个print() 打印一次
  17. // -------------------------------------------
  18. // Time: 1455278620000 ms
  19. // -------------------------------------------
  20. // Another Infomation !
  21. // *******************************************
  22. // inputText : sdf sdf lkj lkj lkj lkj
  23. // MappedRDD[23] at count at Test11StreamingWordCount.scala:39
  24. // 2
  25. // (sdf,2), (lkj,4)
  26. def main(args :Array[String]) = {
  27. // Create a StreamingContext with a local master
  28. // Spark Streaming needs at least two working thread
  29. // val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
  30. // val sc = new StreamingContext(conf, Seconds(1))
  31. val sc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10) )
  32. // Create a DStream that will connect to serverIP:serverPort, like localhost:9999
  33. val lines = sc.socketTextStream("192.168.47.141", 9999)
  34. // Split each line into words
  35. // 以空格把收到的每一行数据分割成单词
  36. val words = lines.flatMap(_.split(" "))
  37. // 在本批次内计单词的数目
  38. val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  39. // 打印每个RDD中的前10个元素到控制台
  40. wordCounts.print()
  41. sc.start() // 开始计算
  42. sc.awaitTermination() // 等待计算结束
  43. }
  44. }

4.2 基本输入源

文件流

文件流在文件流的应用场景中,需要编写Spark Streaming程序,一直对文件系统中的某个目录进行监听,一旦发现有新的文件生成,Spark Streaming 就会自动把文件内容读取过来,使用用户自定义的处理逻辑进行处理
image.png


套接字流

Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。

  • Socket工作原理

在网络编程中,大量的数据交换都是通过Socket实现的。Socket工作原理和日常生活的电话交流非常类似。在日常生活中,用户A要打电话给用户B,首先,用户A拨号,用户B听到电话铃声后提起电话,这时A和B就建立起了连接,二者之间就可以通话了。等交流结束以后,挂断电话结束此次交谈。Socket工作过程也是类似的,即“open(拨电话)—write/read(交谈)—close(挂电话)”模式。如图7-14所示,服务器端先初始化Socket,然后与端口绑定(Bind),对端口进行监听(Listen),调用accept()方法进入阻塞状态,等待客户端连接。客户端初始化一个Socket,然后连接服务器(Connect),如果连接成功,这时客户端与服务器端的连接就建立了。客户端发送数据请求,服务器端接收请求并处理请求,然后把回应数据发送给客户端,客户端读取数据,最后关闭连接,一次交互结束。

image.png

  • 2.使用套接字流作为数据源

在套接字流作为数据源的应用场景中,Spark Streaming程序就是图7-14所示的Socket通信的客户端,它通过Socket方式请求数据,获取数据以后启动流计算过程进行处理。下面编写一个Spark Streaming独立应用程序来实现这个应用场景。执行上面命令以后,就在当前的Linux终端(即“流计算终端”)内顺利启动了Socket客户端。现在,再打开一个新的Linux终端(这里称为“数据源终端”),启动一个Socket服务器端,让该服务器端接收客户端的请求,并给客户端不断发送数据流。通常,Linux发行版中都带有NetCat(简称nc),可以使用如下nc命令生成一个Socket服务器端:$ nc -lk 9999在上面的nc命令中,-l这个参数表示启动监听模式,也就是作为Socket服务器端,nc会监听本机(localhost)的9999号端口,只要监听到来自客户端的连接请求,就会与客户端建立连接通道,把数据发送给客户端;-k参数表示多次监听,而不是只监听1次。

由于之前已经在“流计算终端”内运行了NetworkWordCount程序,该程序扮演了Socket客户端的角色,会向本地(localhost)主机的9999号端口发起连接请求,所以,“数据源终端”内的nc程序就会监听到本地(localhost)主机的9999号端口有来自客户端(NetworkWordCount程序)的连接请求,于是就会建立服务器端(nc程序)和客户端(NetworkWordCount程序)之间的连接通道。连接通道建立以后,nc 程序就会把我们在“数据源终端”内手动输入的内容,全部发送给“流计算终端”内的NetworkWordCount程序进行处理。为了测试程序运行效果,在“数据源终端”内执行上面的nc命令后,可以通过键盘输入一行英文句子后按Enter键,反复多次输入英文句子并按Enter键,nc程序会自动把一行又一行的英文句子不断发送给“流计算终端”的NetworkWordCount程序。在“流计算终端”内,NetworkWordCount程序会不断接收到nc发来的数据,每隔1秒就会执行词频统计,并打印出词频统计信息

4.3 高级数据源

Kafka

Spark Streaming是用来进行流计算的组件,可以把Kafka(或Flume)作为数据源,让Kafka(或Flume)产生数据发送给Spark Streaming应用程序,Spark Streaming应用程序再对接收到的数据进行实时处理,从而完成一个典型的流计算过程。这里仅以Kafka为例进行介绍,Spark和Flume的组合使用也是类似的

Kafka是一种高吞吐量的分布式发布订阅消息系统,为了更好地理解和使用Kafka,这里介绍一下Kafka的相关概念:
• Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。
• Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic,即可生产或消费数据,而不必关心数据存于何处。
• Partition:是物理上的概念,每个Topic包含一个或多个Partition。
• Producer:负责发布消息到Kafka Broker。
• Consumer:消息消费者,向Kafka Broker读取消息的客户端。
• Consumer Group:每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定Group Name,若不指定Group Name,则属于默认的Group。

为了让Spark Streaming应用程序能够顺利使用Kafka数据源,在下载Kafka安装文件的时候要注意,Kafka版本号一定要和自己电脑上已经安装的Scala版本号一致才可以。本教材安装的Spark版本号是2.1.0,Scala版本号是2.11,所以,一定要选择Kafka版本号是2.11开头的。例如,到Kafka官网中,可以下载安装文件kafka_2.11-0.10.2.0,前面的2.11就是支持的Scala版本号,后面的0.10.2.0是Kafka自身的版本号。

  • 添加相关jar包

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)。已经安装好的Spark版本,这些jar包都不在里面。下载spark-streaming-kafka-0-8_2.11-2.1.0.jar文件。现在,需要把这个文件复制到Spark目录的jars目录下。

用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。

基于Receiver的方式

这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据。如下图:

image.png

直接Direct读取方式

image.pngimage.png
image.png

offset存储问题

2.1 背景
spark streaming + kafka 是很常见的分布式流处理组合。spark streaming 消费kafka有两种方式:Receiver DStream方式和Direct DStream的方式,前者不需要自己管理offset,但是比较迂回会产生大量数据冗余到hdfs,后者比较直接,但是如果程序出现问题导致需要重启,那么offset的存储就可以避免重复消费或者丢失数据,所以Direct DStream的offset 需要自己管理。

2.2 方式
官网文档上我们可以看到三种存储offset的方式。

2.2.1 Checkpoints
这种方式使用简单,直接利用spark streaming的checkpoint机制把offset存到hdfs里,但是比较坑的是,除了offset,还有spark streaming的很多字节数据都备份了,所以修改了代码就不能再从checkpoint启动了。

2.2.2 Kafka itself
kafka会自动将offset存到另一个topic,注意 enable.auto.commit 要设置为false,不然默认是定期自动存储,会出问题。想用这种方式的话官网文档有示例。

2.2.3 Your own data store .
第三种方式是自己DIY,可以存到mysql、hbase、zookeeper等等。
之前用过存储到hbase,目前组内好像spark和hbase不是通的。
offset存储到zk的方式,网上也有很多可用的代码,但是对于spark2.x和kafka 10的组合,并没有找到可用的代码,相比于旧版本,api的更改还是很多的。

2.3 offset存储到zookeeper
版本:spark-streaming-kafka-0-10_2.11-2.2.0.jar 和 kafka_2.11-0.10.0.1.jar
调用示例:
createDStream(
ssc,
“test0418”,
“test_spark_streaming_group”,
“localhost:2181”,
“localhost:9092”,
kafkaParams
)

  1. /**
  2. * 创建DStream,会读取zk上存储的offset,没有的话就直接创建一个
  3. * @param ssc
  4. * @param topics
  5. * @param group
  6. * @param ZKservice
  7. * @param broker
  8. * @param kafkaParams
  9. * @return
  10. */
  11. def createDStream(ssc:StreamingContext, topics:Set[String], group:String, ZKservice:String,broker:String, kafkaParams:Map[String, Object]):InputDStream[ConsumerRecord[String, String]] ={
  12. val zkClient = new ZkClient(ZKservice)
  13. var kafkaStream : InputDStream[ConsumerRecord[String, String]] = null
  14. var fromOffsets: Map[TopicPartition, Long] = Map()
  15. for(topic <-topics){
  16. //创建一个 ZKGroupTopicDirs 对象
  17. val topicDirs = new ZKGroupTopicDirs(group, topic)
  18. // 获取 zookeeper 中的路径,这里会变成 /consumers/test_spark_streaming_group/offsets/topic_name
  19. val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
  20. //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)
  21. val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")
  22. //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
  23. val leaders = getLeaders(topic,broker)
  24. if (children > 0) {
  25. //如果保存过 offset,这里更好的做法,还应该和 kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误
  26. for (i <- 0 until children) {
  27. val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
  28. val tp = new TopicPartition(topic, i)
  29. val tap = TopicAndPartition(topic, i)
  30. //比较最早的offset
  31. val requestMin = OffsetRequest(Map(tap -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
  32. val consumerMin = new SimpleConsumer(leaders.get(i).get, 9092, 10000, 10000, "getMinOffset")
  33. val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tap).offsets
  34. var nextOffset = partitionOffset.toLong
  35. if (curOffsets.length > 0 && nextOffset < curOffsets.head) { // 通过比较从 kafka 上该 partition 的最小 offset 和 zk 上保存的 offset,进行选择
  36. nextOffset = curOffsets.head
  37. }
  38. fromOffsets += (tp -> nextOffset)
  39. // println("@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
  40. }
  41. }
  42. }
  43. if(fromOffsets.isEmpty){
  44. //没有存储过,第一次创建
  45. kafkaStream = KafkaUtils.createDirectStream[String, String](
  46. ssc,
  47. PreferConsistent,
  48. Subscribe[String, String](topics, kafkaParams)
  49. )
  50. }else{
  51. kafkaStream = KafkaUtils.createDirectStream[String, String](
  52. ssc,
  53. PreferConsistent,
  54. Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
  55. )
  56. }
  57. kafkaStream
  58. }
  59. /**
  60. * 处理完成后将from offset写入zk
  61. * 注意是from offset 不是 until offset
  62. * 为了保证数据不丢失,重启时可能有一个betch数据重复提交
  63. * @param group
  64. * @param ZKservice
  65. * @param kafkaStream
  66. */
  67. def updateZKOffsets(group:String, ZKservice:String, kafkaStream:InputDStream[ConsumerRecord[String, String]]): Unit = {
  68. val zkClient = new ZkClient(ZKservice)
  69. var offsetRanges = Array[OffsetRange]()
  70. kafkaStream.transform{ rdd =>
  71. //得到该 rdd 对应 kafka 的消息的 offset
  72. offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  73. rdd
  74. }.map(msg => msg.value()).foreachRDD { rdd =>
  75. for (o <- offsetRanges) {
  76. //不同的topic对应不同的目录
  77. val topicDirs = new ZKGroupTopicDirs(group, o.topic)
  78. val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
  79. //将该 partition 的 offset 保存到 zookeeper
  80. new ZkUtils(zkClient,new ZkConnection(ZKservice),false).updatePersistentPath(zkPath, o.fromOffset.toString)
  81. // println(s"@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######")
  82. }
  83. }
  84. }
  85. /**
  86. * 根据任意一台kafka机器ip获取 topich的partition和机器的对应关系
  87. * @param topic_name
  88. * @param broker
  89. * @return
  90. */
  91. def getLeaders(topic_name:String,broker:String): Map[Int,String] ={
  92. val topic2 = List(topic_name)
  93. val req = new TopicMetadataRequest(topic2, 0)
  94. val getLeaderConsumer = new SimpleConsumer(broker, 9092, 10000, 10000, "OffsetLookup")
  95. val res = getLeaderConsumer.send(req)
  96. val topicMetaOption = res.topicsMetadata.headOption
  97. // 将结果转化为 partition -> leader 的映射关系
  98. val partitions = topicMetaOption match {
  99. case Some(tm) =>
  100. tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
  101. case None =>
  102. Map[Int, String]()
  103. }
  104. partitions
  105. }

测试代码如下:

  1. var ssc = new StreamingContext(sc, Seconds(30))
  2. val servers = "broker1.test-bus-kafka.data.m.com:9092,broker2.test-bus-kafka.data.m.com:9092,broker3.test-bus-kafka.data.m.com:9092"
  3. val topic = "antispam.test.ly,antispam.test.ly2"
  4. val path = "/home/hadoopuser/ly/"
  5. val broker = "broker1.test-bus-kafka.data.m.com"
  6. val ZKservice = "zk1.test-inf-zk.data.m.com:2181,zk2.test-inf-zk.data.m.com:2181,zk3.test-inf-zk.data.m.com:2181"
  7. val group = "test_spark_streaming_group"
  8. val kafkaParams = Map[String, Object](
  9. "bootstrap.servers" -> servers,
  10. "key.deserializer" -> classOf[StringDeserializer],
  11. "value.deserializer" -> classOf[StringDeserializer],
  12. "group.id" -> "test",
  13. "auto.offset.reset" -> "earliest"
  14. )
  15. val topics = topic.split(",").toSet
  16. // val topic = "antispam.test.ly2"
  17. val DStream = createDStream(ssc,topics,group,ZKservice,broker,kafkaParams)
  18. DStream.foreachRDD { recored =>
  19. recored.saveAsTextFile(path)
  20. recored.foreachPartition(
  21. message => {
  22. while (message.hasNext) {
  23. println(s"@^_^@ [" + message.next() + "] @^_^@")
  24. }
  25. }
  26. )
  27. }
  28. updateZKOffsets(group,ZKservice,DStream)
  29. ssc.start()

调优

Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置。

  • 3.1合理的批处理时间(batchDuration)

几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。

如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。

一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置。

  • 3.2合理的Kafka拉取量(maxRatePerPartition重要)

对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的。 配置参数为:
spark.streaming.kafka.maxRatePerPartition。
这个参数默认是没有上线的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量。

  • 3.3缓存反复使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销

  • 3.4 设置合理的CPU资源数

CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高.

  • 3.5 设置合理的parallelism

在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

  • 3.6 使用高性能的算子
  • 3.7 使用Kryo优化序列化性能

———————————————————————————————————————————————————————————-
———————————————————————————————————————————————————————————-
———————————————————————————————————————————————————————————-

  • 使用Python的Spark Streaming来创建一个简单的单词计数例子
  • 使用DStream————由众多小批次数据组成的离散数据流

模拟方法

To run this example:
使用Linux中bash终端将多个单词发送到我们计算机的本地端口(9999),请注意,在终端1中输入单词
Terminal 1: nc -lk 9999
运行Spark Streaming来接收这些文字,并对他们进行计数
Terminal 2: ./bin/spark-submit streaming_word_count.py localhost 9999

实施

打开两个终端,一个用于nc命令,另一个用于Spark Streaming程序
nc -lk 9999 从这个点开始,你在终端所输入的一切将被传送到9999端口。

在另一终端屏幕运行以下streaming_word_count.py脚本

  1. # streaming_word_count.py 如下
  2. # 导入必要的类并创建一个本地的SparkContext和StreamingContexts,StreamingContexts是Spark Streaming的入口点
  3. from pyspark import SparkContext
  4. from pyspark.streaming import StreamingContext
  5. # 用两个工作线程创建Spark Context(注意:`local [2]`)
  6. sc = SparkContext("local[2]", "NetworkWordCount")
  7. # 创建1秒的本地StreamingContextwith批处理间隔,1是批间隔,每秒运行微批次。
  8. ssc = StreamingContext(sc, 1)
  9. # 创建DStream,将连接到连接到localhost:9999的输入行的流,
  10. lines = ssc.socketTextStream("localhost", 9999)
  11. # 统计单词
  12. words = lines.flatMap(lambda line: line.split(" "))
  13. pairs = words.map(lambda word: (word, 1))
  14. wordCounts = pairs.reduceByKey(lambda x, y: x + y)
  15. # 将此DStream中生成的每个RDD的前十个元素打印到控制台
  16. wordCounts.pprint()
  17. # 启动Spark Streaming开始计算,然后等待终止命令来停止运行(CTRL+C),如果没有等到停止命令,Spark Streaming程序将继续运行。
  18. ssc.start()
  19. # 等待计算结束
  20. ssc.awaitTermination()

全局实施

  1. from pyspark import SparkContext
  2. from pyspark.streaming import StreamingContext
  3. sc = SparkContext("local[2]", "StatefulNetworkWordCount")
  4. ssc = StreamingContext(sc, 1)
  5. # 给Saprk Streaming配置了一个检查点(checkpoint)
  6. ssc.checkpoint("checkpoint")
  7. def updateFunc(new_values, last_sum):
  8. return sum(new_values) + (last_sum or 0)
  9. lines = ssc.socketTextStream("localhost", 9999)
  10. running_counts = lines.flatMap(lambda line: line.split(" "))\
  11. .map(lambda word: (word, 1))\
  12. .updateStateByKey(updateFunc)
  13. running_counts.pprint()
  14. ssc.start()
  15. ssc.awaitTermination()

第五:结构化流

利用DataFrame引入结构化流,简化代码。

  1. # Structured Streaming Word Count Example
  2. # Original Source: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
  3. #
  4. # To run this example:
  5. # Terminal 1: nc -lk 9999
  6. # Terminal 2: ./bin/spark-submit structured_streaming_word_count.py localhost 9999
  7. # Note, type words into Terminal 1
  8. #
  9. # Import the necessary classes and create a local SparkSession
  10. from pyspark.sql import SparkSession
  11. from pyspark.sql.functions import explode
  12. from pyspark.sql.functions import split
  13. spark = SparkSession \
  14. .builder \
  15. .appName("StructuredNetworkWordCount") \
  16. .getOrCreate()
  17. # Create DataFrame representing the stream of input lines from connection to localhost:9999
  18. lines = spark\
  19. .readStream\
  20. .format('socket')\
  21. .option('host', 'localhost')\
  22. .option('port', 9999)\
  23. .load()
  24. # Split the lines into words
  25. words = lines.select(
  26. explode(
  27. split(lines.value, ' ')
  28. ).alias('word')
  29. )
  30. # Generate running word count
  31. wordCounts = words.groupBy('word').count()
  32. # Start running the query that prints the running counts to the console
  33. query = wordCounts\
  34. .writeStream\
  35. .outputMode('complete')\
  36. .format('console')\
  37. .start()
  38. # Await Spark Streaming termination
  39. query.awaitTermination()

第六:利用PySpark Streaming读取和分析数据

Step 1. Starting Zookeeper, creating the topic, starting Apache Kafka broker, starting the console producer.
  1. kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties
  2. kafka$ bin/kafka-server-start.sh config/server.properties
  3. kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pysparkBookTopic
  4. kafka$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pysparkBookTopic
Step 2. Starting PySpark with spark-streaming-kafka package.
  1. $ pyspark --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0
Step 3. Creating sum of each row of numbers.
  1. def stringToNumberSum(data):
  2. removedSpaceData = data.strip()
  3. if removedSpaceData == '' :
  4. return(None)
  5. splittedData = removedSpaceData.split(' ')
  6. numData = [float(x) for x in splittedData]
  7. sumOfData = sum(numData)
  8. return (sumOfData)
  9. dataInString = '10 10 20 '
  10. stringToNumberSum(dataInString)
Step 4. Reading data from Kafka and getting sum of each row.
  1. from pyspark.streaming.kafka import KafkaUtils
  2. from pyspark.streaming import StreamingContext
  3. bookStreamContext = StreamingContext(sc, 10)
  4. bookKafkaStream = KafkaUtils.createStream(
  5. ssc = bookStreamContext,
  6. zkQuorum = 'localhost:2185',
  7. groupId = 'pysparkBookGroup',
  8. topics = {'pysparkBookTopic':1}
  9. )
  10. sumedData = bookKafkaStream.map( lambda data : stringToNumberSum(data[1]))
  11. sumedData.pprint()
  12. bookStreamContext.start()
  13. bookStreamContext.awaitTerminationOrTimeout(30)

第七:Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher)

方法1:基于接收者的方法

对于Python应用程序,在部署应用程序时,必须添加上面的库及其依赖关系。 请参阅下面的部署小节。

(1)导入KafkaUtils并创建一个输入DStream,如下所示:

  1. from pyspark.streaming.kafka import KafkaUtils
  2. kafkaStream = KafkaUtils.createStream(streamingContext, \
  3. [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
  • 默认情况下,Python API会将Kafka数据解码为UTF8编码的字符串。 您可以指定自定义解码函数,将Kafka记录中的字节数组解码为任意的数据类型。
  • Kafka中的主题分区与Spark Streaming中生成的RDD的分区不相关。 因此,增加KafkaUtils.createStream()中主题特定分区的数量只会增加单个接收者使用哪些主题的线程数,在处理数据时不会增加Spark的并行性。
  • 多个Kafka输入DStreams可以创建不同的组和主题,用于使用多个接收器并行接收数据。如果已经使用HDFS等复制文件系统启用了写入日志,则接收到的数据已经被复制到日志中。

(2)部署:与任何Spark应用程序一样,spark-submit用于启动您的应用程序。
对于缺乏SBT / Maven项目管理的Python应用程序,可以使用—packages直接将spark-streaming-kafka-0-8_2.11及其依赖关系添加到spark-submit

  1. ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 ...

方法2:直接方法(无接收者)

在Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强大的端到端保证。 这种方法不是使用接收器来接收数据,而是定期查询Kafka在每个主题+分区中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。 当处理数据的作业启动时,Kafka简单的客户API用于读取Kafka中定义的偏移范围(类似于从文件系统读取文件)。

与基于接收机的方法(即方法1)相比,这种方法具有以下优点。

  • 简化的并行性:不需要创建多个输入Kafka流并将其合并。使用directStream,Spark Streaming将创建与使用Kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。所以在Kafka和RDD分区之间有一对一的映射关系,这更容易理解和调整。
  • 效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。这实际上是效率低下的,因为数据被有效地复制了两次 - 一次是由卡夫卡(Kafka),另一次是由预先写入日志(Write Ahead Log)复制。这个第二种方法消除了这个问题,因为没有接收器,因此不需要预先写入日志。只要你有足够的卡夫卡保留,消息可以从卡夫卡恢复。
  • 完全一次的语义:第一种方法使用Kafka的高级API来存储Zookeeper中消耗的偏移量。传统上这是从卡夫卡消费数据的方式。虽然这种方法(结合提前写入日志)可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会消耗两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。在其检查点内,Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,因此Spark Streaming每次记录都会在发生故障时有效地接收一次。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务(请参阅主程序中输出操作的语义指导进一步的信息)。

请注意,这种方法的一个缺点是它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监控工具将不会显示进度。但是,您可以在每个批次中访问由此方法处理的偏移量,并自己更新Zookeeper(请参见下文)。

接下来,我们将讨论如何在流式应用程序中使用这种方法。
(1)导入KafkaUtils并创建一个输入DStream,如下所示:

  1. from pyspark.streaming.kafka import KafkaUtils
  2. directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
  • 您也可以传递一个messageHandler到createDirectStream来访问包含关于当前消息的元数据的KafkaMessageAndMetadata并将其转换为任何需要的类型。 默认情况下,Python API会将Kafka数据解码为UTF8编码的字符串。 您可以指定自定义解码函数,将Kafka记录中的字节数组解码为任意的数据类型。 查看API文档和示例。
  • 在Kafka参数中,您必须指定metadata.broker.list或bootstrap.servers。 默认情况下,它将开始消耗每个Kafka分区的最新偏移量。 如果您将Kafka参数中的配置auto.offset.reset设置为最小,那么它将从最小的偏移量开始消耗。
  • 你也可以使用KafkaUtils.createDirectStream的其他变体开始消耗任何偏移量。 此外,如果您想访问每批中消耗的卡夫卡补偿,您可以执行以下操作。
  1. offsetRanges = []
  2. def storeOffsetRanges(rdd):
  3. global offsetRanges
  4. offsetRanges = rdd.offsetRanges()
  5. return rdd
  6. def printOffsetRanges(rdd):
  7. for o in offsetRanges:
  8. print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
  9. directKafkaStream \
  10. .transform(storeOffsetRanges) \
  11. .foreachRDD(printOffsetRanges)
  • 如果您希望基于Zookeeper的Kafka监视工具显示流应用程序的进度,您可以使用它自己更新Zookeeper。
  • 请注意,HasOffsetRanges的类型转换只会在directKafkaStream中调用的第一个方法中完成,而不是在方法链之后。您可以使用transform()而不是foreachRDD()作为第一个方法调用,以访问偏移量,然后调用更多的Spark方法。但是,请注意,RDD分区与Kafka分区之间的一对一映射在任何混洗或重新分区的方法(例如, reduceByKey()或window()。
  • 另外要注意的是,由于这种方法不使用接收器,与标准接收器相关的(即spark.streaming.receiver。形式的配置)将不适用于由此方法创建的输入DStream(将应用于其他输入DStreams)。相反,使用配置spark.streaming.kafka。。最重要的是spark.streaming.kafka.maxRatePerPartition,它是每个Kafka分区将被这个直接API读取的最大速率(每秒消息数)。
    (2)部署:与任何Spark应用程序一样,spark-submit用于启动您的应用程序。
    对于缺乏SBT / Maven项目管理的Python应用程序,可以使用—packages直接将spark-streaming-kafka-0-8_2.11及其依赖关系添加到spark-submit
  1. ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 ...

第八:Spark Streaming + Flume Integration Guide (Kafka broker version 0.8.2.1 or higher)

Apache Flume是一个分布式的,可靠的,可用的服务,用于高效地收集,聚合和移动大量的日志数据。 在这里我们解释如何配置Flume和Spark Streaming来接收来自Flume的数据。 有两种方法。

方法1:Flume-style Push-based Approach

Flume旨在在Flume代理之间推送数据。 在这种方法中,Spark Streaming基本上设置了一个接收器,作为Flume的一个Avro代理,Flume可以将这个接收器推送到这个接收器。
配置步骤:

General Requirements

选择群集中的一台机器

  • 当启动Flume + Spark Streaming应用程序时,其中一名Spark工作人员必须在该机器上运行。
  • Flume可以配置为将数据推送到该机器上的端口。

由于推送模式,流媒体应用程序需要启动,接收机已安排并在所选端口上侦听,Flume才能够推送数据。

Configuring Flume

配置Flume代理通过在配置文件中包含以下内容将数据发送到Avro接收器。

  1. agent.sinks = avroSink
  2. agent.sinks.avroSink.type = avro
  3. agent.sinks.avroSink.channel = memoryChannel
  4. agent.sinks.avroSink.hostname = <chosen machine's hostname>
  5. agent.sinks.avroSink.port = <chosen port on the machine>

Configuring Spark Streaming Application(以Python为例)

  1. from pyspark.streaming.flume import FlumeUtils
  2. flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

默认情况下,Python API会将Flume事件主体解码为UTF8编码的字符串。 您可以指定自定义解码函数,将Flume事件中的正文字节数组解码为任意任意数据类型。 查看API文档和示例。

请注意,主机名应该与群集中的资源管理器(Mesos,YARN或Spark Standalone)所使用的相同,以便资源分配可以匹配名称,并在正确的机器中启动接收器。

对于缺乏SBT / Maven项目管理的Python应用程序,spark-streaming-flume_2.11及其依赖项可直接添加到使用—packages的spark-submit

  1. ./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.2.1 ...

方法2:Pull-based Approach using a Custom Sink

Flume不是直接将数据直接推送到Spark Streaming,而是运行一个自定义的Flume接收器,它允许进行以下操作。

Flume将数据推入接收器,并且数据保持缓冲。
Spark Streaming使用可靠的Flume接收器和事务来从接收器中提取数据。 只有在Spark Streaming接收和复制数据后,事务才能成功。
这确保了比以前的方法更强大的可靠性和容错保证。 但是,这需要配置Flume运行自定义接收器。
配置步骤:

General Requirements

选择一台将在Flume代理中运行定制接收器的机器。 Flume管道的其余部分配置为向该代理发送数据。 Spark群集中的机器应该可以访问运行定制接收器的选定机器。

Configuring Flume

Configuring Spark Streaming Application

  1. from pyspark.streaming.flume import FlumeUtils
  2. addresses = [([sink machine hostname 1], [sink port 1]), ([sink machine hostname 2], [sink port 2])]
  3. flumeStream = FlumeUtils.createPollingStream(streamingContext, addresses)