Once a task is deployed on Airflow, we often need to backfill a stretch of history (say, the last six months). Our old way of reprocessing data was to edit the task's date variable under Admin ➜ Variable and then manually re-trigger the run. That is tolerable for backfilling ten days or so, but for half a year it is exhausting: you must wait for each run to finish before editing the variable again for the next manual run.
1. Writing a backfill script by hand
As noted above, a deployed DAG task cannot simply replay its own history. One crude workaround is to pull the functions out of the production code and write a for-loop script that reprocesses the data day by day (since a DAG usually contains multiple tasks that must run in order, multiprocessing is of little use inside the loop). This does get the data backfilled, but having to write a fresh backfill script every time is tedious.
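For illustration, a minimal sketch of such a loop; extract_data and load_data are hypothetical stand-ins for the functions pulled out of the deployed DAG:

from datetime import date, timedelta

# Hypothetical stand-ins for the task functions extracted from the DAG
def extract_data(ds: str):
    print(f"extracting data for {ds}")

def load_data(ds: str):
    print(f"loading data for {ds}")

start, end = date(2021, 1, 1), date(2021, 6, 30)
day = start
while day <= end:
    ds = day.isoformat()      # same YYYY-MM-DD format as Airflow's {{ ds }}
    extract_data(ds)          # the tasks for one day must run in order,
    load_data(ds)             # hence a plain sequential loop
    day += timedelta(days=1)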
2. Backfilling data with the backfill command
If you have read the Airflow documentation, the Quick Start already mentions backfill, with an example command that backfills the example_bash_operator DAG from 2015-01-01 to 2015-01-02:
$ airflow dags backfill example_bash_operator -s 2015-01-01 -e 2015-01-02
2.1 Backfilling the tutorial DAG
Now look at the official tutorial example:
from datetime import timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]
Let's use backfill to replay two days of this DAG and see what happens. Note that the command takes the dag_id, not the file name:
$ airflow dags backfill tutorial -s 2021-06-01 -e 2021-06-02
After the run completes, check the log of the 2021-06-01 run:
# print_date
[2021-08-05 14:01:46,636] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'date']
[2021-08-05 14:01:46,642] {subprocess.py:74} INFO - Output:
[2021-08-05 14:01:46,643] {subprocess.py:78} INFO - Thu Aug 5 14:01:46 CST 2021 ⭐
# templated
[2021-08-05 14:02:09,081] {subprocess.py:63} INFO - Running command: ['bash', '-c', '\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n']
[2021-08-05 14:02:09,087] {subprocess.py:74} INFO - Output:
[2021-08-05 14:02:09,088] {subprocess.py:78} INFO - 2021-06-01 ⭐
[2021-08-05 14:02:09,088] {subprocess.py:78} INFO - 2021-06-08
Focus on the two ⭐ lines. For the print_date task the log still prints 2021-08-05: the date command runs at actual execution time, so it prints the wall-clock date rather than the logical one. For the templated task, the log prints 2021-06-01 exactly as we hoped. So the key to a correct backfill is how to get hold of the {{ ds }} variable.
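Because bash_command is one of BashOperator's template_fields, Jinja substitutes {{ ds }} with the run's logical date before the command executes, while the plain date command simply runs at wall-clock time. To preview how a task's templated fields render for a given date without actually running it, Airflow 2.x ships an airflow tasks render command (shown here for the tutorial DAG):

$ airflow tasks render tutorial templated 2021-06-01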
2.2 Writing a test script
Since the vast majority of our tasks use PythonOperator rather than the BashOperator of the example, let's write a test script to see how to obtain this variable correctly in code.
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_date():
    print("today is: {{ ds }}")
    print("today is: {{ macros.ds_add(ds, 7) }}")


default_args = {
    'owner': 'yumingmin',
    'depends_on_past': False,
    'email': ['yu_mingm623@163.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'backfill_dags',
    default_args=default_args,
    description='run backfill dags correctly',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    catchup=False,  # catchup is a DAG argument, not a task-level default
    tags=['demo'],
)

print_date_op = PythonOperator(
    task_id="print_date",
    python_callable=print_date,
    dag=dag)
Backfill this DAG from 2021-06-01 to 2021-06-02:
$ airflow dags backfill backfill_dags -s 2021-06-01 -e 2021-06-02
📋 Output log:
[2021-08-05 15:27:53,181] {logging_mixin.py:104} INFO - today is: {{ ds }}
[2021-08-05 15:27:53,181] {logging_mixin.py:104} INFO - today is: {{ macros.ds_add(ds, 7) }}
[2021-08-05 15:27:53,181] {python.py:151} INFO - Done. Returned value was: None
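The {{ ds }} strings come out verbatim: Jinja only renders the fields listed in an operator's template_fields, and string literals inside the Python callable are never touched. A quick check of which PythonOperator fields are templated (output shown for Airflow 2.x; it may differ by version):

from airflow.operators.python import PythonOperator

print(PythonOperator.template_fields)
# ('templates_dict', 'op_args', 'op_kwargs')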
2.3 The example_python_operator example
The bundled examples also include an example_python_operator DAG; let's see what it offers for reference.
from pprint import pprint


def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
)
The print_context function receives a ds argument directly; it looks like we have found the way to do it.
2.4 Second version of the test script
def print_date(ds, **kwargs):
    print("today is: ", datetime.now().strftime("%Y-%m-%d"))
    print("today is: ", kwargs["run_date"])
    print("today is: ", ds)
    print("today is: {{ ds }}")
    print("today is: {{ macros.ds_add(ds, 7) }}")


print_date_op = PythonOperator(
    task_id="print_date",
    python_callable=print_date,
    op_kwargs={"run_date": "{{ ds }}"},
    dag=dag)
Backfill again; --reset-dagruns re-runs DAG runs that have already finished:
$ airflow dags backfill backfill_dags -s 2021-06-01 -e 2021-06-02 --reset-dagruns
📋 Check the output log again:
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: 2021-08-05
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: 2021-06-01
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: 2021-06-01
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: {{ ds }}
[2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: {{ macros.ds_add(ds, 7) }}
Wow, it works: the variable comes through correctly. Perfect 🤟.
So there are two ways to receive the backfill date: accept ds as an argument of the callable, or pass it in via op_kwargs={"run_date": "{{ ds }}"}.
Note that in Airflow 1.x the callable cannot receive the ds argument directly; there you have to ask for the context explicitly, as sketched below.
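A minimal sketch of the Airflow 1.x equivalent, assuming Airflow 1.10 import paths:

# Airflow 1.x: context variables such as ds reach the callable only
# when provide_context=True is set on the operator.
from airflow.operators.python_operator import PythonOperator  # 1.x import path

def print_date(ds=None, **kwargs):
    print("today is:", ds)

print_date_op = PythonOperator(
    task_id="print_date",
    python_callable=print_date,
    provide_context=True,  # required in 1.x; in 2.x the context is injected automatically
    dag=dag,
)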
2.5 Backfilling sequentially, one run at a time
The approach above triggers many DAG runs at once, all executing in parallel. We can set max_active_runs=1 on the DAG so that only one run is active at a time, and set depends_on_past to True so each run waits for the previous one to succeed.
default_args = {
    'owner': 'yumingmin',
    'depends_on_past': True,
    'email': ['yumingmin@airflow.com'],
    'start_date': days_ago(1),
    'retries': 60,
    'retry_delay': timedelta(minutes=5),
}

# config is loaded elsewhere in the project
dag = DAG(
    dag_id=config["project"]["task_name"],
    default_args=default_args,
    description=config["project"]["task_name_zh"],
    schedule_interval='@daily',
    max_active_runs=1,
    catchup=False,  # catchup belongs on the DAG, not in default_args
    tags=['model'],
)
With this in place, backfill replays the given date range one run at a time, in order.
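For example, to replay six months sequentially with the backfill_dags DAG from above (the -I / --ignore-first-depends-on-past flag listed in the help below skips the depends_on_past check for the very first run, which has no predecessor):

$ airflow dags backfill backfill_dags -s 2021-01-01 -e 2021-06-30 --reset-dagruns -I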
3. Extras
3.1 The backfill command
❯ airflow dags backfill -h
usage: airflow dags backfill [-h] [-c CONF] [--delay-on-limit DELAY_ON_LIMIT]
[-x] [-n] [-e END_DATE] [-i] [-I] [-l] [-m]
[--pool POOL] [--rerun-failed-tasks]
[--reset-dagruns] [-B] [-s START_DATE]
[-S SUBDIR] [-t TASK_REGEX] [-v] [-y]
dag_id
Run subsections of a DAG for a specified date range. If reset_dag_run option is used, backfill will first prompt users whether airflow should clear all the previous dag_run and task_instances within the backfill date range. If rerun_failed_tasks is used, backfill will auto re-run the previous failed task instances within the backfill date range
positional arguments:
dag_id The id of the dag
optional arguments:
-h, --help show this help message and exit
-c CONF, --conf CONF JSON string that gets pickled into the DagRun's conf attribute
--delay-on-limit DELAY_ON_LIMIT
Amount of time in seconds to wait when the limit on maximum active dag runs (max_active_runs) has been reached before trying to execute a dag run again
-x, --donot-pickle Do not attempt to pickle the DAG object to send over to the workers, just tell the workers to run their version of the code
-n, --dry-run Perform a dry run for each task. Only renders Template Fields for each task, nothing else
-e END_DATE, --end-date END_DATE
Override end_date YYYY-MM-DD
-i, --ignore-dependencies
Skip upstream tasks, run only the tasks matching the regexp. Only works in conjunction with task_regex
-I, --ignore-first-depends-on-past
Ignores depends_on_past dependencies for the first set of tasks only (subsequent executions in the backfill DO respect depends_on_past)
-l, --local Run the task using the LocalExecutor
-m, --mark-success Mark jobs as succeeded without running them
--pool POOL Resource pool to use
--rerun-failed-tasks if set, the backfill will auto-rerun all the failed tasks for the backfill date range instead of throwing exceptions
⭐--reset-dagruns if set, the backfill will delete existing backfill-related DAG runs and start anew with fresh, running DAG runs
⭐-B, --run-backwards if set, the backfill will run tasks from the most recent day first. if there are tasks that depend_on_past this option will throw an exception
-s START_DATE, --start-date START_DATE
Override start_date YYYY-MM-DD
-S SUBDIR, --subdir SUBDIR
File location or directory from which to look for the dag. Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg'
-t TASK_REGEX, --task-regex TASK_REGEX
The regex to filter specific task_ids to backfill (optional)
-v, --verbose Make logging output more verbose
-y, --yes Do not prompt to confirm reset. Use with care!
The --run-backwards flag runs the backfill from the most recent date backwards; it can be handy when the order of dates matters for your task.
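For instance, to replay June 2021 newest-first (per the help text above, this raises an exception for tasks that set depends_on_past):

$ airflow dags backfill backfill_dags -s 2021-06-01 -e 2021-06-30 --run-backwards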
3.2 macros
Reference: https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
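A few macros that come up constantly when backfilling, shown as they would appear inside a templated bash_command (all documented at the reference above):

echo "{{ ds }}"                     # logical date, YYYY-MM-DD
echo "{{ ds_nodash }}"              # the same date as YYYYMMDD
echo "{{ macros.ds_add(ds, -1) }}"  # one day before the logical date
echo "{{ macros.ds_format(ds, '%Y-%m-%d', '%Y/%m/%d') }}"  # reformatted date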