Spark Streaming

streaming-arch

一、Spark Streaming (允许用户使用一套和批处理非常接近的 API 来编写流式计算应用)

  • Spark - Streaming DStream 架构
  • Spark Streaming 使用离散化流 (discretized stream) 作为抽象表示,叫做 DStream
  • DStream 是一个持续的 RDD 序列
  • DStream 是随时间推移而受到的数据的序列,在内部每个时间区间收到的数据都作为 RDD 存在
  • DStream 可以通过各种输入源创建,比如(Flume,Kafka,HDFS 等)

1. Spark Streaming 程序例子

  1. package com.angejia.dw.spark
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming.StreamingContext
  4. import org.apache.spark.streaming.StreamingContext._
  5. import org.apache.spark.streaming.dstream.DStream
  6. import org.apache.spark.streaming.Duration
  7. import org.apache.spark.streaming.Seconds;
  8. object SparkStreamingTest {
  9. def main(args: Array[String]) {
  10. this.testStreaming()
  11. }
  12. def testStreaming() : Unit = {
  13. val conf = new SparkConf().setAppName("TestStreaming")
  14. // 从 SparkConf 创建 StreamingContext 并指定 1 秒钟的批处理大小
  15. val scc = new StreamingContext(conf,Seconds(2))
  16. // 连接到本地机器 7777 端口上后,使用收到的数据创建 DStream
  17. val lines = scc.socketTextStream("127.0.0.1", 7777)
  18. // 从 DStream 中筛选出包含字符串的 "error" 的行
  19. val errorLines = lines.filter { _.contains("error")}
  20. // 打印出 "error"
  21. errorLines.print()
  22. // 启动流计算环境 StreamingContext 并等待它"完成"
  23. scc.start()
  24. // 等待作业完成
  25. scc.awaitTermination()
  26. }
  27. }
  28. // 提交 spark 到集群
  29. spark-submit --name TestStreaming --class com.angejia.dw.spark.SparkStreamingTest --master local[2] ./spark-test.jar

二. DStream 操作

  • transformation 转化操作,会生成一个新的 transformation
  • output operation 输出操作,可以把数据写入外部系统中,比如(HDFS)

1. transformation 转化操作

1.1 transformation - 无状态转化操作

  • 无状态转化操作 : 分别应用到每个 RDD 上的
  1. map()
  2. flatMap()
  3. filter()
  4. repartition()
  5. reduceByKey()
  6. groupByKey()
  7. join()
  8. 连接 2 DStream

1.2 transformation - 有状态转化操作

  • 有状态转化操作 : 跨时间区间跟踪数据的操作
  • 当需要先前批次的数据,也用来在新的批次中计算结果
  • 主要类型
    • 滑动窗口 : 以一个时间段为滑动窗口进行操作
    • updateStateByKey() : 跟踪每个键的状态变化(例如构建一个代表用户会话的对象)
1.2.1 transformation - 有状态转化操作 - 窗口操作
  • 参数 :
    • 窗口时长 : 控制每次计算最近多少个批次的数据
    • 滑动步长 : 控制对新的 DStream 进行计算的间隔
  1. // SparkStreaming 上下文
  2. val conf = new SparkConf().setAppName("testDStreamWindows")
  3. val scc = new StreamingContext(conf,Seconds(2))
  4. // 有状态操作,需要在 StreamingContext 中打开检查点机制来确保容错性
  5. scc.checkpoint("hdfs://path/dir")
  6. // 监听
  7. val linesLog = scc.fileStream("/")
  8. // 窗口
  9. val windowLog = linesLog.window(Seconds(30),Seconds(10))
  10. windowLog.count()
1.2.2 transformation - 有状态转化操作 - updateStateByKey()
  • 应用案例
    • DStream 中跨批次维护状态(例如跟踪用户访问网站的状态)
  • 用于键值对形式的 DStream
  • 给定一个有(键,事件)对构成的 DStream ,并传递一个指定如何根据新的事件,更新每个键对应状态的函数
  • updateStateByKey() 的结果是一个新的 DStream,其内部的 RDD 序列是由每个事件区间对应的 (键,状态) 对组成的

2. output operation 输出操作

  1. val conf = new SparkConf().setAppName("testDStreamOutputOperation")
  2. val scc = new StreamingContext(conf,Seconds(2))
  3. val linesLog = scc.fileStream("/")
  4. // 保存为文件例子
  5. linesLog.saveAsTextFiles("/path/xxx", "txt")
  6. // 循环 RDD 输出、保存例子
  7. // 循环 RDD
  8. linesLog.foreachRDD(rdd => {
  9. // 循环 RDD 分区
  10. rdd.foreachPartition( partition => {
  11. // 循环 RDD 分区内容 (打开连接到存储系统的连接,比如数据库连接)
  12. partition.foreach( item => {
  13. // 把 item 写到系统中
  14. })
  15. // 关闭连接
  16. })
  17. })

二、Spark Streaming 输入源

1. 核心数据源

  • 本地文件
  • HDFS
  • Kafka
  • Flume
  • Akka actor

2. 本地文件

  1. // 监听目录
  2. val logData = scc.textFileStream("dir")

3. HDFS

3. Kafka

4. Flume

三、Spark Streaming 性能优化

  1. 1. 批次和窗口大小
  2. 500 毫秒为比较好的最小批次大小,通过不断处理减少这个参数,根据 Spark 界面去查看处理时间是否增加
  3. 2. 并行度
  4. 1) 增加接收器数目
  5. 通过创建多个 DStream(这样会创建多个接收器)
  6. 2) 将受到的数据显示地重新分区
  7. 重新分配分区,或者合并多个流得到的数据流
  8. 3) 提高聚合计算的并行度