使用 Python 来发送邮件比较简单,将此功能集成在 AirFlow 中也不难,但其实 AirFlow 本身就给我们已经提供了这样的 Operator,以便我们可以发送邮件。
1. 配置SMTP
AirFlow 也是借助 SMTP 服务来发送邮件,在 airflow.cfg
文件中,我们可以定义所需要的 SMTP 的信息。
[smtp]
smtp_host = smtp.yumingmin.com
smtp_starttls = True
smtp_ssl = False
smtp_user =
smtp_password =
smtp_port = 25
smtp_mail_from = airflow@airflow.com
smtp_timeout = 30
smtp_retry_limit = 5
2. 使用EmailOperator
先来看一下 EmailOperator
具有有哪些参数:
- to (list or string (comma or semicolon delimited)) – list of emails to send the email to. (templated)
- subject (str) – subject line for the email. (templated)
- html_content (str) – content of the email, html markup is allowed. (templated)
- files (list) – file names to attach in email (templated)
- cc (list or string (comma or semicolon delimited)) – list of recipients to be added in CC field
- bcc (list or string (comma or semicolon delimited)) – list of recipients to be added in BCC field
- mime_subtype (str) – MIME sub content type
- mime_charset (str) – character set parameter added to the Content-Type header.
from datetime import timedelta
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'yumingmin',
'depends_on_past': False,
'email': ['yumingmin@xinye.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=10),
'catchup': False
}
dag = DAG(
'send_email_demo',
default_args=default_args,
description='send email using EmailOperator',
schedule_interval=timedelta(days=1),
start_date=days_ago(1),
tags=['demo'],
)
send_email_op = EmailOperator(
task_id='send_email',
to=["yumingmin@xinye.com"],
subject="This is test email {{ ds }}",
html_content="Hello Airflow!",
dag=dag
)
send_email_zh_op = EmailOperator(
task_id='send_email_zsh',
to=["yumingmin@xinye.com"],
subject="中文标题 {{ ds }}",
html_content="中文正文",
dag=dag
)
send_email_op >> send_email_zh_op
3. 在任务失败或重启时发送警告邮件
参考文档:https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html
在 airflow.cfg
中配置:
[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True
subject_template = /home/yumingmin/bigdata/airflow/templates/email/subject_template_default
html_content_template = /home/yumingmin/bigdata/airflow/templates/email/html_content_template_default
在 subject_template_file
文件中填入:
{{ ti.dag_id }} / {{ ti.task_id }}告警邮件
在 html_content_template_file
文件中填入:
Dag ID: {{ ti.dag_id }}<br>
Task ID: {{ ti.task_id }}<br>
Try {{ try_number }} out of {{ max_tries + 1 }}<br>
Exception: <br>{{ exception_html }}<br>
Log: <a href="{{ ti.log_url }}">Link</a><br>
Host: {{ ti.hostname }}<br>
Log file: {{ ti.log_filepath }}<br>
Mark success: <a href="{{ ti.mark_success_url }}">Link</a><br>
关于 ti
的参数可以参考:https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html