创建

主要是通过kafka创建
direct模式

  1. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
  2. import org.apache.kafka.common.serialization.StringDeserializer
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.streaming.dstream.InputDStream
  5. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
  6. import org.apache.spark.streaming.{Seconds, StreamingContext}
  7. object Spark01_DirectAPI010 {
  8. def main(args: Array[String]): Unit = {
  9. //1.创建SparkConf
  10. val conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]")
  11. //2.创建StreamingContext
  12. val ssc = new StreamingContext(conf, Seconds(3))
  13. //3.构建Kafka参数
  14. val kafkaParmas: Map[String, Object] = Map[String, Object](
  15. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
  16. ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",
  17. ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  18. ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
  19. )
  20. //4.消费Kafka数据创建流
  21. val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
  22. LocationStrategies.PreferConsistent,
  23. ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParmas))
  24. //5.计算WordCount并打印
  25. kafkaDStream.map(_.value())
  26. .flatMap(_.split(" "))
  27. .map((_, 1))
  28. .reduceByKey(_ + _)
  29. .print()
  30. //6.启动任务
  31. ssc.start()
  32. ssc.awaitTermination()
  33. }
  34. }

转换

无状态

无状态转化操作就是把简单的RDD转化操作应用到单个批次上,例如reduceByKey只会汇总单个批次的数据。 又分为DStream的API 、非DStream的API
DStream的API:

  1. DStreamAPI
  2. val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)
  3. //转换为RDD操作
  4. val wordAndCountDStream: DStream[(String, Int)] = lineDStream.map。。。。
  5. flatMap
  6. filter
  7. repartitioin
  8. reduceByKey:聚合小批次内 仅仅
  9. groupByKey

非DStream的API:Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来.

  • 使用transform函数只有一定要返回一个新的DStream。 ```scala object Spark06_Nostate_Transform { def main(args: Array[String]): Unit = { //创建SparkConf val conf: SparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“WordCount”) //创建StreamingContext val ssc = new StreamingContext(conf, Seconds(3)) //创建DStream val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream(“hadoop202”, 9999) //转换为RDD操作 val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {
    1. val words: RDD[String] = rdd.flatMap(_.split(" "))
    2. val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    3. val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    4. value
    5. })
    //打印 wordAndCountDStream.print //启动 ssc.start() ssc.awaitTermination() }
  1. <a name="WkGym"></a>
  2. #### 有状态
  3. - updateStateByKey需要对检查点目录进行配置
  4. ```scala
  5. import org.apache.spark.SparkConf
  6. import org.apache.spark.streaming.{Seconds, StreamingContext}
  7. object WorldCount {
  8. def main(args: Array[String]) {
  9. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  10. val ssc = new StreamingContext(conf, Seconds(3))
  11. ssc.checkpoint("hdfs://hadoop102:9000/streamCheck")
  12. val lines = ssc.socketTextStream("hadoop102", 9999)
  13. val words = lines.flatMap(_.split(" "))
  14. val pairs = words.map(word => (word, 1))
  15. // 定义更新状态方法,参数values为当前批次单词频度,state为以往批次单词频度
  16. val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  17. val currentCount = values.foldLeft(0)(_ + _)
  18. val previousCount = state.getOrElse(0)
  19. Some(currentCount + previousCount)
  20. }
  21. val stateDstream = pairs.updateStateByKey[Int](updateFunc)
  22. stateDstream.print()
  23. //或者直接写
  24. //val stateDS: DStream[(String, Int)] = mapDS.updateStateByKey(
  25. // (seq: Seq[Int], state: Option[Int]) => {Option(seq.sum + state.getOrElse(0))})
  26. ssc.start() // Start the computation
  27. ssc.awaitTermination() // Wait for the computation to terminate
  28. }
  29. }
  • windows操作: 窗口时长 和 滑动步长为采集周期的整倍数 ```scala object Spark08State_window { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“WordCount”) val ssc = new StreamingContext(sparkConf, Seconds(3)) //设置检查点路径 用于保存状态 ssc.checkpoint(“D:\dev\workspace\my-bak\spark-bak\cp”) //创建DStream val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream(“hadoop202”, 9999) val flatMapDS: DStream[String] = lineDStream.flatMap(.split(“ “))

    //设置窗口大小,滑动的步长 val windowDS: DStream[String] = flatMapDS.window(Seconds(6),Seconds(3)) //结构转换 val mapDS: DStream[(String, Int)] = windowDS.map((,1)) //聚合 val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(+_) reduceDS.print() //启动 ssc.start() ssc.awaitTermination() } }

```scala
(1)window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的Dstream
(2)countByWindow(windowLength, slideInterval):返回一个滑动窗口计数流中的元素。
(3)reduceByWindow(func, windowLength, slideInterval):通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。Note:默认情况下,这个操作使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(spark.default.parallelism)来做grouping。你可以通过设置可选参数numTasks来设置不同数量的tasks。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):这个函数是上述函数的更高效版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。注意:为了使用这个操作,检查点必须可用。 
(6)countByValueAndWindow(windowLength,slideInterval, [numTasks]):对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的值是其在滑动窗口中频率。如上,可配置reduce任务数量。

