AirFlow 日常任务的跑批,常常伴随着大量日志文件的产生,尤其是在任务失败尝试重启的时候,日志文件会变得异常地大。这些文件如果不能及时清理,很可能导致服务器磁盘拉爆,下面我们编写一个每天自动清理日志的任务,只保留近一个月的日志。
同时,MySQL 执行日志也应该定期进行清理,不然 MySQL 中 Airflow 库占用空间会越来越大,放任不管的话,迟早有一天 MySQL 库会崩掉。
1. 日志占用磁盘大小
先来看一下运行日志会占用我们磁盘多大的空间,可以看到轻轻松松就会占用 140G 数据占用了,这还只是一个月的数据,所以每天清理日志数据是非常有必要的。
$ du -sh ~/bigdata/airflow/logs
140G logs/
2. 清理本地调度文件
AirFlow 任务运行的日志文件分为两部分,一部分为本地日志文件,它保存在 $AIRFLOW_HOME/logs/scheduler
文件夹下,通常你查看运行日志时的数字,浏览器就会下载一份文件(这份文件便是存储在 logs/scheduler
文件夹下的)。
实现的代码也很简单,就是使用 BashOperator
执行一段删除代码即可,这里设置为只保存近 30 天的日志文件,其余的都删掉。
from airflow.operators.bash_operator import BashOperator
airflow_home = os.environ.get("AIRFLOW_HOME")
logging_dir = os.path.join(airflow_home, "logs", "scheduler")
clean_templated = """rm -rf %s/{{ macros.ds_add(ds, -30) }}""" % logging_dir
cleanup_op = BashOperator(
task_id="cleanup_logging",
bash_command=clean_templated,
dag=dag)
3. 清理MySQL库中的执行日志
借助于 MySqlOperator
来清理 MySQL 中 Airflow 执行日志,前提是 Airflow 的数据库使用的是 MySQL。
⭐ 值得注意的是,如果有任务的执行周期是 1 个月或者 2 个月执行一次,需要修改
ds_add(ds, -7)
from airflow.providers.mysql.operators.mysql import MySqlOperator
tk = {
"xcom":"execution_date",
"task_instance":"execution_date",
"task_fail":"execution_date",
"sla_miss":"execution_date",
"log":"execution_date",
"job":"start_date",
"dag_run":"execution_date"
}
for tb in tk:
del_op = MySqlOperator(
task_id=f"del_{tb}",
sql="DELETE FROM %s WHERE %s < '{{ ds_add(ds, -7) }}'" % (tb, tk[tb]),
dag=dag,
)
3. 完整脚本
以下为完整的清理脚本
import os
from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
airflow_home = os.environ.get("AIRFLOW_HOME")
logging_dir = os.path.join(airflow_home, "logs", "scheduler")
clean_templated = "rm -rf %s/{{ macros.ds_add(ds, -30) }}" % logging_dir
__doc__ = "自动清理DAG日志"
default_args = {
'owner': 'yumingmin',
'depends_on_past': False,
'email': ['yu_mingm623@163.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=30),
'catchup': False
}
dag = DAG(
'cleanup_logging',
default_args=default_args,
description='Cleanup logging automatically',
schedule_interval=timedelta(days=1),
start_date=days_ago(1),
tags=['prod'],
)
cleanup_op = BashOperator(
task_id="cleanup_logging",
bash_command=clean_templated,
dag=dag)
tk = {
"xcom":"execution_date",
"task_instance":"execution_date",
"task_fail":"execution_date",
"sla_miss":"execution_date",
"log":"execution_date",
"job":"start_date",
"dag_run":"execution_date"
}
for tb in tk:
del_op = MySqlOperator(
task_id=f"del_{tb}",
sql="DELETE FROM %s WHERE %s < '{{ ds_add(ds, -30) }}'" % (tb, tk[tb]),
dag=dag,
)
cleanup_op >> del_op
# Documentation for Dag and Task
dag.doc_md = dedent(
"""
# 自动清理DAG日志
- 仅仅保留<font color='red'>30</font>天内的运行日志,其余都会被删除
- 脚本使用 Bash 作为媒介来清除 logs 下 schedule 相关日期文件夹
"""
)
,开启该任务就会每天清理前 31 天