AirFlow 日常任务的跑批,常常伴随着大量日志文件的产生,尤其是在任务失败尝试重启的时候,日志文件会变得异常地大。这些文件如果不能及时清理,很可能导致服务器磁盘拉爆,下面我们编写一个每天自动清理日志的任务,只保留近一个月的日志。

同时,MySQL 执行日志也应该定期进行清理,不然 MySQL 中 Airflow 库占用空间会越来越大,放任不管的话,迟早有一天 MySQL 库会崩掉。

1. 日志占用磁盘大小

先来看一下运行日志会占用我们磁盘多大的空间,可以看到轻轻松松就会占用 140G 数据占用了,这还只是一个月的数据,所以每天清理日志数据是非常有必要的。

  1. $ du -sh ~/bigdata/airflow/logs
  2. 140G logs/

2. 清理本地调度文件

AirFlow 任务运行的日志文件分为两部分,一部分为本地日志文件,它保存在 $AIRFLOW_HOME/logs/scheduler 文件夹下,通常你查看运行日志时的数字,浏览器就会下载一份文件(这份文件便是存储在 logs/scheduler 文件夹下的)。
image.png

实现的代码也很简单,就是使用 BashOperator 执行一段删除代码即可,这里设置为只保存近 30 天的日志文件,其余的都删掉。

  1. from airflow.operators.bash_operator import BashOperator
  2. airflow_home = os.environ.get("AIRFLOW_HOME")
  3. logging_dir = os.path.join(airflow_home, "logs", "scheduler")
  4. clean_templated = """rm -rf %s/{{ macros.ds_add(ds, -30) }}""" % logging_dir
  5. cleanup_op = BashOperator(
  6. task_id="cleanup_logging",
  7. bash_command=clean_templated,
  8. dag=dag)

3. 清理MySQL库中的执行日志

借助于 MySqlOperator 来清理 MySQL 中 Airflow 执行日志,前提是 Airflow 的数据库使用的是 MySQL。

⭐ 值得注意的是,如果有任务的执行周期是 1 个月或者 2 个月执行一次,需要修改 ds_add(ds, -7)

  1. from airflow.providers.mysql.operators.mysql import MySqlOperator
  2. tk = {
  3. "xcom":"execution_date",
  4. "task_instance":"execution_date",
  5. "task_fail":"execution_date",
  6. "sla_miss":"execution_date",
  7. "log":"execution_date",
  8. "job":"start_date",
  9. "dag_run":"execution_date"
  10. }
  11. for tb in tk:
  12. del_op = MySqlOperator(
  13. task_id=f"del_{tb}",
  14. sql="DELETE FROM %s WHERE %s < '{{ ds_add(ds, -7) }}'" % (tb, tk[tb]),
  15. dag=dag,
  16. )

3. 完整脚本

以下为完整的清理脚本

  1. import os
  2. from airflow import DAG
  3. from datetime import timedelta
  4. from airflow.utils.dates import days_ago
  5. from airflow.operators.bash_operator import BashOperator
  6. from airflow.providers.mysql.operators.mysql import MySqlOperator
  7. airflow_home = os.environ.get("AIRFLOW_HOME")
  8. logging_dir = os.path.join(airflow_home, "logs", "scheduler")
  9. clean_templated = "rm -rf %s/{{ macros.ds_add(ds, -30) }}" % logging_dir
  10. __doc__ = "自动清理DAG日志"
  11. default_args = {
  12. 'owner': 'yumingmin',
  13. 'depends_on_past': False,
  14. 'email': ['yu_mingm623@163.com'],
  15. 'email_on_failure': False,
  16. 'email_on_retry': False,
  17. 'retries': 1,
  18. 'retry_delay': timedelta(minutes=30),
  19. 'catchup': False
  20. }
  21. dag = DAG(
  22. 'cleanup_logging',
  23. default_args=default_args,
  24. description='Cleanup logging automatically',
  25. schedule_interval=timedelta(days=1),
  26. start_date=days_ago(1),
  27. tags=['prod'],
  28. )
  29. cleanup_op = BashOperator(
  30. task_id="cleanup_logging",
  31. bash_command=clean_templated,
  32. dag=dag)
  33. tk = {
  34. "xcom":"execution_date",
  35. "task_instance":"execution_date",
  36. "task_fail":"execution_date",
  37. "sla_miss":"execution_date",
  38. "log":"execution_date",
  39. "job":"start_date",
  40. "dag_run":"execution_date"
  41. }
  42. for tb in tk:
  43. del_op = MySqlOperator(
  44. task_id=f"del_{tb}",
  45. sql="DELETE FROM %s WHERE %s < '{{ ds_add(ds, -30) }}'" % (tb, tk[tb]),
  46. dag=dag,
  47. )
  48. cleanup_op >> del_op
  49. # Documentation for Dag and Task
  50. dag.doc_md = dedent(
  51. """
  52. # 自动清理DAG日志
  53. - 仅仅保留<font color='red'>30</font>天内的运行日志,其余都会被删除
  54. - 脚本使用 Bash 作为媒介来清除 logs 下 schedule 相关日期文件夹
  55. """
  56. )

,开启该任务就会每天清理前 31 天