Original article: https://www.applydatascience.com/airflow/writing-your-first-pipeline/

Steps for Writing an Airflow DAG

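A DAG file, which is basically just a Python script, is a configuration file that specifies the DAG's structure as code.
Writing an Airflow DAG (workflow) comes down to just five steps: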

  • Step 1: Import modules
  • Step 2: Default arguments
  • Step 3: Instantiate a DAG
  • Step 4: Tasks
  • Step 5: Set up dependencies

Step 1: Import modules

Import the Python dependencies that the workflow needs.

from datetime import timedelta
import airflow
from airflow import DAG
# Airflow 1.x import path; in Airflow 2 the operator lives in airflow.operators.bash
from airflow.operators.bash_operator import BashOperator

Step 2: Default arguments

Define the default arguments and any DAG-specific arguments.

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    # 'end_date': datetime(2018, 12, 30),  # needs: from datetime import datetime
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
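Conceptually, default_args supplies fallback values for every operator in the DAG, and any argument passed directly to an operator takes precedence (the final DAG below relies on this when t2 sets retries=3). The stdlib-only sketch below models that precedence with a plain dict merge; `resolve_task_args` is a hypothetical helper for illustration, not Airflow's internal code, which applies default_args when each operator is constructed.

```python
from datetime import timedelta


# Hypothetical helper: a simplified model of default_args as fallbacks.
# Arguments passed explicitly to a task win over the shared defaults.
def resolve_task_args(default_args, **task_kwargs):
    merged = dict(default_args)  # copy so the shared defaults are not mutated
    merged.update(task_kwargs)   # explicit task arguments override defaults
    return merged


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Like t1 in the final DAG: relies entirely on the defaults.
print_date_args = resolve_task_args(default_args, task_id='print_date')
# Like t2 in the final DAG: overrides the default with retries=3.
sleep_args = resolve_task_args(default_args, task_id='sleep', retries=3)

print(print_date_args['retries'])  # 1
print(sleep_args['retries'])       # 3
```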

Step 3: Instantiate a DAG

Give the DAG a name, configure its schedule, and set the DAG-level settings.

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    # Continue to run DAG once per day
    schedule_interval=timedelta(days=1),
)
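With schedule_interval=timedelta(days=1), runs are spaced one day apart starting from start_date, and in Airflow's model the run for a given interval is triggered only after that interval has ended. The stdlib sketch below lists which run dates would be due at a given moment; `due_runs` is a hypothetical helper that ignores catchup settings, end_date, and time zones.

```python
from datetime import datetime, timedelta


# Hypothetical helper: list the execution dates whose interval has fully
# elapsed by `now`. A run stamped `d` covers [d, d + interval) and is only
# triggered once `now` reaches d + interval.
def due_runs(start_date, interval, now):
    runs = []
    current = start_date
    while current + interval <= now:
        runs.append(current)
        current += interval
    return runs


start = datetime(2015, 6, 1)
now = datetime(2015, 6, 4, 12, 0)
for run in due_runs(start, timedelta(days=1), now):
    print(run.date())
# 2015-06-01
# 2015-06-02
# 2015-06-03
```

Note that the run stamped 2015-06-04 is not due yet at noon on June 4: its one-day interval only closes at midnight.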

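There are several options for the schedule interval. Besides a datetime.timedelta like the one above, you can use one of the following presets or a cron expression: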

Preset     Meaning                                                     Cron
None       Don’t schedule, use for exclusively “externally triggered” DAGs
@once      Schedule once and only once
@hourly    Run once an hour at the beginning of the hour               0 * * * *
@daily     Run once a day at midnight                                  0 0 * * *
@weekly    Run once a week at midnight on Sunday morning               0 0 * * 0
@monthly   Run once a month at midnight of the first day of the month  0 0 1 * *
@yearly    Run once a year at midnight of January 1                    0 0 1 1 *

Example usage:

  • A daily schedule can be written either way:
    • schedule_interval='@daily'
    • schedule_interval='0 0 * * *'
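The presets that have a cron column are shorthand for those cron expressions. The sketch below spells out that mapping and checks whether a timestamp matches a five-field expression (minute, hour, day of month, month, day of week). It is a deliberately simplified, hypothetical matcher: it understands only `*` and single integers, and it is not how Airflow evaluates schedules (Airflow relies on the croniter library for that).

```python
from datetime import datetime

# Presets from the table and their cron equivalents.
# (None and @once have no cron form, so they are left out.)
PRESETS = {
    '@hourly': '0 * * * *',
    '@daily': '0 0 * * *',
    '@weekly': '0 0 * * 0',
    '@monthly': '0 0 1 * *',
    '@yearly': '0 0 1 1 *',
}


def cron_matches(expr, ts):
    """Simplified matcher: supports only '*' and single integers per field.

    All fields are ANDed together, which agrees with real cron as long as
    day-of-month or day-of-week is '*' (true for every preset above).
    """
    expr = PRESETS.get(expr, expr)  # accept a preset or a raw cron string
    fields = expr.split()
    # Cron numbers Sunday as 0; Python's weekday() numbers Monday as 0.
    cron_dow = (ts.weekday() + 1) % 7
    actual = [ts.minute, ts.hour, ts.day, ts.month, cron_dow]
    return all(f == '*' or int(f) == v for f, v in zip(fields, actual))


print(cron_matches('@daily', datetime(2024, 1, 8, 0, 0)))      # True
print(cron_matches('0 0 * * *', datetime(2024, 1, 8, 0, 30)))  # False
print(cron_matches('@weekly', datetime(2024, 1, 7, 0, 0)))     # True (a Sunday)
```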

Step 4: Tasks

The next step is to lay out all the tasks in the workflow.

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag,
)
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)
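t3's bash_command is a Jinja template that Airflow renders before running the task: `{{ ds }}` becomes the run's execution date as YYYY-MM-DD, `macros.ds_add(ds, 7)` shifts that date by 7 days, and `params.my_param` comes from the params argument. To show roughly what the rendered command looks like without installing Airflow or Jinja, the sketch below reproduces the rendering by hand (ignoring whitespace details); `ds_add` here is a hypothetical re-implementation that mimics the macro, and `render_templated_command` is purely illustrative.

```python
from datetime import datetime, timedelta


def ds_add(ds, days):
    """Mimics macros.ds_add: shift a 'YYYY-MM-DD' string by `days` days."""
    shifted = datetime.strptime(ds, '%Y-%m-%d') + timedelta(days=days)
    return shifted.strftime('%Y-%m-%d')


def render_templated_command(ds, params):
    """Hand-rolled equivalent of the Jinja loop in templated_command."""
    lines = []
    for _ in range(5):  # {% for i in range(5) %}
        lines.append(f'echo "{ds}"')                  # {{ ds }}
        lines.append(f'echo "{ds_add(ds, 7)}"')       # {{ macros.ds_add(ds, 7) }}
        lines.append(f'echo "{params["my_param"]}"')  # {{ params.my_param }}
    return '\n'.join(lines)


rendered = render_templated_command('2015-06-01', {'my_param': 'Parameter I passed in'})
print(rendered.splitlines()[:3])
# ['echo "2015-06-01"', 'echo "2015-06-08"', 'echo "Parameter I passed in"']
```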

Step 5: Set up dependencies

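  • Set the dependencies, i.e. the order in which the tasks run.
  • Below are several ways to define the dependencies between tasks. They are alternatives shown side by side; in a real DAG you would declare each dependency once, in whichever style you prefer: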
# This means that t2 will depend on t1
# running successfully to run.
t1.set_downstream(t2)
# similar to above where t3 will depend on t1
t3.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
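The bit shift syntax works because Airflow operators overload Python's `>>` and `<<` operators, plus their reflected forms (`__rrshift__`, `__rlshift__`) so that a plain list can appear on the left-hand side. The toy `Task` class below is a hypothetical stand-in, not Airflow's BaseOperator, that reproduces this behavior to show why `t1 >> [t2, t3]` and `[t2, t3] << t1` record the same edges.

```python
class Task:
    """Toy stand-in for an Airflow operator (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, other):
        others = other if isinstance(other, list) else [other]
        for task in others:
            self.downstream.add(task.task_id)
            task.upstream.add(self.task_id)

    def set_upstream(self, other):
        others = other if isinstance(other, list) else [other]
        for task in others:
            task.set_downstream(self)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # returning `other` allows t1 >> t2 >> t3

    def __lshift__(self, other):   # self << other
        self.set_upstream(other)
        return other

    def __rrshift__(self, other):  # [a, b] >> self (list has no __rshift__)
        self.set_upstream(other)
        return self

    def __rlshift__(self, other):  # [a, b] << self (list has no __lshift__)
        self.set_downstream(other)
        return self


t1, t2, t3 = Task('print_date'), Task('sleep'), Task('templated')
[t2, t3] << t1  # same effect as t1 >> [t2, t3]
print(sorted(t1.downstream))  # ['sleep', 'templated']
print(sorted(t2.upstream))    # ['print_date']
```

When Python evaluates `[t2, t3] << t1`, the list has no `__lshift__`, so Python falls back to `t1.__rlshift__([t2, t3])`, which is why the reflected methods are needed at all.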

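Recap

  • Basically, a DAG is just a Python file that organizes tasks and sets their execution context. DAGs themselves do not perform any actual computation.
  • Instead, tasks are the elements of Airflow that actually "do the work we want performed". Your job is to write the configuration and arrange the tasks in a specific order to build a complete data pipeline.
  • Your final DAG will look something like this: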
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
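    # A start_date far in the past: with catchup enabled (Airflow's default),
    # the scheduler creates a run for every missed daily interval since then.
    'start_date': datetime(2015, 6, 1),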
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
    'tutorial', 
    default_args=default_args, 
    schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks 
# created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
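The last two lines make both t2 and t3 depend only on t1, so once print_date succeeds, sleep and templated have no ordering constraint between them and, depending on the executor, may run concurrently. The stdlib sketch below derives those "waves" from an upstream map; `execution_waves` is a hypothetical helper (a level-by-level topological sort), not part of Airflow.

```python
def execution_waves(upstream):
    """Group tasks into waves: each wave depends only on earlier waves.

    `upstream` maps every task_id -> set of task_ids that must finish first.
    Raises ValueError if no task can make progress (a cycle, so not a DAG).
    """
    remaining = {task: set(deps) for task, deps in upstream.items()}
    done = set()
    waves = []
    while remaining:
        ready = sorted(t for t, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError('cycle detected: not a DAG')
        waves.append(ready)
        done.update(ready)
        for task in ready:
            del remaining[task]
    return waves


# Dependencies declared by t2.set_upstream(t1) and t3.set_upstream(t1).
tutorial_upstream = {
    'print_date': set(),
    'sleep': {'print_date'},
    'templated': {'print_date'},
}
print(execution_waves(tutorial_upstream))
# [['print_date'], ['sleep', 'templated']]
```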