贡献者:@morefreeze

注意:
Lineage 支持是实验性的,可能随时会发生变化。

Airflow 可以帮助跟踪数据的来源,以及数据发生了什么变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。

Airflow 通过任务的 inlets 和 outlets 跟踪数据。 让我们通过一个例子看看它是如何工作的。

  1. from airflow.operators.bash_operator import BashOperator
  2. from airflow.operators.dummy_operator import DummyOperator
  3. from airflow.lineage.datasets import File
  4. from airflow.models import DAG
  5. from datetime import timedelta
  6. FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
  7. args = {
  8. 'owner': 'airflow' ,
  9. 'start_date': airflow.utils.dates.days_ago(2)
  10. }
  11. dag = DAG(
  12. dag_id='example_lineage', default_args=args,
  13. schedule_interval='0 0 * * *',
  14. dagrun_timeout=timedelta(minutes=60))
  15. f_final = File("/tmp/final")
  16. run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
  17. inlets={"auto": True},
  18. outlets={"datasets": [f_final,]})
  19. f_in = File("/tmp/whole_directory/")
  20. outlets = []
  21. for file in FILE_CATEGORIES:
  22. f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
  23. outlets.append(f_out)
  24. run_this = BashOperator(
  25. task_id='run_me_first', bash_command='echo 1', dag=dag,
  26. inlets={"datasets": [f_in,]},
  27. outlets={"datasets": outlets}
  28. )
  29. run_this.set_downstream(run_this_last)

任务定义了参数inletsoutletsinlets可以是一个数据集列表{"datesets":[dataset1,dataset2]},也可以是指定的上游任务outlets像这样{"task_ids":["task_id1","task_id2"]},或者不想指定直接用{"auto":True}也可以,甚至是前面几种的组合。 outlets 也是一个数据集列表{"datesets":[dataset1,dataset2]}。 在运行任务时,数据集的字段会被模板渲染。

注意:
只要 Operator 支持,它会自动地加上 inlets 和 outlets。

在示例 DAG 任务中, run_me_first是一个 BashOperator,它接收CAT1, CAT2, CAT3作为 inlets(译注:根据代码,应为“输出 outlets”)。 其中的execution_date会在任务运行时被渲染成执行时间。

注意:
在底层,Airflow 会在pre_execute方法中准备 lineage 元数据。 当任务运行结束时,会调用post_execute将 lineage 元数据推送到 XCOM 中。 因此,如果您要创建自己的 Operator,并且需要覆写这些方法,确保分别用prepare_lineageapply_lineage装饰这些方法。

Apache Atlas

Airflow 可以将 lineage 元数据发送到 Apache Atlas。 您需要在airflow.cfg中配置atlas

  1. [lineage]
  2. backend = airflow.lineage.backend.atlas
  3. [atlas]
  4. username = my_username
  5. password = my_password
  6. host = host
  7. port = 21000

请确保已经安装了atlasclient