Original documentation: http://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html

Airflow can be set up to send metrics to StatsD.

1. Setup

First, you must install the statsd requirement:

  $ pip install 'apache-airflow[statsd]'

Add the following lines to your configuration file, e.g. airflow.cfg:

  [metrics]
  statsd_on = True
  statsd_host = localhost
  statsd_port = 8125
  statsd_prefix = airflow
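With these settings, Airflow's StatsD client sends each metric as a small plain-text UDP datagram to statsd_host:statsd_port, with statsd_prefix prepended to the metric name. The sketch below only illustrates that wire format (name:value|type, where c marks counters, g gauges and ms timers); it is not Airflow's own client, and the helper names format_metric and send_metric are hypothetical.

```python
import socket

# Values mirroring the [metrics] example above (statsd_host, statsd_port,
# statsd_prefix). The helper names below are hypothetical.
STATSD_HOST = "localhost"
STATSD_PORT = 8125
STATSD_PREFIX = "airflow"

# StatsD type suffixes: "c" = counter, "g" = gauge, "ms" = timer.
TYPE_SUFFIXES = {"counter": "c", "gauge": "g", "timer": "ms"}


def format_metric(name: str, value: float, kind: str, prefix: str = STATSD_PREFIX) -> str:
    """Build one StatsD line, e.g. 'airflow.scheduler_heartbeat:1|c'."""
    suffix = TYPE_SUFFIXES[kind]  # KeyError for unknown metric kinds
    full_name = f"{prefix}.{name}" if prefix else name
    return f"{full_name}:{value}|{suffix}"


def send_metric(name: str, value: float, kind: str,
                host: str = STATSD_HOST, port: int = STATSD_PORT) -> None:
    """Send a single metric as one UDP datagram (fire-and-forget)."""
    payload = format_metric(name, value, kind).encode("ascii")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (host, port))
```

For example, send_metric("scheduler_heartbeat", 1, "counter") would emit airflow.scheduler_heartbeat:1|c to localhost:8125, which can be a quick way to check that a local StatsD server is listening where the configuration points.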

If you want to avoid sending all the available metrics to StatsD, you can configure an allow list of prefixes so that only metrics whose names start with one of the listed prefixes are sent:

  [metrics]
  statsd_allow_list = scheduler,executor,dagrun
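The allow list is a plain prefix match. Below is a minimal sketch of such a check, assuming that it is applied to the metric name before statsd_prefix is prepended and that matching is case-insensitive; the class name PrefixAllowList is hypothetical and this is not Airflow's own code.

```python
from typing import Optional, Tuple


class PrefixAllowList:
    """Hypothetical sketch of prefix filtering in the spirit of statsd_allow_list."""

    def __init__(self, allow_list: Optional[str] = None) -> None:
        # "scheduler,executor,dagrun" -> ("scheduler", "executor", "dagrun")
        self.prefixes: Optional[Tuple[str, ...]] = (
            tuple(item.strip().lower() for item in allow_list.split(",") if item.strip())
            if allow_list
            else None
        )

    def allows(self, stat_name: str) -> bool:
        # With no allow list configured, every metric is sent.
        if self.prefixes is None:
            return True
        # str.startswith accepts a tuple and succeeds if any prefix matches.
        return stat_name.strip().lower().startswith(self.prefixes)
```

With scheduler,executor,dagrun, names such as scheduler_heartbeat or executor.open_slots pass, while ti_failures is dropped. Because this is a raw string-prefix match, any other name that happens to begin with one of those prefixes passes as well.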

If you want to redirect metrics to a different name, you can configure the stat_name_handler option in the [scheduler] section. It should point to a function that validates the StatsD stat name, applies changes to the stat name if necessary, and returns the transformed stat name. The function might look as follows:

  def my_custom_stat_name_handler(stat_name: str) -> str:
      # Lowercase the name and keep at most its first 32 characters.
      return stat_name.lower()[:32]
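The handler above only lowercases and truncates. A handler can also make names safe for StatsD; the sketch below, with the hypothetical name sanitizing_stat_name_handler, replaces any character other than ASCII letters, digits, _, . and - with an underscore and caps names at 250 characters. Both the character set and the length limit are assumptions chosen for illustration.

```python
import re

# Assumed limits for illustration: keep letters, digits, "_", "." and "-",
# and cap names at 250 characters.
_DISALLOWED = re.compile(r"[^A-Za-z0-9_.\-]")
MAX_STAT_NAME_LENGTH = 250


def sanitizing_stat_name_handler(stat_name: str) -> str:
    """Replace unsupported characters with "_" and enforce a length cap."""
    if not isinstance(stat_name, str):
        raise TypeError("stat_name must be a str")
    cleaned = _DISALLOWED.sub("_", stat_name.strip())
    return cleaned[:MAX_STAT_NAME_LENGTH]
```

You would then set stat_name_handler to the function's dotted path, e.g. a hypothetical my_metrics.sanitizing_stat_name_handler, with my_metrics importable from PYTHONPATH.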

If you want to use a custom StatsD client instead of the default one provided by Airflow, add the following key to the configuration file, set to the module path of your custom StatsD client. This module must be available on your PYTHONPATH.

  [metrics]
  statsd_custom_client_path = x.y.customclient


See Modules Management for details on how Python and Airflow manage modules.
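A value such as x.y.customclient is a dotted path: everything before the last dot names a module to import, and the last component names an attribute of that module. The following sketch is a hypothetical helper built on importlib, not Airflow's own import code; it shows that kind of resolution and why the module has to be importable from PYTHONPATH.

```python
import importlib
from typing import Any


def import_from_dotted_path(path: str) -> Any:
    """Resolve 'package.module.attr' by importing the module and reading attr."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ImportError(f"{path!r} is not a dotted path")
    # Fails with ModuleNotFoundError if the module is not on sys.path / PYTHONPATH.
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(f"module {module_path!r} has no attribute {attr_name!r}") from exc
```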

Note: For a detailed listing of the configuration options for metrics, see the [metrics] section of the configuration reference documentation.

2. Counters

Name Description
<job_name>_start Number of started <job_name> jobs, e.g. SchedulerJob, LocalTaskJob
<job_name>_end Number of ended <job_name> jobs, e.g. SchedulerJob, LocalTaskJob
<job_name>_heartbeat_failure Number of failed heartbeats for a <job_name> job, e.g. SchedulerJob, LocalTaskJob
operator_failures_<operator_name> Failures of operator <operator_name>
operator_successes_<operator_name> Successes of operator <operator_name>
ti_failures Overall task instance failures
ti_successes Overall task instance successes
previously_succeeded Number of previously succeeded task instances
zombies_killed Zombie tasks killed
scheduler_heartbeat Scheduler heartbeats
dag_processing.processes Number of currently running DAG parsing processes
dag_processing.manager_stalls Number of stalled DagFileProcessorManager
dag_file_refresh_error Number of failures loading any DAG files
scheduler.tasks.killed_externally Number of tasks killed externally
scheduler.orphaned_tasks.cleared Number of orphaned tasks cleared by the scheduler
scheduler.orphaned_tasks.adopted Number of orphaned tasks adopted by the scheduler
scheduler.critical_section_busy Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process.
sla_email_notification_failure Number of failed SLA miss email notification attempts
ti.start.<dag_id>.<task_id> Number of started tasks in a given DAG. Similar to <job_name>_start but for tasks
ti.finish.<dag_id>.<task_id>.<state> Number of completed tasks in a given DAG. Similar to <job_name>_end but for tasks
dag.callback_exceptions Number of exceptions raised from DAG callbacks. When this happens, it means a DAG callback is not working.
celery.task_timeout_error Number of AirflowTaskTimeout errors raised when publishing a task to the Celery broker
task_removed_from_dag.<dag_id> Number of tasks removed from a given DAG (i.e. the task no longer exists in the DAG)
task_restored_to_dag.<dag_id> Number of tasks restored to a given DAG (i.e. a task instance that was previously in the REMOVED state in the DB is added back to the DAG file)
task_instance_created-<operator_name> Number of task instances created for a given operator
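To watch these counters as they arrive, you can point statsd_host at a collector and decode the datagrams. The sketch below uses the hypothetical names parse_statsd_line and aggregate_counters: it parses lines such as airflow.ti_failures:1|c (an optional |@rate sample rate is supported), strips the assumed airflow prefix, and sums counters per name, scaling each increment by 1/rate as StatsD servers conventionally do. It ignores gauge deltas and other protocol extensions.

```python
from typing import Dict, Iterable, NamedTuple


class Metric(NamedTuple):
    name: str
    value: float
    kind: str          # "c" counter, "g" gauge, "ms" timer
    sample_rate: float


def parse_statsd_line(line: str, prefix: str = "airflow") -> Metric:
    """Parse 'airflow.ti_failures:1|c' or 'name:1|c|@0.5' into a Metric."""
    name, _, rest = line.strip().partition(":")
    fields = rest.split("|")
    if not name or len(fields) < 2:
        raise ValueError(f"not a StatsD line: {line!r}")
    sample_rate = 1.0
    if len(fields) > 2 and fields[2].startswith("@"):
        sample_rate = float(fields[2][1:])
    # Drop the statsd_prefix so names match the metric tables.
    if prefix and name.startswith(prefix + "."):
        name = name[len(prefix) + 1:]
    return Metric(name, float(fields[0]), fields[1], sample_rate)


def aggregate_counters(lines: Iterable[str], prefix: str = "airflow") -> Dict[str, float]:
    """Sum counter increments per name, scaling each by 1 / sample_rate."""
    totals: Dict[str, float] = {}
    for line in lines:
        metric = parse_statsd_line(line, prefix)
        if metric.kind != "c":
            continue  # gauges and timers are not summed
        totals[metric.name] = totals.get(metric.name, 0.0) + metric.value / metric.sample_rate
    return totals
```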

3. Gauges

Name Description
dagbag_size DAG bag size
dag_processing.import_errors Number of errors from trying to parse DAG files
dag_processing.total_parse_time Seconds taken to scan and import all DAG files once
dag_processing.last_runtime.<dag_file> Seconds spent processing <dag_file> (in the most recent iteration)
dag_processing.last_run.seconds_ago.<dag_file> Seconds since <dag_file> was last processed
dag_processing.processor_timeouts Number of file processors that have been killed due to taking too long
scheduler.tasks.without_dagrun Number of tasks without DagRuns or with DagRuns not in Running state
scheduler.tasks.running Number of tasks running in executor
scheduler.tasks.starving Number of tasks that cannot be scheduled because there is no open slot in the pool
scheduler.tasks.executable Number of tasks that are ready for execution (set to queued) with respect to pool limits, dag concurrency, executor state, and priority.
executor.open_slots Number of open slots on executor
executor.queued_tasks Number of queued tasks on executor
executor.running_tasks Number of running tasks on executor
pool.open_slots.<pool_name> Number of open slots in the pool
pool.queued_slots.<pool_name> Number of queued slots in the pool
pool.running_slots.<pool_name> Number of running slots in the pool
pool.starving_tasks.<pool_name> Number of starving tasks in the pool
smart_sensor_operator.poked_tasks Number of tasks poked by the smart sensor in the previous poking loop
smart_sensor_operator.poked_success Number of newly succeeded tasks poked by the smart sensor in the previous poking loop
smart_sensor_operator.poked_exception Number of exceptions in the previous smart sensor poking loop
smart_sensor_operator.exception_failures Number of failures caused by exception in the previous smart sensor poking loop
smart_sensor_operator.infra_failures Number of infrastructure failures in the previous smart sensor poking loop
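Unlike counters, gauges report a current level, so only the most recent value per name matters. As a hedged illustration, the sketch below keeps the latest sample per gauge name and then lists pools whose pool starving-tasks gauge is above zero; the function names latest_gauges and starving_pools, and the idea of alerting on this gauge, are assumptions rather than part of Airflow.

```python
from typing import Dict, Iterable, List, Tuple


def latest_gauges(samples: Iterable[Tuple[str, float]]) -> Dict[str, float]:
    """Gauges report a level, so the most recent sample per name wins."""
    latest: Dict[str, float] = {}
    for name, value in samples:
        latest[name] = value  # overwrite, unlike counters which accumulate
    return latest


def starving_pools(gauges: Dict[str, float]) -> List[str]:
    """Return pool names whose "pool.starving_tasks.<pool_name>" gauge is above zero."""
    prefix = "pool.starving_tasks."
    return sorted(
        name[len(prefix):]
        for name, value in gauges.items()
        if name.startswith(prefix) and value > 0
    )
```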

4. Timers

Name Description
dagrun.dependency-check.<dag_id> Milliseconds taken to check DAG dependencies
dag.<dag_id>.<task_id>.duration Milliseconds taken to finish a task
dag_processing.last_duration.<dag_file> Milliseconds taken to load the given DAG file
dagrun.duration.success.<dag_id> Milliseconds taken for a DagRun to reach the success state
dagrun.duration.failed.<dag_id> Milliseconds taken for a DagRun to reach the failed state
dagrun.schedule_delay.<dag_id> Milliseconds of delay between the scheduled DagRun start date and the actual DagRun start date
scheduler.critical_section_duration Milliseconds spent in the critical section of the scheduler loop; only a single scheduler can enter this loop at a time
dagrun.<dag_id>.first_task_scheduling_delay Milliseconds elapsed between the first task's start_date and the DagRun's expected start
collect_db_dags Milliseconds taken to fetch all serialized DAGs from the DB
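Timers arrive as individual millisecond samples. A StatsD server or monitoring backend normally aggregates them for you, but the sketch below (the function name summarize_timer is hypothetical) shows a typical summary you might compute for a timer such as a DagRun success duration: sample count, mean, a nearest-rank 95th percentile, and maximum.

```python
from typing import Dict, Sequence


def summarize_timer(samples_ms: Sequence[float]) -> Dict[str, float]:
    """Summarize timer samples given in milliseconds."""
    if not samples_ms:
        raise ValueError("no timer samples to summarize")
    ordered = sorted(samples_ms)
    count = len(ordered)
    # Nearest-rank 95th percentile: the ceil(0.95 * n)-th smallest sample,
    # computed with integer arithmetic to avoid floating-point rounding.
    rank = -(-95 * count // 100)
    return {
        "count": float(count),
        "mean_ms": sum(ordered) / count,
        "p95_ms": float(ordered[rank - 1]),
        "max_ms": float(ordered[-1]),
    }
```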