💥使用EmailOperator发送邮件 - 图1
使用 Python 来发送邮件比较简单,将此功能集成在 AirFlow 中也不难,但其实 AirFlow 本身就给我们已经提供了这样的 Operator,以便我们可以发送邮件。

1. 配置SMTP

AirFlow 也是借助 SMTP 服务来发送邮件,在 airflow.cfg 文件中,我们可以定义所需要的 SMTP 的信息。

  1. [smtp]
  2. smtp_host = smtp.yumingmin.com
  3. smtp_starttls = True
  4. smtp_ssl = False
  5. smtp_user =
  6. smtp_password =
  7. smtp_port = 25
  8. smtp_mail_from = airflow@airflow.com
  9. smtp_timeout = 30
  10. smtp_retry_limit = 5

2. 使用EmailOperator

官方文档:https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/email/index.html?highlight=email#module-airflow.operators.email

先来看一下 EmailOperator 具有有哪些参数:

  • to (list or string (comma or semicolon delimited)) – list of emails to send the email to. (templated)
  • subject (str) – subject line for the email. (templated)
  • html_content (str) – content of the email, html markup is allowed. (templated)
  • files (list) – file names to attach in email (templated)
  • cc (list or string (comma or semicolon delimited)) – list of recipients to be added in CC field
  • bcc (list or string (comma or semicolon delimited)) – list of recipients to be added in BCC field
  • mime_subtype (str) – MIME sub content type
  • mime_charset (str) – character set parameter added to the Content-Type header.
  1. from datetime import timedelta
  2. from airflow import DAG
  3. from airflow.operators.email import EmailOperator
  4. from airflow.utils.dates import days_ago
  5. default_args = {
  6. 'owner': 'yumingmin',
  7. 'depends_on_past': False,
  8. 'email': ['yumingmin@xinye.com'],
  9. 'email_on_failure': False,
  10. 'email_on_retry': False,
  11. 'retries': 1,
  12. 'retry_delay': timedelta(minutes=10),
  13. 'catchup': False
  14. }
  15. dag = DAG(
  16. 'send_email_demo',
  17. default_args=default_args,
  18. description='send email using EmailOperator',
  19. schedule_interval=timedelta(days=1),
  20. start_date=days_ago(1),
  21. tags=['demo'],
  22. )
  23. send_email_op = EmailOperator(
  24. task_id='send_email',
  25. to=["yumingmin@xinye.com"],
  26. subject="This is test email {{ ds }}",
  27. html_content="Hello Airflow!",
  28. dag=dag
  29. )
  30. send_email_zh_op = EmailOperator(
  31. task_id='send_email_zsh',
  32. to=["yumingmin@xinye.com"],
  33. subject="中文标题 {{ ds }}",
  34. html_content="中文正文",
  35. dag=dag
  36. )
  37. send_email_op >> send_email_zh_op

经过测试,发送中文的邮件也是没有问题的。

3. 在任务失败或重启时发送警告邮件

参考文档:https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html

airflow.cfg 中配置:

  1. [email]
  2. email_backend = airflow.utils.email.send_email_smtp
  3. email_conn_id = smtp_default
  4. default_email_on_retry = True
  5. default_email_on_failure = True
  6. subject_template = /home/yumingmin/bigdata/airflow/templates/email/subject_template_default
  7. html_content_template = /home/yumingmin/bigdata/airflow/templates/email/html_content_template_default

subject_template_file 文件中填入:

  1. {{ ti.dag_id }} / {{ ti.task_id }}告警邮件

html_content_template_file 文件中填入:

  1. Dag ID: {{ ti.dag_id }}<br>
  2. Task ID: {{ ti.task_id }}<br>
  3. Try {{ try_number }} out of {{ max_tries + 1 }}<br>
  4. Exception: <br>{{ exception_html }}<br>
  5. Log: <a href="{{ ti.log_url }}">Link</a><br>
  6. Host: {{ ti.hostname }}<br>
  7. Log file: {{ ti.log_filepath }}<br>
  8. Mark success: <a href="{{ ti.mark_success_url }}">Link</a><br>

关于 ti 的参数可以参考:https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html