1. 实现

通过自定义 SparkListener 监听 task 结束事件:当 task 执行失败、且配置项 enableSendEmailOnTaskFail 为 true 时,发送告警邮件。

  1. import org.apache.spark._
  2. import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
  /**
   * Spark listener that sends an alert e-mail when a task ends with a failure reason.
   *
   * Alerting is opt-in: nothing is sent unless the boolean setting
   * `enableSendEmailOnTaskFail` is `true` in the supplied configuration.
   *
   * @param conf Spark configuration, read for `spark.app.name` and
   *             `enableSendEmailOnTaskFail`
   */
  class SparkAppListener(conf: SparkConf) extends SparkListener {

    /**
     * Inspects a finished task and, if it failed and alerting is enabled, e-mails the
     * application name, stage id, task id and the failure's error string.
     *
     * @param taskEnd the task-end event delivered by Spark
     */
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
      // Wrap taskInfo in Option so a null value is skipped *before* any of its fields
      // are read (the task id used to be dereferenced ahead of the null check).
      // NOTE(review): stageAttemptId == -1 is presumably a "no valid attempt" marker —
      // confirm against the Spark version in use.
      Option(taskEnd.taskInfo)
        .filter(_ => taskEnd.stageAttemptId != -1)
        .foreach { info =>
          failureMessage(taskEnd).foreach { reason =>
            if (conf.getBoolean("enableSendEmailOnTaskFail", false)) {
              sendAlert(info.taskId, taskEnd.stageId, reason)
            }
          }
        }
    }

    /** Returns the error string of a failed task, or `None` for any other end reason. */
    private def failureMessage(taskEnd: SparkListenerTaskEnd): Option[String] =
      taskEnd.reason match {
        // ExceptionFailure is itself a TaskFailedReason, so one case covers both.
        case failed: TaskFailedReason => Some(failed.toErrorString)
        case _ => None
      }

    /** Builds the alert text and hands it to the mailer; delivery is best-effort. */
    private def sendAlert(taskId: Long, stageId: Int, reason: String): Unit = {
      // Looked up only when an alert is actually sent; the default keeps a missing
      // application name from throwing inside the listener.
      val appName = conf.get("spark.app.name", "unknown")
      val messageInfo =
        s"please note: ******AppName:${appName}******\n" +
          s"******stageId:${stageId}******\n" +
          s"******taskId:${taskId}******\n" +
          reason
      val args = Array("wangsheying@qq.com", "spark任务监控", messageInfo)
      try {
        Message.sendMail(args)
      } catch {
        // A mail failure must not break event processing; just report it.
        case e: Exception => e.printStackTrace()
      }
    }
  }