第一章.SparkStreaming概述

1.SparkStreaming是什么

$09[SparkStreaming流式处理] - 图1

2.Spark Streaming架构原理

一.DStream介绍

$09[SparkStreaming流式处理] - 图2

二.架构图

  1. 整体架构图

$09[SparkStreaming流式处理] - 图3

  1. SparkStreaming架构图

$09[SparkStreaming流式处理] - 图4

三.背压机制

  1. - Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。
  2. - 为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure):根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
  3. - 通过属性“spark.streaming.backpressure.enabled”来控制是否启用背压机制,默认值false,即不启用。

3.SparkStreaming特点

  1. 易用

$09[SparkStreaming流式处理] - 图5

  1. 容错

$09[SparkStreaming流式处理] - 图6

  1. 易整合到Spark体系

$09[SparkStreaming流式处理] - 图7

第二章.DStream入门

1.WordCount案例实操

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStream读取端口数据并统计不同单词出现的次数

  1. 添加依赖
  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-core_2.12</artifactId>
  5. <version>3.0.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.spark</groupId>
  9. <artifactId>spark-streaming_2.12</artifactId>
  10. <version>3.0.0</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>junit</groupId>
  14. <artifactId>junit</artifactId>
  15. <version>4.12</version>
  16. </dependency>
  17. </dependencies>
  1. 编写代码
package com.atguigu.spark.day09

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object $01_WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建StreamingContext,设置一个批次的时间
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(5))
    //设置日志级别
    ssc.sparkContext.setLogLevel("error")
    //2.从网络读取数据
    val ds = ssc.socketTextStream("hadoop102", 9999)
    //3.数据处理
    val ds2 = ds.flatMap(_.split(" "))
    val ds3 = ds2.map(x => {
      (x, 1)
    })
    val ds4 = ds3.reduceByKey(_+_)
    //4.结果展示
    ds4.print()
    //5.启动程序
    ssc.start()
    //6.阻塞主线程,等待外部停止
    ssc.awaitTermination()
  }

}
  1. 启动程序并通过netcat发送数据

$09[SparkStreaming流式处理] - 图8

  1. 在IDEA的控制台输出如下内容

$09[SparkStreaming流式处理] - 图9

注意:目前用的算子,只能处理本批次数据的累加,不能统计所有批次总的单词个数

2.WordCount解析

  • DStream是Spark Streaming 的基础抽象,代表持续性的数据流和经过各种spark算子操作后的结果数据流
  • 在内部实现上,每一批次的数据封装成一个RDD,一系列连续的RDD组成DStream,对这些RDD的转换是由Spark引擎来计算
  • 说明:DStream中批次与批次之间计算相互独立,如果批次设置时间小于计算时间会出现计算任务叠加情况,需要多分配资源,通常情况,批次设置时间要小于计算时间

$09[SparkStreaming流式处理] - 图10

第三章.DStream创建

1.队列数据源

package com.atguigu.spark.day09

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object $02_DStreamQueue {
  def main(args: Array[String]): Unit = {
    //1.创建StreamingContext,设置一个批次的时间
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf,Seconds(5))
    //设置日志级别
    ssc.sparkContext.setLogLevel("error")
    val queue = mutable.Queue[RDD[String]]()
    //读取数据
      /**
     * oneAtATime=true 默认,一次读取队列里面的一个数据
     * oneAtATime=false,按照设定的时间,读取队列里面的数据
     */
    val ds = ssc.queueStream(queue,false)
    //操作数据
    val ds2 = ds.flatMap(_.split(" "))
    val ds3 = ds2.map((_,1))
    val ds4 = ds3.reduceByKey(_+_)
    //结果展示
    ds4.print()
    //启动程序
    ssc.start()
    for (i<- 1 to 100){
      val rdd = ssc.sparkContext.parallelize(List("hello java hello spark","hello hadoop flume spark"))
      queue.+=(rdd)
      Thread.sleep(2000)
    }
    //阻塞主线程
    ssc.awaitTermination()

  }

}

2.自定义数据源

  1. MyReceiver.scala
package com.atguigu.spark.day09

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

class MyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK){
  //receiver启动的时候调用
  override def onStart(): Unit = {
    new Thread(){
      override def run(): Unit = {
        receive()
      }
    }.start()
  }
  def receive():Unit={
    //创建Socket客户端
    val socket = new Socket("hadoop102",9999)
    //获取输入流
    val br = new BufferedReader(new InputStreamReader(socket.getInputStream))
    var line = br.readLine()
    while(line!=null && !isStopped()){
      store(line)
      line= br.readLine()
    }
    br.close()
    socket.close()
  }

  //receiver停止的时候调用
  override def onStop(): Unit = {

  }

}
  1. UserDefinedSource.scala
