An operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs.

:::info 🔖 Note:
See the Operators Concepts documentation and the Operators API Reference for more information.
:::

1. BashOperator

Use the [BashOperator](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/bash/index.html#airflow.operators.bash.BashOperator) to execute commands in a Bash shell.

📑 airflow/example_dags/example_bash_operator.py

```python
run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
)
```

1.1 Templating

You can use Jinja templates to parameterize the bash_command argument.

📑 airflow/example_dags/example_bash_operator.py

```python
also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
)
```

:::info ⚠ Warning:
Care should be taken with "user" input or when using Jinja templates in the bash_command, as this bash operator does not perform any escaping or sanitization of the command.

This applies mostly to using "dag_run" conf, as that can be submitted by users in the Web UI. Most of the default template variables are not at risk.
:::

For example, do not do this:

```python
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
)
```

Instead, you should pass this via the env kwarg and use double-quotes inside the bash_command, as below:

```python
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "here is the message: \'$message\'"',
    env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
)
```

1.2 Troubleshooting

Jinja template not found

Add a space after the script name when directly calling a Bash script with the bash_command argument. This is because Airflow tries to apply a Jinja template to it, which will fail.

```python
t2 = BashOperator(
    task_id='bash_example',
    # This fails with 'Jinja template not found' error
    # bash_command="/home/batcher/test.sh",
    # This works (has a space after the script name)
    bash_command="/home/batcher/test.sh ",
    dag=dag,
)
```

However, if you want to use templating in your bash script, do not add the space and instead put your bash script in a location relative to the directory containing the DAG file. So if your DAG file is in /usr/local/airflow/dags/test_dag.py, you can move your test.sh file to any location under /usr/local/airflow/dags/ (Example: /usr/local/airflow/dags/scripts/test.sh) and pass the relative path to bash_command as shown below:

```python
t2 = BashOperator(
    task_id='bash_example',
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="scripts/test.sh",
    dag=dag,
)
```

Creating a separate folder for bash scripts may be desirable for many reasons, such as separating your scripts' logic from your pipeline code, allowing proper code highlighting in files written in different languages, and general flexibility in structuring pipelines.

It is also possible to define your template_searchpath as pointing to any folder locations in the DAG constructor call.

Example:

```python
dag = DAG("example_bash_dag", template_searchpath="/opt/scripts")

t2 = BashOperator(
    task_id='bash_example',
    # "test.sh" is a file under "/opt/scripts"
    bash_command="test.sh ",
    dag=dag,
)
```

2. PythonOperator

Use the [PythonOperator](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html#airflow.operators.python.PythonOperator) to execute Python callables.

📑 airflow/example_dags/example_python_operator.py

```python
def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
)
```

2.1 Passing in arguments

Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.

📑 airflow/example_dags/example_python_operator.py

```python
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
    )

    run_this >> task
```
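The example above uses op_kwargs; op_args works the same way but passes positional arguments instead. Here is a minimal sketch, with the task id and function being illustrative rather than part of the example DAG:

```python
def greet(greeting, name):
    """Print a greeting built from two positional arguments."""
    print(f"{greeting}, {name}!")


greet_task = PythonOperator(
    task_id='greet',
    python_callable=greet,
    # Passed positionally, in order: greet('Hello', 'Airflow')
    op_args=['Hello', 'Airflow'],
)
```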

2.2 Templating

Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.

The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.
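For example, a callable can read the rendered values out of templates_dict at runtime. The following is a minimal sketch; the task id and dictionary keys are illustrative:

```python
def report_run(templates_dict, **kwargs):
    """Print values that Jinja rendered before the callable ran."""
    print(templates_dict['run_date'])
    print(templates_dict['run_id'])


templated_task = PythonOperator(
    task_id='print_templates_dict',
    python_callable=report_run,
    # Each value is evaluated as a Jinja template before being passed in.
    templates_dict={
        'run_date': '{{ ds }}',
        'run_id': '{{ run_id }}',
    },
)
```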

3. PythonVirtualenvOperator

Use the [PythonVirtualenvOperator](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html#airflow.operators.python.PythonVirtualenvOperator) to execute Python callables inside a new Python virtual environment.

📑 airflow/example_dags/example_python_operator.py

```python
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + 'some red text')
    print(Back.GREEN + 'and with a green background')
    print(Style.DIM + 'and in dim text')
    print(Style.RESET_ALL)
    for _ in range(10):
        print(Style.DIM + 'Please wait...', flush=True)
        sleep(10)
    print('Finished')


virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_python",
    python_callable=callable_virtualenv,
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
)
```

3.1 Passing in arguments

You can use the op_args and op_kwargs arguments the same way you use them in the PythonOperator. Unfortunately, Airflow does not currently support serializing var and ti / task_instance due to incompatibilities with the underlying library. For Airflow context variables, make sure that you either have access to Airflow by setting system_site_packages to True, or add apache-airflow to the requirements argument. Otherwise you won't have access to most of Airflow's context variables in op_kwargs. If you want context related to datetime objects like execution_date, you can add pendulum and lazy_object_proxy.
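For instance, combining op_kwargs with pendulum and lazy_object_proxy in requirements should make execution_date usable inside the callable. This is a minimal sketch under those assumptions; the task id, function, and sleep value are illustrative:

```python
def callable_with_args(base_sleep, execution_date=None):
    """Sleep briefly and report the logical date of the run."""
    from time import sleep

    sleep(base_sleep)
    # execution_date can only be deserialized inside the virtualenv because
    # pendulum and lazy_object_proxy are listed in requirements below.
    print(f"Ran for {execution_date}")


virtualenv_args_task = PythonVirtualenvOperator(
    task_id="virtualenv_with_args",
    python_callable=callable_with_args,
    op_kwargs={"base_sleep": 1},
    requirements=["pendulum", "lazy_object_proxy"],
    system_site_packages=False,
)
```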

3.2 Templating

You can use Jinja templating the same way you use it in the PythonOperator.

4. Cross-DAG Dependencies

When two DAGs have dependency relationships, it is worth considering combining them into a single DAG, which is usually simpler to understand. Airflow also offers better visual representation of dependencies for tasks on the same DAG. However, it is sometimes not practical to put all related tasks on the same DAG. For example:

  • Two DAGs may have different schedules. E.g. a weekly DAG may have tasks that depend on other tasks on a daily DAG.
  • Different teams are responsible for different DAGs, but these DAGs have some cross-DAG dependencies.
  • A task may depend on another task on the same DAG, but for a different execution_date.

ExternalTaskSensor can be used to establish such dependencies across different DAGs. When it is used together with ExternalTaskMarker, clearing dependent tasks can also happen across different DAGs.

4.1 ExternalTaskSensor

Use the [ExternalTaskSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) to make tasks on a DAG wait for another task on a different DAG for a specific execution_date.

ExternalTaskSensor also provides options to check whether the task on the remote DAG succeeded or failed, via the allowed_states and failed_states parameters.

📑 airflow/example_dags/example_external_task_marker_dag.py

```python
child_task1 = ExternalTaskSensor(
    task_id="child_task1",
    external_dag_id=parent_dag.dag_id,
    external_task_id=parent_task.task_id,
    timeout=600,
    allowed_states=['success'],
    failed_states=['failed', 'skipped'],
    mode="reschedule",
)
```
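When the two DAGs run on different schedules (the first case in the list above), the sensor also needs to know which logical date to look for on the other DAG. The execution_delta (or execution_date_fn) parameter handles this. The sketch below assumes a parent DAG whose runs start one hour before the child's, so the one-hour offset and the DAG/task ids are illustrative:

```python
from datetime import timedelta

wait_for_parent = ExternalTaskSensor(
    task_id="wait_for_parent",
    external_dag_id="example_external_task_marker_parent",
    external_task_id="parent_task",
    # Look for the parent run whose logical date is one hour earlier than
    # this DAG run's logical date (illustrative offset).
    execution_delta=timedelta(hours=1),
    timeout=600,
    mode="reschedule",
)
```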

4.2 ExternalTaskMarker

If it is desirable that whenever parent_task on parent_dag is cleared, child_task1 on child_dag for a specific execution_date should also be cleared, ExternalTaskMarker should be used. Note that child_task1 will only be cleared if “Recursive” is selected when the user clears parent_task.

📑 airflow/example_dags/example_external_task_marker_dag.py

```python
parent_task = ExternalTaskMarker(
    task_id="parent_task",
    external_dag_id="example_external_task_marker_child",
    external_task_id="child_task1",
)
```