Note: Lineage support is very experimental and subject to change.

    Airflow can help track the origins of data, what happens to it, and where it moves over time. This supports audit trails and data governance, and also helps with debugging data flows.

    Airflow tracks data by means of inlets and outlets of the tasks. Let's work from an example and see how it works.

    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy import DummyOperator
    from airflow.lineage import AUTO
    from airflow.lineage.entities import File
    from airflow.models import DAG
    from airflow.utils.dates import days_ago
    from datetime import timedelta

    FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

    args = {
        'owner': 'airflow',
        'start_date': days_ago(2),
    }

    dag = DAG(
        dag_id='example_lineage',
        default_args=args,
        schedule_interval='0 0 * * *',
        dagrun_timeout=timedelta(minutes=60),
    )

    f_final = File(url="/tmp/final")

    # AUTO picks up the outlets of this task's upstream tasks as its inlets.
    run_this_last = DummyOperator(
        task_id='run_this_last',
        dag=dag,
        inlets=AUTO,
        outlets=f_final,
    )

    f_in = File(url="/tmp/whole_directory/")
    outlets = []
    for file in FILE_CATEGORIES:
        # str.format fills in the category; the remaining {{ execution_date }}
        # is a Jinja template that Airflow renders at run time.
        f_out = File(url="/tmp/{}/{{{{ execution_date }}}}".format(file))
        outlets.append(f_out)

    run_this = BashOperator(
        task_id='run_me_first',
        bash_command='echo 1',
        dag=dag,
        inlets=f_in,
        outlets=outlets,
    )

    run_this.set_downstream(run_this_last)
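    The doubled braces in the loop above are easy to misread: `str.format` consumes one layer of braces, so `{{{{ execution_date }}}}` comes out as the literal Jinja placeholder `{{ execution_date }}`, which Airflow only renders when the task runs. This can be checked in plain Python:

    ```python
    FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

    # {} is replaced by the category name, while {{{{ ... }}}} collapses to
    # {{ ... }}, leaving a Jinja template for Airflow to render at run time.
    urls = ["/tmp/{}/{{{{ execution_date }}}}".format(f) for f in FILE_CATEGORIES]

    print(urls[0])  # /tmp/CAT1/{{ execution_date }}
    ```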


    Inlets can be a (list of) upstream task ids, or statically defined as an attr-annotated object such as the File object above. Outlets can only be attr-annotated objects. Both are rendered at run time. However, when the outlets of one task serve as the inlets of a downstream task, they are not re-rendered for that downstream task.
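    To make the AUTO behavior concrete, here is a framework-free sketch of how inlets could be resolved from upstream outlets. This is an illustrative model only, not Airflow's internal implementation: the `Task` class and `resolve_inlets` function are hypothetical, and only the idea (AUTO means "collect the outlets of my upstream tasks") mirrors the documented behavior.

    ```python
    from dataclasses import dataclass, field
    from typing import List, Union

    AUTO = "auto"  # sentinel: "use the outlets of my upstream tasks as inlets"

    @dataclass
    class File:
        url: str

    @dataclass
    class Task:
        # Hypothetical stand-in for an operator, for illustration only.
        task_id: str
        inlets: Union[str, List[File]] = field(default_factory=list)
        outlets: List[File] = field(default_factory=list)
        upstream: List["Task"] = field(default_factory=list)

    def resolve_inlets(task: Task) -> List[File]:
        """If inlets is AUTO, gather the outlets of all upstream tasks."""
        if task.inlets == AUTO:
            return [f for up in task.upstream for f in up.outlets]
        return list(task.inlets)

    # run_me_first declares its outlets explicitly; run_this_last uses AUTO
    # and therefore inherits them as inlets.
    run_this = Task("run_me_first", outlets=[File("/tmp/CAT1/2021-01-01")])
    run_this_last = Task("run_this_last", inlets=AUTO, upstream=[run_this])

    print([f.url for f in resolve_inlets(run_this_last)])
    # ['/tmp/CAT1/2021-01-01']
    ```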