DF&SQL

  • 要想用DF操作流,必须创建SQLContext(SparkSession里包含SQLContext)
  • 加入隐式转换

    object Spark01_WordCount {
    def main(args: Array[String]): Unit = {
      val conf: SparkConf = new SparkConf().setAppName("Spark01_W").setMaster("local[*]")
      val ssc = new StreamingContext(conf,Seconds(3))
      val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202",9999)
      val flatMapDS: DStream[String] = socketDS.flatMap(_.split(" "))
      val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1))
      //
      val spark = SparkSession.builder.config(conf).getOrCreate()
      import spark.implicits._
      mapDS.foreachRDD(rdd =>{
        val df: DataFrame = rdd.toDF("word", "count")
        df.createOrReplaceTempView("words")
        spark.sql("select * from words").show
      })
      //val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)
      //reduceDS.print()
      //启动采集器
      ssc.start()
      //等待采集结束,终止上下文环境对象
      ssc.awaitTermination()
    }
    }
    

    行动算子 action

  • print()

在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。

  • saveAsTextFiles(prefix, [suffix])

以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。

  • saveAsObjectFiles(prefix, [suffix])

以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”. Python中目前不可用。

  • saveAsHadoopFiles(prefix, [suffix]

将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”。Python API 中目前不可用。

  • foreachRDD(func) 这里的RDD是指每一个批次,因为一个批次一个RDD

这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。
通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中,但是在使用的时候需要注意以下几点
连接不能写在driver层面(序列化); 让task创建链接
里面写foreachPartition,在分区创建(获取)。

// 下面的代码就是 foreachRDD 里套foreachPartition
filterDStream.foreachRDD {
  rdd => { //rdd里面 foreachpartition外面的代码,创建的对象是在driver端
    rdd.foreachPartition {
      jsonObjIter => {
        val dauList: List[(String, DauInfo)] = jsonObjIter.map {
          jsonObj => {
            val commonJsonObj: JSONObject = jsonObj.getJSONObject("common")

            val dauInfo: DauInfo = DauInfo(
              commonJsonObj.getString("mid"),
              commonJsonObj.getString("uid"),
              commonJsonObj.getString("ar"),
              commonJsonObj.getString("ch"),
              commonJsonObj.getString("vc"),
              commonJsonObj.getString("dt"),
              commonJsonObj.getString("hr"),
              "00",
              commonJsonObj.getLong("ts")
            )
            (dauInfo.mid, dauInfo)
          }
        }.toList
        //调用ES工具类,将dauList中的数据批量保存到ES中
        val dt: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date)
        //向ES中批量插入数据
        MyESUtil.bulkInsert(dauList, "gmall0421_dau_info_" + dt)
      }
    }
    //保存offset
    OffsetManageUtil.saveOffset(topic, groupId, offsetRanges)
  }
}