数据血缘(Lineage)
贡献者:@morefreeze
注意: Lineage 支持是实验性的,可能随时会发生变化。
Airflow 可以帮助跟踪数据的来源,以及数据发生了什么变化。 这有助于实现审计跟踪和数据治理,还可以调试数据流。
Airflow 通过任务的 inlets 和 outlets 跟踪数据。 让我们通过一个例子看看它是如何工作的。
from airflow.operators.bash_operator import BashOperatorfrom airflow.operators.dummy_operator import DummyOperatorfrom airflow.lineage.datasets import Filefrom airflow.models import DAGfrom datetime import timedeltaFILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]args = {'owner': 'airflow' ,'start_date': airflow.utils.dates.days_ago(2)}dag = DAG(dag_id='example_lineage', default_args=args,schedule_interval='0 0 * * *',dagrun_timeout=timedelta(minutes=60))f_final = File("/tmp/final")run_this_last = DummyOperator(task_id='run_this_last', dag=dag,inlets={"auto": True},outlets={"datasets": [f_final,]})f_in = File("/tmp/whole_directory/")outlets = []for file in FILE_CATEGORIES:f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))outlets.append(f_out)run_this = BashOperator(task_id='run_me_first', bash_command='echo 1', dag=dag,inlets={"datasets": [f_in,]},outlets={"datasets": outlets})run_this.set_downstream(run_this_last)
任务定义了参数inlets和outlets。 inlets可以是一个数据集列表{"datesets":[dataset1,dataset2]},也可以是指定的上游任务outlets像这样{"task_ids":["task_id1","task_id2"]},或者不想指定直接用{"auto":True}也可以,甚至是前面几种的组合。 outlets 也是一个数据集列表{"datesets":[dataset1,dataset2]}。 在运行任务时,数据集的字段会被模板渲染。
注意: 只要 Operator 支持,它会自动地加上 inlets 和 outlets。
在示例 DAG 任务中, run_me_first是一个 BashOperator,它接收CAT1, CAT2, CAT3作为 inlets(译注:根据代码,应为“输出 outlets”)。 其中的execution_date会在任务运行时被渲染成执行时间。
注意: 在底层,Airflow 会在
pre_execute方法中准备 lineage 元数据。 当任务运行结束时,会调用post_execute将 lineage 元数据推送到 XCOM 中。 因此,如果您要创建自己的 Operator,并且需要覆写这些方法,确保分别用prepare_lineage和apply_lineage装饰这些方法。
Apache Atlas
Airflow 可以将 lineage 元数据发送到 Apache Atlas。 您需要在airflow.cfg中配置atlas:
[lineage]backend = airflow.lineage.backend.atlas[atlas]username = my_usernamepassword = my_passwordhost = hostport = 21000
请确保已经安装了atlasclient。
