依赖包（Maven 依赖，添加到 pom.xml）
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
</dependency>
Redis 连接（单例 Jedis 连接，用于保存 / 读取 Kafka offset）
import org.apache.spark.internal.Logging
import redis.clients.jedis.Jedis
object RedisConnection extends Logging {
  // Lazily-created Jedis connection shared by the whole application.
  // BUG FIX: @volatile is required for double-checked locking to be safe on
  // the JVM — without it, a thread outside the synchronized block may observe
  // a partially-constructed Jedis or a stale null reference.
  @volatile private var conn: Jedis = null
  // How many times a connection has been created (diagnostic counter only).
  private var count = 0

  /** Returns the shared Jedis connection, creating it on first use
    * (double-checked locking).
    *
    * NOTE(review): a connection that has been close()d is not detected or
    * re-created here — callers that close the returned Jedis leave a stale
    * singleton behind. Acceptable for shutdown-only closing; verify no one
    * closes it mid-run.
    */
  def getConnection(): Jedis = {
    if (conn == null) {
      this.synchronized {
        if (conn == null) {
          createConnection()
        }
      }
    }
    logInfo(s"---------------ConnectionPool.getConnection count=${count}-----------")
    conn
  }

  // Creates and configures the connection (auth + DB index 3).
  // Must only be called while holding the `this` lock (see getConnection).
  private def createConnection(): Unit = {
    count = count + 1
    conn = new Jedis("localhost", 6379)
    conn.auth("123456")
    conn.select(3)
    logInfo(s"---------------ConnectionPool.createConnection count=${count}-----------")
  }

  /** Standalone smoke test: connect, print the saved "offset" key, disconnect. */
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379)
    jedis.auth("123456")
    jedis.select(3)
    println(jedis.get("offset"))
    jedis.close()
  }
}
StreamingQueryListener（监听查询进度，把每批次的 Kafka offset 写入 Redis）
import org.apache.spark.sql.streaming.StreamingQueryListener
class SparkStreamingQueryListener(appName: String) extends StreamingQueryListener {

  /** Logs that the streaming query has started. */
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("----------------spark structure streaming start---------------------")
  }

  /** After each micro-batch, persists the Kafka source offset to Redis.
    *
    * BUG FIX: offsets were written under key "test_offset1", but the driver
    * (WordCount) and the smoke test (RedisConnection.main) both read key
    * "offset" — so recovery could never find a saved offset. Both sides now
    * agree on "offset".
    *
    * NOTE(review): this saves startOffset, so a restart re-processes the last
    * completed batch (at-least-once). If exactly-once resume is wanted, save
    * endOffset instead — confirm against the downstream sink's semantics.
    */
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println("*************************process1**********************")
    // Offset JSON of the last source; null until the first batch completes.
    val s = event.progress.sources.last.startOffset
    if (s != null) {
      RedisConnection.getConnection().set("offset", s)
      println("*******************写入redis********************")
    }
    println(s)
    println("*************************process2**********************")
  }

  /** Closes the shared Redis connection when the query terminates. */
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("----------------spark structure streaming end---------------------")
    // NOTE(review): getConnection() will CREATE a connection here if none
    // exists, only for it to be closed immediately, and it leaves the
    // singleton pointing at a closed Jedis. Fine at shutdown; do not rely on
    // the connection afterwards.
    val jedis = RedisConnection.getConnection()
    if (jedis != null) {
      jedis.close()
    }
  }
}
Spark Structured Streaming 主程序（从 Redis 恢复 offset 并消费 Kafka，支持通过 Redis 键优雅停止）
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
object WordCount {
  /** Entry point: consumes topic "mytopic" from Kafka, restoring the starting
    * offset from Redis when one was previously saved, writes each batch to the
    * console, and polls Redis every 10 seconds for an external stop signal
    * (key "stop_test" set to "stop").
    */
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local")
      .appName("wordcount")
      .getOrCreate()
    session.sparkContext.setLogLevel("INFO")

    // The listener persists offsets to Redis as batches complete.
    session.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))

    val kafkaConfig = Map(
      "kafka.bootstrap.servers" -> "localhost:9092",
      "subscribe"               -> "mytopic"
    )

    // Resume from the saved offset when one exists, otherwise start at "latest".
    val redis = RedisConnection.getConnection()
    val key = "offset"
    val startingOffsets =
      if (redis.exists(key)) redis.get(key)
      else "latest"

    val stream = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaConfig("kafka.bootstrap.servers"))
      .option("subscribe", kafkaConfig("subscribe"))
      .option("startingOffsets", startingOffsets)
      .option("kafka.max.partition.fetch.bytes", 1048576 * 4)
      .load()

    val query = stream.writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .option("truncate", true)
      .start()

    println("-------------x1------------------")
    println(query.lastProgress)

    // Poll every 10 seconds; stop gracefully on the external Redis signal.
    val intervalMills = 10 * 1000
    var stopped = false
    while (!stopped) {
      println("-----------------等待stop---------------------")
      stopped = query.awaitTermination(intervalMills)
      if (!stopped && redis.exists("stop_test") && "stop" == redis.get("stop_test")) {
        println("2秒后开始关闭sparstreaming程序.....")
        Thread.sleep(2000)
        query.stop()
      }
    }
    query.awaitTermination()
  }
}