Once our tasks are deployed on Airflow, we often need to backfill data over some historical window (say, the last 6 months). Our old way of doing this was to edit the task-date variable under Admin → Variables and then manually re-trigger the task. That is fine for backfilling 10 days or so, but for half a year it is exhausting: after each run you have to wait for the task to finish and then edit the variable again before the next manual run.

1. Writing a backfill script by hand

As mentioned above, a deployed DAG cannot replay history directly. A rather clumsy workaround is to pull the functions out of the production code and write a for-loop script that replays the data date by date (since a DAG usually contains several tasks, multiprocessing is not really an option inside the loop). This does get the data backfilled, but having to write a one-off script every time is still a hassle; a minimal sketch follows.
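For illustration, a minimal sketch of such a loop script, assuming two hypothetical task functions extract_data and transform_data standing in for the functions pulled out of the DAG file:

    from datetime import date, timedelta

    # Hypothetical task functions lifted out of the deployed DAG file;
    # in the real DAG each would be the python_callable of one task.
    def extract_data(ds: str) -> None:
        print(f"extracting data for {ds}")

    def transform_data(ds: str) -> None:
        print(f"transforming data for {ds}")

    start, end = date(2021, 6, 1), date(2021, 6, 30)
    day = start
    while day <= end:
        ds = day.isoformat()   # same "YYYY-MM-DD" format as Airflow's {{ ds }}
        extract_data(ds)       # tasks run sequentially, mirroring the DAG order
        transform_data(ds)
        day += timedelta(days=1)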

2. Backfilling data with backfill

If you have read the Airflow documentation, backfill is already mentioned in the Quick Start, with a sample command that backfills the example_bash_operator DAG from 2015-01-01 to 2015-01-02:

    $ airflow dags backfill example_bash_operator -s 2015-01-01 -e 2015-01-02

2.1 Backfilling the tutorial DAG

Now look at the official tutorial example:

    from datetime import timedelta
    from textwrap import dedent

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.dates import days_ago

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    with DAG(
        'tutorial',
        default_args=default_args,
        description='A simple tutorial DAG',
        schedule_interval=timedelta(days=1),
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        # t1, t2 and t3 are examples of tasks created by instantiating operators
        t1 = BashOperator(
            task_id='print_date',
            bash_command='date',
        )
        t2 = BashOperator(
            task_id='sleep',
            depends_on_past=False,
            bash_command='sleep 5',
            retries=3,
        )
        t1.doc_md = dedent(
            """\
            #### Task Documentation
            You can document your task using the attributes `doc_md` (markdown),
            `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
            rendered in the UI's Task Instance Details page.
            ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
            """
        )

        dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
        dag.doc_md = """
        This is a documentation placed anywhere
        """  # otherwise, type it like this

        templated_command = dedent(
            """
            {% for i in range(5) %}
                echo "{{ ds }}"
                echo "{{ macros.ds_add(ds, 7) }}"
                echo "{{ params.my_param }}"
            {% endfor %}
            """
        )
        t3 = BashOperator(
            task_id='templated',
            depends_on_past=False,
            bash_command=templated_command,
            params={'my_param': 'Parameter I passed in'},
        )

        t1 >> [t2, t3]

Let's use backfill to replay two days of this DAG and see what we get.

    $ airflow dags backfill tutorial -s 2021-06-01 -e 2021-06-02

After the run finishes, check the logs of the 2021-06-01 run:

    # print_date
    [2021-08-05 14:01:46,636] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'date']
    [2021-08-05 14:01:46,642] {subprocess.py:74} INFO - Output:
    [2021-08-05 14:01:46,643] {subprocess.py:78} INFO - Thu Aug 5 14:01:46 CST 2021

    # templated
    [2021-08-05 14:02:09,081] {subprocess.py:63} INFO - Running command: ['bash', '-c', '\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n\n echo "2021-06-01"\n echo "2021-06-08"\n echo "Parameter I passed in"\n']
    [2021-08-05 14:02:09,087] {subprocess.py:74} INFO - Output:
    [2021-08-05 14:02:09,088] {subprocess.py:78} INFO - 2021-06-01
    [2021-08-05 14:02:09,088] {subprocess.py:78} INFO - 2021-06-08

Compare the output of the two tasks. For the print_date task, the log still shows 2021-08-05, the wall-clock date, because the `date` command simply runs at execution time. For the templated task, the log prints 2021-06-01 exactly as we hoped, because {{ ds }} is rendered to the logical (execution) date of each run. So the key to a correct backfill is how the {{ ds }} variable is obtained; see the sketch below.
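If print_date were supposed to print the backfilled date instead, its command would itself have to go through templating. A minimal sketch, assuming the dag object from the tutorial above (print_logical_date is a hypothetical task, not part of the official example):

    from airflow.operators.bash import BashOperator

    # bash_command is a templated field, so {{ ds }} is rendered to the
    # logical (execution) date of each DAG run, even during a backfill.
    print_logical_date = BashOperator(
        task_id='print_logical_date',  # hypothetical task for illustration
        bash_command='echo "logical date: {{ ds }}"',
        dag=dag,
    )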

2.2 Writing a test script

Since the vast majority of our tasks use PythonOperator, while the example above is a BashOperator, let's write a test script to see how to obtain this variable correctly from Python code.

    from datetime import timedelta, datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago

    def print_date():
        # these are ordinary Python strings, not templated operator fields,
        # so Jinja never gets a chance to render them
        print("today is: {{ ds }}")
        print("today is: {{ macros.ds_add(ds, 7) }}")

    default_args = {
        'owner': 'yumingmin',
        'depends_on_past': False,
        'email': ['yu_mingm623@163.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        'backfill_dags',
        default_args=default_args,
        description='run backfill dags correctly',
        schedule_interval=timedelta(days=1),
        start_date=days_ago(1),
        catchup=False,  # catchup is a DAG argument; in default_args it has no effect
        tags=['demo'],
    )

    print_date_op = PythonOperator(
        task_id="print_date",
        python_callable=print_date,
        dag=dag)

Backfill this DAG from 2021-06-01 to 2021-06-02:

    $ airflow dags backfill backfill_dags -s 2021-06-01 -e 2021-06-02

📋 Output log:

    [2021-08-05 15:27:53,181] {logging_mixin.py:104} INFO - today is: {{ ds }}
    [2021-08-05 15:27:53,181] {logging_mixin.py:104} INFO - today is: {{ macros.ds_add(ds, 7) }}
    [2021-08-05 15:27:53,181] {python.py:151} INFO - Done. Returned value was: None

Clearly this does not give us the dates we want 😅. Jinja templates are rendered only inside an operator's templated fields (such as bash_command), so template strings embedded in ordinary Python code are printed verbatim.

2.3 The example_python_operator example

The bundled examples also include an example_python_operator DAG; let's see what we can borrow from it.

    from pprint import pprint

    from airflow.operators.python import PythonOperator

    def print_context(ds, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        pprint(kwargs)
        print(ds)
        return 'Whatever you return gets printed in the logs'

    # in the original example this operator is defined inside a `with DAG(...)` block
    run_this = PythonOperator(
        task_id='print_the_context',
        python_callable=print_context,
    )

The print_context function receives ds directly as a function argument, so it looks like we have found the way to do it.

2.4 Second version of the test script

    def print_date(ds, **kwargs):
        print("today is: ", datetime.now().strftime("%Y-%m-%d"))  # wall-clock date
        print("today is: ", kwargs["run_date"])                   # rendered via op_kwargs
        print("today is: ", ds)                                   # injected from the context
        # still plain strings: these will not be rendered
        print("today is: {{ ds }}")
        print("today is: {{ macros.ds_add(ds, 7) }}")

    print_date_op = PythonOperator(
        task_id="print_date",
        python_callable=print_date,
        op_kwargs={"run_date": "{{ ds }}"},  # op_kwargs is a templated field
        dag=dag)

Backfill the data again; --reset-dagruns makes backfill re-run DAG runs that are already in a finished state:

    $ airflow dags backfill backfill_dags -s 2021-06-01 -e 2021-06-02 --reset-dagruns

📋 Check the output log again:

    [2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: 2021-08-05
    [2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: 2021-06-01
    [2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: 2021-06-01
    [2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: {{ ds }}
    [2021-08-05 15:04:44,060] {logging_mixin.py:104} INFO - today is: {{ macros.ds_add(ds, 7) }}

Wow, it works: the variable is received correctly this time, perfect 🤟.

As the log shows, there are two ways to receive the backfill date: taking ds as a function argument, and passing it in with op_kwargs={"run_date": "{{ ds }}"} (op_kwargs is itself a templated field, so the string is rendered before the callable runs).

Note that in Airflow 1.x the callable cannot receive ds directly like this: you must also set provide_context=True on the PythonOperator, as sketched below.
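For reference, a minimal sketch of the Airflow 1.x equivalent of the operator above (note the old import path and the extra provide_context flag; everything else is unchanged):

    # Airflow 1.x: context variables such as ds are only passed to the
    # callable when provide_context=True is set explicitly.
    from airflow.operators.python_operator import PythonOperator

    print_date_op = PythonOperator(
        task_id="print_date",
        python_callable=print_date,   # same callable as above
        provide_context=True,         # not needed in Airflow 2.x
        op_kwargs={"run_date": "{{ ds }}"},
        dag=dag)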

2.5 Backfilling one run at a time, in order

The approach above triggers too many DAG runs in parallel at the same time. We can set max_active_runs=1 on the DAG so that only one run executes at a time, and set depends_on_past to True:

    default_args = {
        'owner': 'yumingmin',
        'depends_on_past': True,
        'email': ['yumingmin@airflow.com'],
        'start_date': days_ago(1),
        'retries': 60,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=config["project"]["task_name"],
        default_args=default_args,
        description=config["project"]["task_name_zh"],
        schedule_interval='@daily',
        max_active_runs=1,
        catchup=False,  # catchup is a DAG argument, not a default_args key
        tags=['model'],
    )

With this in place, a backfill over a given date range will replay history one run at a time, in order. An example invocation follows.
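For instance, a sketch of the invocation; <dag_id> is a placeholder for the dag_id configured above (here it comes from config["project"]["task_name"]), and the date range is illustrative:

    $ airflow dags backfill <dag_id> -s 2021-01-01 -e 2021-06-30 --reset-dagruns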

3. Extras

3.1 The backfill command

    $ airflow dags backfill -h
    usage: airflow dags backfill [-h] [-c CONF] [--delay-on-limit DELAY_ON_LIMIT]
                                 [-x] [-n] [-e END_DATE] [-i] [-I] [-l] [-m]
                                 [--pool POOL] [--rerun-failed-tasks]
                                 [--reset-dagruns] [-B] [-s START_DATE]
                                 [-S SUBDIR] [-t TASK_REGEX] [-v] [-y]
                                 dag_id

    Run subsections of a DAG for a specified date range. If reset_dag_run option is
    used, backfill will first prompt users whether airflow should clear all the
    previous dag_run and task_instances within the backfill date range. If
    rerun_failed_tasks is used, backfill will auto re-run the previous failed task
    instances within the backfill date range

    positional arguments:
      dag_id                The id of the dag

    optional arguments:
      -h, --help            show this help message and exit
      -c CONF, --conf CONF  JSON string that gets pickled into the DagRun's conf attribute
      --delay-on-limit DELAY_ON_LIMIT
                            Amount of time in seconds to wait when the limit on maximum active dag runs (max_active_runs) has been reached before trying to execute a dag run again
      -x, --donot-pickle    Do not attempt to pickle the DAG object to send over to the workers, just tell the workers to run their version of the code
      -n, --dry-run         Perform a dry run for each task. Only renders Template Fields for each task, nothing else
      -e END_DATE, --end-date END_DATE
                            Override end_date YYYY-MM-DD
      -i, --ignore-dependencies
                            Skip upstream tasks, run only the tasks matching the regexp. Only works in conjunction with task_regex
      -I, --ignore-first-depends-on-past
                            Ignores depends_on_past dependencies for the first set of tasks only (subsequent executions in the backfill DO respect depends_on_past)
      -l, --local           Run the task using the LocalExecutor
      -m, --mark-success    Mark jobs as succeeded without running them
      --pool POOL           Resource pool to use
      --rerun-failed-tasks  if set, the backfill will auto-rerun all the failed tasks for the backfill date range instead of throwing exceptions
      ⭐--reset-dagruns      if set, the backfill will delete existing backfill-related DAG runs and start anew with fresh, running DAG runs
      ⭐-B, --run-backwards  if set, the backfill will run tasks from the most recent day first. if there are tasks that depend_on_past this option will throw an exception
      -s START_DATE, --start-date START_DATE
                            Override start_date YYYY-MM-DD
      -S SUBDIR, --subdir SUBDIR
                            File location or directory from which to look for the dag. Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg'
      -t TASK_REGEX, --task-regex TASK_REGEX
                            The regex to filter specific task_ids to backfill (optional)
      -v, --verbose         Make logging output more verbose
      -y, --yes             Do not prompt to confirm reset. Use with care!

--run-backwards makes the backfill start from the most recent day and work backwards, which can be handy when tasks have special requirements on date order (as the help text notes, it throws an exception if any task uses depends_on_past); an example follows.
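For example, a sketch combining the two starred flags with a task filter, reusing the backfill_dags demo from section 2 (its tasks do not set depends_on_past, so --run-backwards is safe here; the date range is illustrative):

    $ airflow dags backfill backfill_dags -s 2021-06-01 -e 2021-06-30 \
          --reset-dagruns --run-backwards -t "print_date"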

3.2 Macros

Reference: https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
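As a quick taste, a sketch of a few commonly used macros inside a templated bash_command (the macro_demo task and the dag object are assumptions for illustration):

    from airflow.operators.bash import BashOperator

    # every {{ ... }} expression below is rendered per DAG run,
    # before the shell command actually executes
    macro_demo = BashOperator(
        task_id='macro_demo',  # hypothetical task for illustration
        bash_command=(
            'echo "logical date:       {{ ds }}"; '
            'echo "compact form:       {{ ds_nodash }}"; '
            'echo "seven days earlier: {{ macros.ds_add(ds, -7) }}"; '
            'echo "custom format:      {{ macros.ds_format(ds, "%Y-%m-%d", "%Y/%m/%d") }}"'
        ),
        dag=dag,  # assumes an existing DAG object
    )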