1. 实现
当task执行failed时,发送邮件
import org.apache.spark._
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
class SparkAppListener(conf: SparkConf) extends SparkListener{
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
val AppName = conf.get("spark.app.name")
val taskId = info.taskId
val stageId = taskEnd.stageId
if (info != null && taskEnd.stageAttemptId != -1) {
val errorMessage: Option[String] =
taskEnd.reason match {
case e: ExceptionFailure =>
Some(e.toErrorString)
case e: TaskFailedReason =>
Some(e.toErrorString)
case _ => None
}
if (errorMessage.nonEmpty) {
if (conf.getBoolean("enableSendEmailOnTaskFail", false)) {
val messageInfo = "please note: ******AppName:"+AppName+"******\n"+
"******stageId:"+stageId+"******\n"+
"******taskId:"+taskId+"******\n"+ errorMessage.get
val args = Array("wangsheying@qq.com", "spark任务监控", messageInfo)
try {
Message.sendMail(args)
} catch {
case e: Exception => e.printStackTrace()
}
}
}
}
}
}