依赖包
        <!-- Jedis: Redis client used by the Scala code in this note -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.1.0</version>
        </dependency>
        <!-- Kafka source for Spark SQL / Structured Streaming (Scala 2.11 build) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
Redis连接（单例共享连接，非连接池）
import org.apache.spark.internal.Logging
import redis.clients.jedis.Jedis

import scala.util.control.NonFatal

/**
 * Holder for one lazily created, process-wide Jedis connection.
 *
 * The connection targets `localhost:6379`, authenticates with a fixed password and
 * selects database 3. Despite the "ConnectionPool" wording in the log messages, this
 * object keeps a single shared connection rather than a pool.
 */
object RedisConnection extends Logging {

  // @volatile is required because getConnection() reads this field outside the lock;
  // without it a thread may see a stale null or a reference that was published before
  // the connection was fully set up.
  @volatile private var conn: Jedis = null

  // Number of times createConnection() has been invoked. Written only under the lock,
  // read (for logging) without it.
  @volatile private var count = 0

  /**
   * Returns the shared connection, creating and configuring it on first use.
   *
   * @return the shared Jedis instance; every caller receives the same object
   */
  def getConnection(): Jedis = {
    if (conn == null) {
      this.synchronized {
        // Re-check under the lock: another thread may have created it meanwhile.
        if (conn == null) {
          createConnection()
        }
      }
    }
    logInfo(s"---------------ConnectionPool.getConnection count=${count}-----------")
    conn
  }

  /**
   * Opens, authenticates and configures a new connection, then publishes it to `conn`.
   *
   * The connection is assigned to `conn` only after `auth` and `select` succeeded, so
   * callers never observe a half-configured connection. If setup fails, the connection
   * is closed and the original exception is rethrown. Must be called while holding the
   * object lock.
   */
  private def createConnection(): Unit = {
    count = count + 1
    val jedis = new Jedis("localhost", 6379)
    try {
      jedis.auth("123456")
      jedis.select(3)
    } catch {
      case NonFatal(e) =>
        // Do not leak the socket or cache a broken connection; surface the real failure.
        try jedis.close()
        catch { case NonFatal(closeError) => e.addSuppressed(closeError) }
        throw e
    }
    conn = jedis
    logInfo(s"---------------ConnectionPool.createConnection count=${count}-----------")
  }

  /**
   * Manual smoke test: opens a separate connection, prints the value stored under the
   * key "offset" and closes the connection again.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379)
    try {
      jedis.auth("123456")
      jedis.select(3)
      println(jedis.get("offset"))
    } finally {
      // Close even when auth/select/get throws.
      jedis.close()
    }
  }
}
StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener

/**
 * Streaming query listener that prints lifecycle markers and, on every progress event,
 * stores the start offset of the last reported source in Redis under "test_offset1".
 *
 * @param appName application name supplied by the caller; not used by this class
 */
class SparkStreamingQueryListener(appName: String) extends StreamingQueryListener {

  /** Prints a marker when a query starts. */
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("----------------spark structure streaming start---------------------")
  }

  /**
   * Writes the last source's start offset to Redis when one is available.
   *
   * A progress event with no sources, or with a null start offset, is skipped instead of
   * failing; in that case "null" is printed, as it was for a null offset before.
   */
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println("*************************process1**********************")
    // lastOption guards against an empty sources array; Option(...) maps a null offset to None.
    val startOffset: Option[String] =
      event.progress.sources.lastOption.flatMap(source => Option(source.startOffset))
    startOffset.foreach { offset =>
      // NOTE(review): WordCount reads its starting offsets from the key "offset", not
      // "test_offset1" — confirm whether these two keys are meant to match.
      RedisConnection.getConnection().set("test_offset1", offset)
      println("*******************写入redis********************")
    }
    println(startOffset.orNull)
    println("*************************process2**********************")
  }

  /** Prints a marker and closes the shared Redis connection when a query terminates. */
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("----------------spark structure streaming end---------------------")
    Option(RedisConnection.getConnection()).foreach(_.close())
  }
}
Spark Structured Streaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/**
 * Structured Streaming job that reads the Kafka topic "mytopic" and prints the records
 * to the console.
 *
 * Starting offsets are taken from the Redis key "offset" when it holds a value, otherwise
 * "latest" is used. While running, the job polls the Redis key "stop_test" every 10
 * seconds and stops the query once that key holds the value "stop".
 */
object WordCount {

  /**
   * Entry point: builds the session, starts the query and blocks until it terminates.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("wordcount")
      .getOrCreate()

    try {
      spark.sparkContext.setLogLevel("INFO")
      spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))

      val kafkaConfig = Map(
        "kafka.bootstrap.servers" -> "localhost:9092",
        "subscribe" -> "mytopic"
      )

      val jedis = RedisConnection.getConnection()
      val key = "offset"
      // A single GET replaces EXISTS followed by GET: the key could disappear between the
      // two calls, which would pass null to the startingOffsets option.
      val startingOffsets = Option(jedis.get(key)).getOrElse("latest")

      val df = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaConfig("kafka.bootstrap.servers"))
        .option("subscribe", kafkaConfig("subscribe"))
        .option("startingOffsets", startingOffsets)
        .option("kafka.max.partition.fetch.bytes", 1048576 * 4)
        .load()

      val query = df.writeStream
        .outputMode(OutputMode.Append())
        .format("console")
        .option("truncate", true)
        .start()

      println("-------------x1------------------")
      println(query.lastProgress)

      // Poll every 10 seconds for the stop flag.
      val intervalMills = 10 * 1000L
      var isStop = false
      while (!isStop) {
        println("-----------------等待stop---------------------")
        // Returns true once the query has terminated; rethrows the query's failure, if any.
        isStop = query.awaitTermination(intervalMills)
        // Comparing against the literal is null-safe, so a missing key simply yields false.
        if (!isStop && "stop" == jedis.get("stop_test")) {
          println("2秒后开始关闭sparstreaming程序.....")
          Thread.sleep(2000)
          query.stop()
        }
      }
      // The loop exits only after awaitTermination reported termination, so no further
      // wait on the query is needed here.
    } finally {
      // Release the session even when the query fails.
      spark.stop()
    }
  }
}