1. 设置监控

  // Build (or reuse) the SparkSession for this job and attach the streaming query listener.
  val spark = {
    // Settings handed to the builder, applied in the order listed.
    val settings = Seq(
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
      "spark.default.parallelism" -> "300",
      "spark.sql.shuffle.partitions" -> "200",
      // Not a built-in Spark key; presumably read by com.wsy.util.SparkAppListener — TODO confirm.
      "enableSendEmailOnTaskFail" -> "true",
      // Listener class name passed to Spark through its extra-listeners setting.
      "spark.extraListeners" -> "com.wsy.util.SparkAppListener"
    )

    // The application name is the simple name of the enclosing class; the master is YARN.
    val base = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("yarn")

    settings
      .foldLeft(base) { case (builder, (key, value)) => builder.config(key, value) }
      .getOrCreate()
  }

  // Register the streaming query listener, tagged with the same class-derived name.
  spark.streams.addListener(new SparkStreamingQueryListener(this.getClass.getSimpleName))

2. 代码实现

  import org.apache.spark.sql.streaming.StreamingQueryListener

  /**
   * Streaming query listener that sends a notification mail whenever a query terminates.
   *
   * @param appName application name embedded in the mail subject
   */
  class SparkStreamingQueryListener(appName: String) extends StreamingQueryListener {

    /** Query start is not reported. */
    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()

    /** Progress updates are not reported. */
    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = ()

    /**
     * Mails the outcome of a terminated query.
     *
     * The subject ends with "Exception" only when the event carries an error; a query that
     * ended without one is reported with a "Terminated" subject and the body "OK", so a
     * clean stop no longer looks like a failure in the inbox.
     *
     * @param event termination event; its `exception` holds the error text, if any
     */
    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
      val mailTo = "cloud@huawei.com"

      // Pick the subject suffix and the body together so they can never disagree.
      val (outcome, mailText) =
        event.exception.fold("Terminated" -> "OK")(error => "Exception" -> error)

      val mailTitle = s"Spark Structured Streaming Application ${appName} $outcome"

      // The mail helper receives recipient, subject and body, in that order.
      Message.sendMail(Array(mailTo, mailTitle, mailText))
    }
  }