package com.atguigu.spark.day09

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object $03_UserDefinedSource {
  def main(args: Array[String]): Unit = {
    //创建StreamingContext,设置一个批次的时间
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.sparkContext.setLogLevel("info")
    val ds = ssc.receiverStream(new MyReceiver)
    ds.flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. 测试

$09[SparkStreaming流式处理] - 图11

$09[SparkStreaming流式处理] - 图12

3.kafka数据源

  1. 版本选型

$09[SparkStreaming流式处理] - 图13

$09[SparkStreaming流式处理] - 图14

注意:目前spark3.0版本只有Direct模式

总结:不同版本的offset存储位置

  • 0-8 ReceiverAPI offset默认存储在:Zookeeper中
  • 0-8 DirectAPI offset默认存储在:CheckPoint
  • 手动维护:MySQL等有事务的存储系统
  • 0-10 DirectAPI offset默认存储在:_consumer_offsets系统主题
  • 手动维护:MySQL等有事务存储系统
  1. 代码编写

需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印在控制台

导入依赖

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
</dependency>

kafkaSource.scala

package com.atguigu.spark.day09

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object $04_KafkaSource {
  def main(args: Array[String]): Unit = {

    //创建StreamingContext,设置一个批次的时间
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.sparkContext.setLogLevel("error")
    //读取数据
    //设置读取的topic
    val arr = Array[String]("spark")
    //设置消费者参数
    val props = Map[String,Object](
      //指定k,v的反序列化器
      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
      //指定kafka broker地址
      "bootstrap.servers"->"hadoop102:9092,hadoop103:9092",
      //指定消费者组id
      "group.id"->"spark1",
      //指定消费者组第一次消费topic数据的时候从哪个位置开始消费
      "auto.offset.reset"->"earliest",
      //是否自动提交offset
      "enable.auto.commit"->"true"
    )
    /**
     * 读取kafka数据创建DStream
     */
    val ds = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](arr,props))
     //处理数据
    ds.foreachRDD(rdd=>{
      val messageRdd = rdd.map(_.value())
      //sparkStreaming读取Kafka数据的时候,rdd分区数=topic分区数
      println(s"rdd分区数=${rdd.getNumPartitions}")
      val result = messageRdd.flatMap(_.split(" "))
        .map((_,1))
        .reduceByKey(_+_)
        .collect()
      println(result.toList)
    })
    //启动,阻塞
    ssc.start()
    ssc.awaitTermination()  
  }

}
  1. 测试
  2. 分别启动Zookeeperhe 和 Kafka集群

$09[SparkStreaming流式处理] - 图15

  1. 创建topic
kafka-topics.sh --create --topic spark --bootstrap-server hadoop102:9092 --partitions 2
  1. 启动生产者
kafka-console-producer.sh --broker-list hadoop102:9092 --topic spark
  1. 启动消费者测试
kafka-console-consumer.sh --topic spark --bootstrap-server hadoop102:9092
  1. 运行程序

$09[SparkStreaming流式处理] - 图16

第四章.DStream转换

DStream上的操作与RDD的类似,分为转换和输出两种,此外转换操作中还有一些比较特殊的原语,如updateStateByKey(),transform()以及各种window相关的原语

1.无状态转换操作

无状态转换操作,就是把RDD转换操作应用到DStream每个批次上,每个批次相互独立,自己算自己的

一.常规无状态转换操作

DStream的部分无状态转化操作列在了下表中,都是DStream自己的API

注意,针对键值对的DStream转化操作,要添加import StreamingContext._才能在scala中使用,比如reduceByKey()

函数名称 目的 Scala示例 函数签名
map() 对DStream中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream ds.map(x=>x + 1) f: (T) -> U
flatMap() 对DStream中的每个元素应用给定函数,返回由各元素输出的迭代器组成的DStream。 ds.flatMap(x => x.split(“ “)) f: T -> Iterable[U]
filter() 返回由给定DStream中通过筛选的元素组成的DStream ds.filter(x => x != 1) f: T -> Boolean
repartition() 改变DStream的分区数 ds.repartition(10) N / A
reduceByKey() 将每个批次中键相同的记录规约。 ds.reduceByKey( (x, y) => x + y) f: T, T -> T
groupByKey() 将每个批次中的记录根据键分组。 ds.groupByKey() N / A

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转化操作是分别应用到每个RDD批次上的

二.Transform

需求:通过Transform可以将DStream每一批次的数据直接转换为RDD的算子操作

package com.atguigu.spark.day09

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.junit.Test

