依赖包
<!-- Maven dependencies: the Jedis Redis client, and the Spark Structured
     Streaming Kafka source built for Scala 2.11 / Spark 2.3.0. -->
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.3.0</version>
</dependency>
redis连接池
import org.apache.spark.internal.Logging
import redis.clients.jedis.Jedis

/**
 * Process-wide, lazily created shared Jedis connection (localhost:6379, db 3).
 *
 * NOTE(review): a single Jedis instance is handed to every caller; Jedis itself
 * is not thread-safe, so this relies on callers using it from the driver side
 * only. If a caller closes the returned connection, later getConnection()
 * calls still return the closed instance — confirm callers only close at
 * application shutdown.
 */
object RedisConnection extends Logging {

  // BUGFIX: @volatile is required for double-checked locking — without it a
  // thread outside the synchronized block may observe a non-null `conn` that
  // is not yet fully constructed/authenticated.
  @volatile private var conn: Jedis = null

  // Number of times a connection has been created; debugging aid only.
  // Only mutated inside the synchronized block (via createConnection).
  private var count = 0

  /** Returns the shared connection, creating it on first use. */
  def getConnection(): Jedis = {
    if (conn == null) {
      this.synchronized {
        if (conn == null) {
          createConnection()
        }
      }
    }
    logInfo(s"---------------ConnectionPool.getConnection count=${count}-----------")
    conn
  }

  /** Builds and authenticates the connection; call only while holding `this.synchronized`. */
  private def createConnection(): Unit = {
    count = count + 1
    conn = new Jedis("localhost", 6379)
    conn.auth("123456")
    conn.select(3)
    logInfo(s"---------------ConnectionPool.createConnection count=${count}-----------")
  }

  /** Ad-hoc smoke test: prints the stored "offset" value and exits. */
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379)
    jedis.auth("123456")
    jedis.select(3)
    println(jedis.get("offset"))
    jedis.close()
  }
}
StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener

/**
 * Listener that persists the latest Kafka source offset to Redis after each
 * completed micro-batch, so the driver can resume from it on restart via the
 * Kafka source's `startingOffsets` option.
 *
 * @param appName identifier supplied by the caller; not otherwise read here
 */
class SparkStreamingQueryListener(appName: String) extends StreamingQueryListener {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("----------------spark structure streaming start---------------------")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println("*************************process1**********************")
    // startOffset of the batch that just finished. Restarting from it means the
    // last batch is reprocessed (at-least-once semantics).
    val s = event.progress.sources.last.startOffset
    if (s != null) {
      // BUGFIX: this was written under key "test_offset1", but the driver
      // (WordCount) restores startingOffsets from key "offset" — so saved
      // offsets were never picked up on restart. Write to "offset".
      RedisConnection.getConnection().set("offset", s)
      println("*******************写入redis********************")
    }
    println(s)
    println("*************************process2**********************")
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("----------------spark structure streaming end---------------------")
    // NOTE(review): this closes the single connection cached inside
    // RedisConnection; any later getConnection() call would return the closed
    // instance. Acceptable only because the application is shutting down here.
    val jedis = RedisConnection.getConnection()
    if (jedis != null) {
      jedis.close()
    }
  }
}
Spark Structured Streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/**
 * Structured Streaming driver: reads from Kafka topic "mytopic" and echoes
 * batches to the console. On startup it restores Kafka starting offsets from
 * Redis key "offset" when present (falling back to "latest"), and it polls a
 * Redis flag ("stop_test" == "stop") every 10 seconds to stop gracefully.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder()
      .master("local")
      .appName("wordcount")
      .getOrCreate()
    spark.sparkContext.setLogLevel("INFO")
    // Listener persists each batch's source offset back to Redis as the query runs.
    spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))
    val kafkaConfig=Map("kafka.bootstrap.servers" -> "localhost:9092","subscribe"->"mytopic")
    val jedis=RedisConnection.getConnection()
    val key="offset"
    // Resume from the offsets saved in Redis if any exist; otherwise start at
    // the latest offsets. The stored value is expected to be the JSON offset
    // string the Kafka source accepts — TODO confirm against the listener.
    val startingOffsets=if(jedis.exists(key)) jedis.get(key) else "latest"
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaConfig("kafka.bootstrap.servers"))
      .option("subscribe", kafkaConfig("subscribe"))
      .option("startingOffsets",startingOffsets)
      // Allow larger fetches per partition: 4 MiB.
      .option("kafka.max.partition.fetch.bytes", 1048576 * 4)
      .load()
    val query = df.writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .option("truncate",true)
      .start()
    println("-------------x1------------------")
    // lastProgress is null until the first batch completes; printed for debugging.
    val s=query.lastProgress
    println(s)
    val intervalMills = 10 * 1000 // poll every 10 seconds for the stop flag
    var isStop = false
    // Wait in 10-second slices; between slices, check the Redis stop flag and
    // trigger a graceful stop when it is set.
    while (!isStop) {
      println("-----------------等待stop---------------------")
      // Returns true when the query terminated within the timeout.
      isStop = query.awaitTermination(intervalMills)
      if (!isStop && jedis.exists("stop_test")&& "stop" == jedis.get("stop_test")) {
        println("2秒后开始关闭sparstreaming程序.....")
        Thread.sleep(2000)
        query.stop()
      }
    }
    // Final wait; returns immediately if the query already terminated above.
    query.awaitTermination()
  }
}