Stateful operations

The example below consumes the Kafka topic "test" through a direct stream, maps each record to a (word, 1) pair, and keeps a running count across micro-batches with updateStateByKey. Stateful operators require a checkpoint directory, set here with ssc.checkpoint("check").

    package com.yang.spark

    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * @author :ywb
     * @date :Created in 2022/2/26 1:44 PM
     * @description: toDo
     */
    object SparkKafka01_Consumer {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka-spark")
        val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
        // Stateful operators persist state between batches, so a checkpoint directory is required
        ssc.checkpoint("check")

        val kafkaPara: Map[String, Object] = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "42.192.229.208:9092",
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.GROUP_ID_CONFIG -> "consumer01"
        )

        // Direct stream: executors read from the Kafka partitions without a receiver
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaPara)
        )

        // Map every record value to a (word, 1) pair for counting
        val value: DStream[(String, Int)] = kafkaDStream.map(item => (item.value(), 1))

        // TODO stateless operation: counts cover only the current 3-second batch
        //val res: DStream[(String, Int)] = value.reduceByKey(_ + _)
        //res.print()

        // TODO stateful operation: accumulates counts across batches
        // Note: requires the checkpoint directory set above via ssc.checkpoint("check")
        val res: DStream[(String, Int)] = value.updateStateByKey((newData: Seq[Int], buff: Option[Int]) => {
          // buff is the running total so far for this key; add the values from the current batch
          val i: Int = buff.getOrElse(0) + newData.sum
          Option(i)
        })
        res.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
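
As a point of comparison, mapWithState (added in Spark 1.6) is the incremental successor to updateStateByKey: the mapping function runs only for keys that actually receive new records in a batch, instead of over the entire state every interval. Below is a minimal sketch of the same running count, assuming the value DStream from the pipeline above; countSpec is an illustrative name:

    import org.apache.spark.streaming.{State, StateSpec}

    // Called once per (key, new value) pair in each batch; state holds the running total
    val countSpec = StateSpec.function((word: String, one: Option[Int], state: State[Int]) => {
      val sum: Int = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)   // persist the new total for this key
      (word, sum)         // emit the updated count downstream
    })

    // mapWithState also requires the checkpoint directory set earlier
    val stateful: DStream[(String, Int)] = value.mapWithState(countSpec)
    stateful.print()

To feed either version during a local test, messages can be published with the Kafka console producer (the --bootstrap-server flag assumes Kafka 2.5 or newer):

    bin/kafka-console-producer.sh --bootstrap-server 42.192.229.208:9092 --topic test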