Dependencies

Maven dependencies for the Jedis client and the Spark Kafka source (Scala 2.11 / Spark 2.3.0):

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
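
If you build with sbt instead of Maven, a rough equivalent (assuming scalaVersion 2.11 and that spark-sql itself is already on the classpath) would be:

    libraryDependencies ++= Seq(
      "redis.clients" % "jedis" % "3.1.0",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0"
    )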

Redis connection pool

Despite the "ConnectionPool" name in its log messages, this object holds a single lazily created Jedis connection, shared via double-checked locking:

    import org.apache.spark.internal.Logging
    import redis.clients.jedis.Jedis

    object RedisConnection extends Logging {
      // @volatile makes the double-checked locking in getConnection() safe.
      @volatile private var conn: Jedis = null
      private var count = 0

      def getConnection(): Jedis = {
        if (conn == null) {
          this.synchronized {
            if (conn == null) {
              createConnection()
            }
          }
        }
        logInfo(s"---------------ConnectionPool.getConnection count=${count}-----------")
        conn
      }

      private def createConnection(): Unit = {
        count += 1
        conn = new Jedis("localhost", 6379)
        conn.auth("123456")
        conn.select(3) // use Redis database 3
        logInfo(s"---------------ConnectionPool.createConnection count=${count}-----------")
      }

      // Quick manual check: print the offset value currently stored in Redis.
      def main(args: Array[String]): Unit = {
        val jedis = new Jedis("localhost", 6379)
        jedis.auth("123456")
        jedis.select(3)
        println(jedis.get("offset"))
        jedis.close()
      }
    }
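
Jedis instances are not thread-safe, so a single shared connection only works while everything touches Redis from one thread. If multiple threads are involved, a JedisPool is the safer choice; a minimal sketch using the same host, port, password, and database as above (the RedisPool/withJedis names are just for illustration):

    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    object RedisPool {
      private val pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379)

      // Borrow a connection, run f, and return the connection to the pool.
      def withJedis[T](f: Jedis => T): T = {
        val jedis = pool.getResource
        try {
          jedis.auth("123456")
          jedis.select(3)
          f(jedis)
        } finally {
          jedis.close() // on a pooled Jedis, close() returns it to the pool
        }
      }
    }

Usage: RedisPool.withJedis(_.get("offset")).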

StreamingQueryListener

The listener below writes the Kafka offsets of each batch to Redis as the query progresses, so the job can resume from them after a restart:

    import org.apache.spark.sql.streaming.StreamingQueryListener

    class SparkStreamingQueryListener(appName: String) extends StreamingQueryListener {

      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
        println("----------------spark structured streaming start---------------------")
      }

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println("*************************process1**********************")
        // startOffset is the JSON offset string this batch started from;
        // saving it gives at-least-once semantics on restart. The key must
        // match the one WordCount reads back at startup.
        val s = event.progress.sources.last.startOffset
        if (s != null) {
          RedisConnection.getConnection().set("offset", s)
          println("*******************offsets written to redis********************")
        }
        println(s)
        println("*************************process2**********************")
      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println("----------------spark structured streaming end---------------------")
        val jedis = RedisConnection.getConnection()
        if (jedis != null) {
          jedis.close()
        }
      }
    }
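
The value saved here is the Kafka source's JSON offset representation, mapping topic to partition to offset. The numbers below are made up, but the shape is what ends up under the offset key in Redis:

    // Illustrative shape of the stored value (partition/offset numbers invented):
    val storedOffsets = """{"mytopic":{"0":42,"1":17}}"""
    // This JSON form is exactly what the Kafka source's "startingOffsets"
    // option accepts, so WordCount below can pass it back verbatim on restart.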

Spark Structured Streaming

The main job ties the pieces together: it resumes from the offsets stored in Redis (if any), registers the listener, and polls Redis for a stop flag so the query can be shut down gracefully between batches:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode

    object WordCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local")
          .appName("wordcount")
          .getOrCreate()
        spark.sparkContext.setLogLevel("INFO")
        // Persist offsets to Redis after every batch.
        spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))

        val kafkaConfig = Map(
          "kafka.bootstrap.servers" -> "localhost:9092",
          "subscribe" -> "mytopic")

        // Resume from the offsets saved in Redis, or from "latest" on first run.
        val jedis = RedisConnection.getConnection()
        val key = "offset"
        val startingOffsets = if (jedis.exists(key)) jedis.get(key) else "latest"

        val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaConfig("kafka.bootstrap.servers"))
          .option("subscribe", kafkaConfig("subscribe"))
          .option("startingOffsets", startingOffsets)
          .option("kafka.max.partition.fetch.bytes", 1048576 * 4)
          .load()

        val query = df.writeStream
          .outputMode(OutputMode.Append())
          .format("console")
          .option("truncate", true)
          .start()

        println("-------------x1------------------")
        // lastProgress is null until the first batch completes.
        println(query.lastProgress)

        val intervalMills = 10 * 1000 // check for the stop flag every 10 seconds
        var isStop = false
        while (!isStop) {
          println("-----------------waiting for stop---------------------")
          isStop = query.awaitTermination(intervalMills)
          if (!isStop && jedis.exists("stop_test") && "stop" == jedis.get("stop_test")) {
            println("Shutting down the streaming job in 2 seconds...")
            Thread.sleep(2000)
            query.stop()
          }
        }
        query.awaitTermination()
      }
    }
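
To trigger a graceful shutdown from outside, set the flag the polling loop checks (key stop_test, in the same Redis database 3); the loop notices it within about 10 seconds and calls query.stop(). A minimal sketch:

    import redis.clients.jedis.Jedis

    object StopJob {
      def main(args: Array[String]): Unit = {
        val jedis = new Jedis("localhost", 6379)
        jedis.auth("123456")
        jedis.select(3)
        jedis.set("stop_test", "stop") // WordCount's polling loop reacts to this value
        jedis.close()
      }
    }

The same can be done from the command line with redis-cli -a 123456 -n 3 SET stop_test stop.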