Stateful
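This example uses Spark Streaming's direct Kafka API to consume a topic and maintain a running word count across micro-batches. Unlike a stateless reduceByKey, which only aggregates within a single batch, updateStateByKey carries the accumulated count forward from batch to batch, and that state must be checkpointed, hence the ssc.checkpoint("check") call below.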
package com.yang.spark
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * @author :ywb
 * @date :Created in 2022/2/26 1:44 PM
 * @description: toDo
 */
object SparkKafka01_Consumer {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("kafka-spark")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))
    // updateStateByKey needs a checkpoint directory to persist state between batches
    ssc.checkpoint("check")
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "42.192.229.208:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "consumer01"
    )
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaPara)
    )
    // pair every record value with 1 so the counts can be aggregated by key
    val value: DStream[(String, Int)] = kafkaDStream.map(item => (item.value(), 1))
    // TODO stateless operation: reduceByKey only aggregates within the current batch
    //val res: DStream[(String, Int)] = value.reduceByKey(_ + _)
    //res.print()
    // TODO stateful operation. Note: requires a checkpoint directory, set above via ssc.checkpoint("check")
    val res: DStream[(String, Int)] = value.updateStateByKey((newData: Seq[Int], buff: Option[Int]) => {
      // buff is the count accumulated in state so far; newData holds this batch's values for the key
      val i: Int = buff.getOrElse(0) + newData.sum
      Option(i)
    })
    res.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
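To see the stateful semantics without a running Kafka broker, the same updateStateByKey logic can be driven from an in-memory queue of RDDs. The sketch below is a minimal, self-contained variant; the queueStream source, the UpdateStateQueueDemo object name, and the sample words are assumptions for illustration, not part of the original job.

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical demo: replays two micro-batches from a local queue to show how
// updateStateByKey accumulates counts across batches.
object UpdateStateQueueDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("update-state-demo")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("check-demo") // still required by updateStateByKey

    val queue = mutable.Queue[RDD[String]]()
    val words = ssc.queueStream(queue) // consumes one queued RDD per batch interval

    val counts = words
      .map((_, 1))
      .updateStateByKey((newData: Seq[Int], state: Option[Int]) =>
        Option(state.getOrElse(0) + newData.sum))
    counts.print()

    ssc.start()
    queue += ssc.sparkContext.makeRDD(Seq("hello", "spark"))
    queue += ssc.sparkContext.makeRDD(Seq("hello"))
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop()
  }
}

In the printed output, hello should climb from 1 to 2 across the two batches while spark stays at 1, which is exactly the carry-over behavior the Kafka job above relies on.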