Note: Lineage support is very experimental and subject to change.
Airflow can help track the origins of data, what happens to it, and where it moves over time. This supports audit trails and data governance, and it also helps when debugging data flows.
Airflow tracks data by means of the inlets and outlets of tasks. Let's work through an example to see how it works.
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.lineage import AUTO
from airflow.lineage.entities import File
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta

FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

args = {
    'owner': 'airflow',
    'start_date': days_ago(2)
}

dag = DAG(
    dag_id='example_lineage',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60)
)

f_final = File(url="/tmp/final")
run_this_last = DummyOperator(
    task_id='run_this_last',
    dag=dag,
    inlets=AUTO,
    outlets=f_final
)

f_in = File(url="/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    f_out = File(url="/tmp/{}/{{{{ execution_date }}}}".format(file))
    outlets.append(f_out)

run_this = BashOperator(
    task_id='run_me_first',
    bash_command='echo 1',
    dag=dag,
    inlets=f_in,
    outlets=outlets
)
run_this.set_downstream(run_this_last)
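The four braces in the outlet URL of the example above can look odd at first. The following is a minimal sketch, using only Python's built-in `str.format`, of how that string turns into a template. `str.format` treats `{{` and `}}` as escaped literal braces, so `{{{{` collapses to `{{` and the resulting string still contains a placeholder to be filled in later, when the task runs. The helper name `outlet_url_template` is made up for illustration and is not part of Airflow.

```python
def outlet_url_template(category: str) -> str:
    """Build the outlet URL template used in the example above.

    str.format fills the single {} with the category and collapses each
    doubled brace pair into one literal brace, leaving a template placeholder.
    """
    return "/tmp/{}/{{{{ execution_date }}}}".format(category)


# Same categories as FILE_CATEGORIES in the example above.
templates = [outlet_url_template(c) for c in ["CAT1", "CAT2", "CAT3"]]
print(templates[0])  # /tmp/CAT1/{{ execution_date }}
```

Only the category is fixed when the DAG file is parsed. The `{{ execution_date }}` part stays in the URL as text until the outlet is rendered.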
Inlets can be a (list of) upstream task ids, or they can be statically defined as an attr-annotated object such as the File object. Outlets can only be attr-annotated objects. Both are rendered at run time. However, when the outlets of a task serve as inlets to another task, they are not re-rendered for the downstream task.
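The "rendered once" behavior described above can be pictured with a toy model. This is only a sketch under stated assumptions and not Airflow's implementation: `ToyFile` and `render` are hypothetical stand-ins, the substitution is a plain string replace rather than real template rendering, and the date value is made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class ToyFile:
    """Hypothetical stand-in for a lineage entity with one templated field."""
    url: str


def render(entity: ToyFile, context: dict) -> ToyFile:
    """Naive stand-in for rendering: substitute known {{ name }} placeholders."""
    url = entity.url
    for key, value in context.items():
        url = url.replace("{{ " + key + " }}", str(value))
    return ToyFile(url=url)


# Upstream task: its outlet is rendered with the upstream task's own context.
# The date below is an invented example value.
upstream_outlet = render(
    ToyFile(url="/tmp/CAT1/{{ execution_date }}"),
    {"execution_date": "2021-01-01T00:00:00+00:00"},
)

# Downstream task: the upstream outlet is handed over as an inlet as-is.
# No second rendering pass happens in this model.
downstream_inlets = [upstream_outlet]

print(downstream_inlets[0].url)  # /tmp/CAT1/2021-01-01T00:00:00+00:00
```

In this model the downstream task sees the value that the upstream task produced, which is one way to read the statement that outlets are not re-rendered when they become inlets.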
