1. 设置监控
// Build (or reuse) the SparkSession for this job, then register the streaming query listener on it.
val spark = {
  // Key/value settings, applied to the builder in exactly this order.
  val settings = Seq(
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    "spark.default.parallelism" -> "300",
    "spark.sql.shuffle.partitions" -> "200",
    // NOTE(review): not a built-in Spark key — presumably read by the custom app listener; confirm.
    "enableSendEmailOnTaskFail" -> "true",
    "spark.extraListeners" -> "com.wsy.util.SparkAppListener"
  )

  // The application is named after the enclosing class and submitted to YARN.
  val base = SparkSession.builder()
    .appName(this.getClass.getSimpleName)
    .master("yarn")

  settings
    .foldLeft(base) { case (builder, (key, value)) => builder.config(key, value) }
    .getOrCreate()
}

// The listener receives the same class-derived name that was used as the application name.
spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))
2. 代码实现
import org.apache.spark.sql.streaming.StreamingQueryListener
/**
 * Streaming query listener that sends a notification mail whenever a query terminates.
 *
 * Query start and progress events are ignored.
 *
 * @param appName application name embedded in the mail subject
 * @param mailTo  recipient address of the notification mail; defaults to the address
 *                that was previously hard-coded, so existing call sites keep working
 */
class SparkStreamingQueryListener(appName: String, mailTo: String = "cloud@huawei.com")
  extends StreamingQueryListener {

  /** Intentionally a no-op. */
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  /** Intentionally a no-op. */
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {}

  /**
   * Sends a mail describing the terminated query.
   *
   * A mail is sent on every termination: the body is the exception text carried by the
   * event, or the literal "OK" when the event carries no exception.
   *
   * @param event termination event reported for the streaming query
   */
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    val mailTitle = s"Spark Structured Streaming Application ${appName} Exception"
    val mailText = event.exception.getOrElse("OK")
    // NOTE(review): Message.sendMail presumably expects [recipient, subject, body] — confirm against Message.
    Message.sendMail(Array(mailTo, mailTitle, mailText))
  }
}