
Sending email from Python is fairly simple, and wiring that into Airflow yourself is not hard either. There is no need to, though: Airflow already ships an Operator for sending email.
1. Configure SMTP
Airflow also relies on an SMTP service to send email. The SMTP settings it needs are defined in the [smtp] section of airflow.cfg.
[smtp]
smtp_host = smtp.yumingmin.com
smtp_starttls = True
smtp_ssl = False
smtp_user =
smtp_password =
smtp_port = 25
smtp_mail_from = airflow@airflow.com
smtp_timeout = 30
smtp_retry_limit = 5
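Before pointing Airflow at these settings, it can help to check them outside Airflow. The following is a minimal sketch using only the standard library: it parses an [smtp] section like the one above and, optionally, sends a plain test message with smtplib. The names load_smtp_settings and send_test_mail are hypothetical helpers made up for this illustration, not part of Airflow.

```python
import configparser
import smtplib
from email.message import EmailMessage

# Same values as the [smtp] section shown in this post.
SAMPLE_CFG = """
[smtp]
smtp_host = smtp.yumingmin.com
smtp_starttls = True
smtp_ssl = False
smtp_user =
smtp_password =
smtp_port = 25
smtp_mail_from = airflow@airflow.com
smtp_timeout = 30
smtp_retry_limit = 5
"""


def load_smtp_settings(cfg_text):
    """Parse the [smtp] section into a dict of typed values (hypothetical helper)."""
    # interpolation=None so that '%' characters in a real airflow.cfg are left alone
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    section = parser["smtp"]
    return {
        "host": section.get("smtp_host"),
        "port": section.getint("smtp_port"),
        "starttls": section.getboolean("smtp_starttls"),
        "ssl": section.getboolean("smtp_ssl"),
        # Empty user/password means "no authentication"
        "user": section.get("smtp_user") or None,
        "password": section.get("smtp_password") or None,
        "mail_from": section.get("smtp_mail_from"),
        "timeout": section.getint("smtp_timeout"),
    }


def send_test_mail(settings, to_addr):
    """Send one plain-text message using the parsed settings (needs network access)."""
    msg = EmailMessage()
    msg["Subject"] = "SMTP smoke test"
    msg["From"] = settings["mail_from"]
    msg["To"] = to_addr
    msg.set_content("If you can read this, the SMTP settings work.")

    smtp_cls = smtplib.SMTP_SSL if settings["ssl"] else smtplib.SMTP
    with smtp_cls(settings["host"], settings["port"], timeout=settings["timeout"]) as conn:
        if settings["starttls"] and not settings["ssl"]:
            conn.starttls()
        if settings["user"]:
            conn.login(settings["user"], settings["password"])
        conn.send_message(msg)


settings = load_smtp_settings(SAMPLE_CFG)
# send_test_mail(settings, "yumingmin@xinye.com")  # uncomment on a host that can reach the server
```

If the test message does not arrive, the problem is in the SMTP settings or the network rather than in Airflow itself.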
2. Using EmailOperator
First, let's look at the parameters EmailOperator accepts:
- to (list or string (comma or semicolon delimited)) – list of emails to send the email to. (templated)
- subject (str) – subject line for the email. (templated)
- html_content (str) – content of the email, html markup is allowed. (templated)
- files (list) – file names to attach in email (templated)
- cc (list or string (comma or semicolon delimited)) – list of recipients to be added in CC field
- bcc (list or string (comma or semicolon delimited)) – list of recipients to be added in BCC field
- mime_subtype (str) – MIME sub content type
- mime_charset (str) – character set parameter added to the Content-Type header.
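The to, cc and bcc parameters accept either a list or a single string delimited by commas or semicolons. Airflow does this normalization itself (its own helper is get_email_address_list in airflow.utils.email); the standalone sketch below only approximates that behavior to show what the two input forms turn into. The function name split_recipients is made up for this example.

```python
import re


def split_recipients(addresses):
    """Return a list of addresses from a list or a comma/semicolon delimited string."""
    if isinstance(addresses, str):
        # Split on either delimiter and drop surrounding whitespace and empty pieces
        return [part.strip() for part in re.split(r"[,;]", addresses) if part.strip()]
    return list(addresses)


recipients = split_recipients("a@example.com, b@example.com;c@example.com")
```

In practice this means to="a@example.com;b@example.com" and to=["a@example.com", "b@example.com"] address the same two recipients.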
from datetime import timedelta

from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago

# Defaults applied to every task in this DAG
default_args = {
    'owner': 'yumingmin',
    'depends_on_past': False,
    'email': ['yumingmin@xinye.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG(
    'send_email_demo',
    default_args=default_args,
    description='send email using EmailOperator',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    catchup=False,  # catchup is a DAG argument, not a task default
    tags=['demo'],
)

# subject is a templated field, so {{ ds }} is rendered as the run date
send_email_op = EmailOperator(
    task_id='send_email',
    to=["yumingmin@xinye.com"],
    subject="This is test email {{ ds }}",
    html_content="Hello Airflow!",
    dag=dag
)

# Same task with a Chinese subject and body
send_email_zh_op = EmailOperator(
    task_id='send_email_zsh',
    to=["yumingmin@xinye.com"],
    subject="中文标题 {{ ds }}",
    html_content="中文正文",
    dag=dag
)

send_email_op >> send_email_zh_op
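The second task sends a Chinese subject and body, which is where mime_subtype and mime_charset come in. The sketch below is a simplified, standard-library approximation of how such a message can be assembled; it is not Airflow's actual code. It assumes the values 'mixed' and 'utf-8' for the two parameters, and build_message is a hypothetical name.

```python
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def build_message(mail_from, to, subject, html_content,
                  mime_subtype="mixed", mime_charset="utf-8"):
    """Assemble a multipart message with an HTML body (illustrative only)."""
    msg = MIMEMultipart(mime_subtype)
    msg["Subject"] = subject
    msg["From"] = mail_from
    msg["To"] = ", ".join(to)
    # The body part carries the charset in its Content-Type header
    msg.attach(MIMEText(html_content, "html", mime_charset))
    return msg


msg = build_message(
    "airflow@airflow.com",
    ["yumingmin@xinye.com"],
    "中文标题 2021-01-01",  # what "中文标题 {{ ds }}" could look like after rendering
    "中文正文",
)
raw = msg.as_string()  # non-ASCII headers are written as RFC 2047 encoded words
```

Printing raw shows the subject as an encoded word and the body as a text/html part with charset utf-8, which is what lets mail clients display the Chinese text correctly.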
3. Sending alert emails when a task fails or is retried
Reference: https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html
Configure the following in airflow.cfg:
[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True
subject_template = /home/yumingmin/bigdata/airflow/templates/email/subject_template_default
html_content_template = /home/yumingmin/bigdata/airflow/templates/email/html_content_template_default
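Options in airflow.cfg can also be supplied as environment variables named AIRFLOW__{SECTION}__{KEY}, which is convenient in containers where editing the file is awkward. As a small sketch, the helper below (airflow_env_var, a name made up for this example) builds those variable names for the [email] options above.

```python
def airflow_env_var(section, key):
    """Return the environment variable name Airflow reads for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# The [email] options from this post, expressed as environment variables
email_options = {
    "default_email_on_retry": "True",
    "default_email_on_failure": "True",
    "subject_template": "/home/yumingmin/bigdata/airflow/templates/email/subject_template_default",
}
env = {airflow_env_var("email", key): value for key, value in email_options.items()}

for name, value in env.items():
    print(f"export {name}={value}")
```

The printed export lines can be pasted into a shell profile or a container definition.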
In the file that subject_template points to, put:
{{ ti.dag_id }} / {{ ti.task_id }} alert email
In the file that html_content_template points to, put:
Dag ID: {{ ti.dag_id }}<br>
Task ID: {{ ti.task_id }}<br>
Try {{ try_number }} out of {{ max_tries + 1 }}<br>
Exception: <br>{{ exception_html }}<br>
Log: <a href="{{ ti.log_url }}">Link</a><br>
Host: {{ ti.hostname }}<br>
Log file: {{ ti.log_filepath }}<br>
Mark success: <a href="{{ ti.mark_success_url }}">Link</a><br>
For the attributes available on ti (the task instance), see: https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html