class $05_NoStatus {
  //创建StreamingContext,设置一个批次的时间
  val conf = new SparkConf().setMaster("local[4]").setAppName("test")
  val ssc = new StreamingContext(conf,Seconds(5))
  ssc.sparkContext.setLogLevel("error")
  @Test
  def transform():Unit={
    val ds = ssc.socketTextStream("hadoop102",9999)
    val ds2 = ds.transform(rdd=>{
      rdd.flatMap(_.split(" "))
    })
    ds2.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

2.有状态转化操作

有状态转化操作,计算当前批次RDD时,需要用到历史RDD的数据

一.UpdateStateByKey

- updateStateByKey()用于键值对形式的DStream,可以记录历史批次状态。例如可以实现累加WordCount。
- updateStateByKey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的DStream。

- 注意:使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
   - checkpoint小文件过多
   - checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次
package com.atguigu.spark.day09

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object $06_UpdateStateByKey {
  /**
   * updateStateByKey:统计全局结果
   */
  def main(args: Array[String]): Unit = {
    //创建StreamingContext,设置一个批次的时间
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("error")
    //设置状态的保存路径
    ssc.checkpoint("checkpoint")
    val ds = ssc.socketTextStream("hadoop102",9999)
    val ds2 = ds.flatMap(_.split(" ")).map((_,1))
    println(ds2)
    val func=(currentBathValues:Seq[Int],state:Option[Int])=>{
      /**
       * currentBathValues:当前批次key对应的所有value值
       * state:之前批次key对应的结果
       */
      //获取之前批次统计的value值
      val total = state.getOrElse(0) + currentBathValues.sum
      Some(total)
    }
    val ds3 = ds2.updateStateByKey(func)
    ds3.print()
    ssc.start()
    ssc.awaitTermination()

  }

}

启动程序并向9999端口发送数据

$09[SparkStreaming流式处理] - 图17

$09[SparkStreaming流式处理] - 图18

二.Window

window operation可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态,所有基于窗口的操作都需要这两个参数,分别为窗口时长以及滑动步长

  • 窗口时长:计算内容的时间范围
  • 滑动步长:隔多久触发一次计算
package com.atguigu.spark.day09

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object $07_Window {
  def main(args: Array[String]): Unit = {
    //创建StreamingContext,设置一个批次的时间
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.sparkContext.setLogLevel("error")
    ssc.checkpoint("checkpoint")
    val ds = ssc.socketTextStream("hadoop102",9999)
    val ds2 = ds.flatMap(_.split(" "))
      .map(x=>("---->"+x,1))
    ds2.print()
    //val ds3 = ds.flatMap(_.split(" ")).map((_,1)).window(Seconds(20),Seconds(10))
    //val ds4 = ds3.reduceByKey(_+_)
    //ds4.print()

    ds.flatMap(_.split(" ")).map((_,1)).reduceByKeyAndWindow((agg,curr)=>{
      //之前批次结果+滑入批次的结果数据
      println(s"第一个函数: ${agg} ${curr}")
      agg+curr
    },(agg,curr)=>{
      //此函数是之前批次的结果-滑窗批次的结果数据
      println(s"第二个函数: ${agg} ${curr}")
      agg-curr
    },windowDuration = Seconds(20),Seconds(5)).print()
    ssc.start()
    ssc.awaitTermination()
  }

}

第五章.DStream输出

DStream通常将数据输出到,外部数据库或屏幕上。

foreachRDD(func):这是最通用的输出操作,即将函数func用于产生DStream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者写入数据库。

在企业开发中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到如MySQL的外部数据库中。

package com.atguigu.spark.day09

import com.mysql.jdbc.Connection

import java.sql.{DriverManager, PreparedStatement}

object $08_Output {
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  //1、创建StreamingContext,设置一个批次的时间
  val conf = new SparkConf().setMaster("local[4]").setAppName("test")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.sparkContext.setLogLevel("error")
  val ds = ssc.socketTextStream("hadoop102",9999)
  ds.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreachRDD(rdd=>{
    //以下代码是在Driver执行的
    rdd.foreachPartition(it=>{
      //jdbc
      var connection:Connection = null
      var statement:PreparedStatement = null
      try{

        connection = DriverManager.getConnection("....")
        statement = connection.prepareStatement("insert into .. values(?,?)")

        var i = 0
        it.foreach(x => {
          statement.setString(1,x._1)
          statement.setInt(2,x._2)
          statement.addBatch()
          if(i%1000==0){
            //提交一个批次数据
            statement.executeBatch()
            statement.clearBatch()
          }
          i = i +1
        })
        //提交最后一个不满1000条的批次数据
        statement.executeBatch()

      }catch {
        case e:Exception => e.printStackTrace()
      }finally {
        if(statement!=null)
          statement.close()
        if(connection!=null)
          connection.close()
      }
    })
  })
  ssc.start()
  ssc.awaitTermination()

}

第六章.优雅的关闭

流式任务需要7*24小时执行,所以有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅的关闭就显得至关重要了 关闭方式:使用外部文件系统来控制内部程序关闭

package com.atguigu.spark.day09

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import java.net.URI

object $09_StreamingClose {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    //1、创建StreamingContext,设置一个批次的时间
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("error")
    ssc.socketTextStream("hadoop102",9999).flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()
    ssc.start()
    //监控HDFS某个目录是否存在
    val fs = FileSystem.get(new URI("hdsf://hadoop102://9820"),new Configuration())
    //如果目录存在则继续执行,不存在则退出
    while(fs.exists(new Path("hdfs://hadoop102:9820/xx"))){
      Thread.sleep(2000)
    }
    //stopGracefully:如果设置为true,则等到所有接收的数据全部处理完成才会停止
    ssc.stop(true,true)
    ssc.awaitTermination()
  }

}