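Original: https://www.applydatascience.com/airflow/writing-your-first-pipeline/
Steps to write an Airflow DAG
A DAG file is basically just a Python script: a configuration file that specifies the DAG's structure as code.
Writing an Airflow DAG, or workflow, takes only five steps:
- Step 1: Importing modules
- Step 2: Default arguments
- Step 3: Instantiate a DAG
- Step 4: Tasks
- Step 5: Setting up dependencies
Step 1: Importing modules
Import the Python dependencies needed for the workflow.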
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
Step 2: Default arguments
Define the default arguments and any DAG-specific arguments.
default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    # 'end_date': datetime(2018, 12, 30),
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
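To illustrate how these defaults interact with per-task arguments, here is a minimal sketch in plain Python. It relies on the documented behavior that an argument passed directly to an operator takes precedence over the same key in `default_args`. The helper `resolve_task_args` is hypothetical and is not part of Airflow; it only mimics the merge.

```python
from datetime import timedelta

# Same shape as default_args above, trimmed to the retry settings.
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def resolve_task_args(defaults, **task_kwargs):
    """Hypothetical helper: explicit task arguments win over defaults."""
    return {**defaults, **task_kwargs}


# A task that passes nothing extra inherits every default.
print_date_args = resolve_task_args(default_args)

# A task that passes retries=3 overrides only that key.
sleep_args = resolve_task_args(default_args, retries=3)

print(print_date_args["retries"])  # 1
print(sleep_args["retries"])       # 3
print(sleep_args["retry_delay"])   # 0:05:00
```

With `retries` set to 1, a failing task gets at most two attempts: the first run plus one retry.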
Step 3: Instantiate a DAG
Give the DAG a name, configure its schedule, and set its other DAG-level options.
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    # Continue to run DAG once per day
    schedule_interval=timedelta(days=1),
)
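As a rough picture of what a `timedelta` schedule means, the sketch below lists the first few interval start dates for a one-day interval. The helper `logical_dates` is hypothetical, and the start date is only an example value. The sketch does not model scheduler timing: Airflow normally triggers a run only after the interval it covers has ended.

```python
from datetime import datetime, timedelta


def logical_dates(start_date, interval, count):
    """Hypothetical helper: the first `count` interval starts from start_date."""
    return [start_date + i * interval for i in range(count)]


# Example start date; any datetime works here.
dates = logical_dates(datetime(2015, 6, 1), timedelta(days=1), 3)
for d in dates:
    print(d.isoformat())
# 2015-06-01T00:00:00
# 2015-06-02T00:00:00
# 2015-06-03T00:00:00
```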
There are several options for the schedule interval. You can use one of the presets or a cron-like expression:
| preset | meaning | cron |
|---|---|---|
| `None` | Don't schedule; use for exclusively "externally triggered" DAGs | |
| `@once` | Schedule once and only once | |
| `@hourly` | Run once an hour at the beginning of the hour | `0 * * * *` |
| `@daily` | Run once a day at midnight | `0 0 * * *` |
| `@weekly` | Run once a week at midnight on Sunday morning | `0 0 * * 0` |
| `@monthly` | Run once a month at midnight of the first day of the month | `0 0 1 * *` |
| `@yearly` | Run once a year at midnight of January 1 | `0 0 1 1 *` |
Example usage:
- Daily schedule (the two forms are equivalent):
schedule_interval='@daily'
schedule_interval='0 0 * * *'
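The relationship between presets and cron expressions can be sketched as a plain lookup. The pairs below are copied from the table above; the helper `to_cron` is hypothetical and only illustrates the equivalence, it is not how Airflow resolves presets internally.

```python
# Preset-to-cron pairs from the table; None and @once have no cron form.
PRESET_TO_CRON = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def to_cron(schedule):
    """Hypothetical helper: expand a preset, pass other strings through."""
    return PRESET_TO_CRON.get(schedule, schedule)


print(to_cron("@daily"))      # 0 0 * * *
print(to_cron("0 0 * * *"))   # 0 0 * * *
print(to_cron("30 6 * * 1"))  # 30 6 * * 1
```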
Step 4: Tasks
The next step is to lay out all the tasks in the workflow.
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag,
)

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)
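To get a feel for what `templated_command` expands to, here is a plain-Python approximation of the loop. It does not use Jinja or Airflow: `ds_add` is a stand-in written for this sketch that mirrors what `macros.ds_add` does (shift a `YYYY-MM-DD` string by a number of days), and `render_templated_command` is a hypothetical helper. The `ds` value is an example date.

```python
from datetime import datetime, timedelta


def ds_add(ds, days):
    """Stand-in for macros.ds_add: shift a YYYY-MM-DD string by `days`."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def render_templated_command(ds, params):
    """Hypothetical helper mimicking the five-iteration loop in the template."""
    lines = []
    for _ in range(5):
        lines.append(f'echo "{ds}"')
        lines.append(f'echo "{ds_add(ds, 7)}"')
        lines.append(f'echo "{params["my_param"]}"')
    return "\n".join(lines)


print(ds_add("2015-06-01", 7))  # 2015-06-08

rendered = render_templated_command(
    "2015-06-01", {"my_param": "Parameter I passed in"}
)
print(rendered)
```

The rendered command contains fifteen `echo` lines: the same three lines repeated five times.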
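Step 5: Setting up dependencies
- Set the dependencies, that is, the order in which the tasks should run.
- Here are a few ways to define dependencies between tasks: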
# This means that t2 will depend on t1
# running successfully to run.
t1.set_downstream(t2)
# similar to above where t3 will depend on t1
t3.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
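The bit shift syntax works because Python lets a class define what `>>` and `<<` mean. The toy `Task` class below is a sketch of that idea, not Airflow's implementation: it records only dependency edges, and every name in it is made up for illustration.

```python
class Task:
    """Toy stand-in for an operator; records only dependency edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, others):
        for other in others if isinstance(others, list) else [others]:
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def set_upstream(self, others):
        for other in others if isinstance(others, list) else [others]:
            other.set_downstream(self)

    def __rshift__(self, other):   # task >> other
        self.set_downstream(other)
        return other

    def __lshift__(self, other):   # task << other
        self.set_upstream(other)
        return other

    def __rlshift__(self, other):  # [a, b] << task
        self.set_downstream(other)
        return self

    def __rrshift__(self, other):  # [a, b] >> task
        self.set_upstream(other)
        return self


t1, t2, t3 = Task("print_date"), Task("sleep"), Task("templated")

# Every form below declares the same edges, so repeating them adds nothing.
t1.set_downstream(t2)
t3.set_upstream(t1)
t1 >> [t2, t3]
[t2, t3] << t1

print(sorted(t1.downstream))  # ['sleep', 'templated']
print(sorted(t2.upstream))    # ['print_date']
```

Returning `other` from `__rshift__` is what allows chains such as `a >> b >> c` in this sketch.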
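Recap
- Basically, a DAG is just a Python file that organizes tasks and sets their execution context. DAGs do not perform any actual computation.
- Instead, tasks are the Airflow elements that actually do the work we want performed. It is your job to write the configuration and arrange the tasks in a specific order to create a complete data pipeline.
- Your final DAG will look like this: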
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks
# created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)