1. 设置监控
val spark = SparkSession.builder() .appName(this.getClass.getSimpleName) .master("yarn") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.default.parallelism", "300") .config("spark.sql.shuffle.partitions", "200") .config("enableSendEmailOnTaskFail", "true") .config("spark.extraListeners", "com.wsy.util.SparkAppListener") .getOrCreate()spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))
2. 代码实现
import org.apache.spark.sql.streaming.StreamingQueryListenerclass SparkStreamingQueryListener(appName: String) extends StreamingQueryListener { override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {} override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {} override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { val mailTo = "cloud@huawei.com" val mailTittle = s"Spark Structured Streaming Application ${appName} Exception" val mailText = event.exception.getOrElse("OK") val txt = Array(mailTo, mailTittle, mailText) Message.sendMail(txt) }}