Spark Streaming 2.0.x + Kafka 0.10.x

  • Use Spark Streaming to consume log messages from Kafka topics in real time

I. Introduction
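
The examples in these notes target Spark Streaming 2.0.x together with the spark-streaming-kafka-0-10 integration (for Kafka brokers 0.10.x and later). A minimal sbt dependency setup might look like the sketch below; the 2.0.2 version number is an assumption and should be aligned with the Spark and Scala versions of your cluster:

  // build.sbt -- versions are assumptions; align them with your deployment
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"                 % "2.0.2" % "provided",
    "org.apache.spark" %% "spark-sql"                  % "2.0.2" % "provided",
    "org.apache.spark" %% "spark-streaming"            % "2.0.2" % "provided",
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.2"
  )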

II. Direct Approach (No Receivers): Direct Stream Mode

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.Seconds
  import org.apache.spark.streaming.StreamingContext
  import org.apache.kafka.clients.consumer.ConsumerRecord
  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010._
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  /**
   * Direct Stream mode
   */
  def runByDirectApproach(): Unit = {
    // Create the Spark Streaming context
    val conf = new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("UserPortrait")
    conf.set("spark.streaming.kafka.maxRetries", "5")
    // Batch interval comes from the application's own config (defined elsewhere)
    val ssc = new StreamingContext(conf, Seconds(this.config.get.batchDuration))
    val brokerList = "broker1:9092,broker2:9092,broker3:9092"
    val topics = Array("topic1", "topic2")
    val groupId = "userPortrait"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokerList,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      // Do not auto-commit offsets; commit manually after processing
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    println(kafkaParams)
    val directKafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    directKafkaStream.foreachRDD { (rdd, time) =>
      println(s"------ RDD: $rdd , Time: $time ---------")
      // Iterate over the partitions of this batch's RDD
      rdd.foreachPartition { partitionIterator =>
        // Iterate over the records in this partition
        partitionIterator.foreach { consumerRecord =>
          println("-----------------------------------------------------------------------------------------")
          // Raw log payload of the current record (may contain multiple lines)
          val curLines = consumerRecord.value()
          curLines.split("\n").foreach { curLine =>
            // formatLogData and recommendAction are application-specific helpers defined elsewhere
            val logInfo = this.formatLogData(curLine)
            val logType = logInfo.getOrElse("logType", "").toString
            logType match {
              case "accessLog" => this.recommendAction(logInfo)
              case _           => println("nothing")
            }
          }
        }
      }
    }
    // Start the StreamingContext
    ssc.start()
    // Block until the streaming computation terminates
    ssc.awaitTermination()
  }
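
Because enable.auto.commit is set to false, the consumed offsets are never committed automatically. With the 0.10 integration, the usual pattern is to read each batch's offset ranges through HasOffsetRanges and commit them back to Kafka with CanCommitOffsets once the batch has been processed. A minimal sketch reusing the directKafkaStream above (in practice the commit lives inside the same foreachRDD that does the processing):

  import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

  directKafkaStream.foreachRDD { rdd =>
    // Capture the Kafka offset ranges backing this batch before any transformation
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process the batch here ...
    // Asynchronously commit the consumed offsets back to Kafka under group.id
    directKafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }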

III. Demo

1. Kafka Direct Stream with Spark SQL

  package com.dw2345.dw_realtime.safe_log

  import org.apache.spark.SparkConf
  import org.apache.spark.TaskContext
  import org.apache.spark.sql.{SparkSession, DataFrame, SQLContext}
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
  import org.apache.spark.streaming.kafka010._
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  import org.apache.kafka.clients.consumer.ConsumerRecord
  import org.apache.kafka.common.serialization.StringDeserializer

  object SafeRealtimeClickLog {

    /** Case class for converting RDD to DataFrame */
    case class Record(line: String)

    def main(args: Array[String]) {
      val sparkConf = new SparkConf()
      sparkConf.setMaster("local[2]")
      sparkConf.setAppName("SafeRealTimeClickLog")
      // Streaming context with a 3-second batch interval
      val ssc = new StreamingContext(sparkConf, Seconds(3))
      val kafkaBrokers = "dw7:9092,dw8:9092,d9:9092"
      val topics = Array("test")
      val clientId = "safe-realtime-click-log"
      val groupId = "test-consumer-group"
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> kafkaBrokers,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        // Client identifier reported to the brokers (the old "consumer.id"
        // property is not recognized by the new consumer API)
        "client.id" -> clientId,
        "group.id" -> groupId,
        "auto.offset.reset" -> "latest",
        // Do not auto-commit offsets
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      val directKafkaStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )
      directKafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], time: Time) =>
        println(s"------ RDD: $rdd , Time: $time ---------")
        println(s"------ PartitionsNum: ${rdd.getNumPartitions} ---------")
        // Get the singleton instance of SparkSession
        val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
        import spark.implicits._
        // Convert RDD[ConsumerRecord[String, String]] to RDD[String]
        val consumerRecordValueRDD = rdd.map(consumerRecord => consumerRecord.value())
        // Convert RDD[String] to RDD[Record]
        val consumerRecordCase = consumerRecordValueRDD.map(line => Record(line))
        // RDD[Record] to DataFrame
        val consumerDataFrame = consumerRecordCase.toDF()
        // Create a temporary view over the DataFrame
        consumerDataFrame.createOrReplaceTempView("record")
        // Query it with Spark SQL
        val recordsDataFrame = spark.sql("SELECT line FROM record")
        recordsDataFrame.show()
      }
      // Start the StreamingContext
      ssc.start()
      // Block until the streaming computation terminates
      ssc.awaitTermination()
    }
  }

  /** Lazily instantiated singleton instance of SparkSession */
  object SparkSessionSingleton {
    // @transient: excluded from serialization; typically used for temporarily
    // cached data or data that is cheap to recompute
    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          //.config("spark.sql.warehouse.dir", warehouseLocation)
          //.enableHiveSupport()
          .getOrCreate()
      }
      instance
    }
  }
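
The demo only selects the raw line column, but once the temporary view exists any Spark SQL query can run against it per batch. For example, a hypothetical follow-up aggregation that counts how often each raw line appears could be appended inside the same foreachRDD block:

  // Hypothetical follow-up query against the same "record" view
  val lineCountsDataFrame = spark.sql(
    "SELECT line, COUNT(*) AS cnt FROM record GROUP BY line ORDER BY cnt DESC")
  lineCountsDataFrame.show()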