🚀 Original article: https://airflow.apache.org/docs/apache-airflow/stable/scheduler.html
The Airflow scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete. Behind the scenes, the scheduler spins up a subprocess, which monitors and stays in sync with all DAGs in the specified DAG directory. Once per minute, by default, the scheduler collects DAG parsing results and checks whether any active tasks can be triggered.
The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To kick it off, all you need to do is execute the `airflow scheduler` command. It uses the configuration specified in `airflow.cfg`.
The scheduler uses the configured Executor to run tasks that are ready.
To start a scheduler, simply run the command:
$ airflow scheduler
Your DAGs will start executing once the scheduler is running successfully.
:::tips
🔖 Note
:::
:::info
The first DAG Run is created based on the minimum `start_date` for the tasks in your DAG. Subsequent DAG Runs are created by the scheduler process, based on your DAG's `schedule_interval`, sequentially.
:::
The scheduler won't trigger your tasks until the period it covers has ended. For example, a job with a `schedule_interval` of `@daily` runs after the day has ended. This technique makes sure that whatever data is required for that period is fully available before the DAG is executed. In the UI, it appears as if Airflow is running your tasks a day late.
:::tips
🔖 Note
:::
:::info
If you run a DAG on a `schedule_interval` of one day, the run with `execution_date` 2019-11-21 triggers soon after 2019-11-21T23:59.
Let's repeat that: the scheduler runs your job one `schedule_interval` AFTER the start date, at the END of the period.
You should refer to DAG Runs for details on scheduling a DAG.
:::
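To make that timing concrete, here is a minimal sketch of a daily DAG. The DAG id, task, and dates are illustrative, not taken from the original article: with `start_date=datetime(2019, 11, 21)` and a `@daily` schedule, the run with `execution_date` 2019-11-21 is triggered shortly after 2019-11-21T23:59.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Illustrative example only: the DAG id, task, and dates are made up.
with DAG(
    dag_id="example_daily_dag",
    start_date=datetime(2019, 11, 21),
    schedule_interval="@daily",  # each run covers one full day
    catchup=True,                # also create runs for past, uncovered periods
) as dag:
    # The run with execution_date 2019-11-21 covers 2019-11-21 and is
    # triggered shortly after 2019-11-21T23:59, once that day has ended.
    BashOperator(task_id="print_date", bash_command="date")
```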
1. Triggering DAG with Future Date
If you want to use an external trigger to run future-dated execution dates, set `allow_trigger_in_future = True` in the `scheduler` section of `airflow.cfg`. This only has effect if your DAG has no `schedule_interval`. If you keep the default `allow_trigger_in_future = False` and use an external trigger to run a future-dated execution date, the scheduler won't execute it now, but it will execute it later, once the current date rolls over to the execution date.
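As a sketch (not from the original article), a future-dated run can be requested from the CLI with `airflow dags trigger -e <execution_date> <dag_id>`, or programmatically with the local API client as below. The DAG id and date are hypothetical, and the run only starts immediately when `allow_trigger_in_future = True` and the DAG has no `schedule_interval`; otherwise it waits until the current date reaches the execution date.

```python
from airflow.api.client.local_client import Client
from airflow.utils import timezone

# Hypothetical DAG id and execution date, for illustration only.
client = Client(None, None)
client.trigger_dag(
    dag_id="example_unscheduled_dag",
    execution_date=timezone.datetime(2030, 1, 1),  # a future execution date
)
```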
2. Running More Than One Scheduler
Airflow supports running more than one scheduler concurrently, both for performance reasons and for resiliency.
2.1 Overview
The HA scheduler is designed to take advantage of the existing metadata database. This was primarily done for operational simplicity: every component already has to speak to this DB, and by not using direct communication or a consensus algorithm between schedulers (Raft, Paxos, etc.), nor another consensus tool (Apache ZooKeeper or Consul, for instance), we have kept the operational surface area to a minimum.
The scheduler now uses the serialized DAG representation to make its scheduling decisions and the rough outline of the scheduling loop is:
- Check for any DAGs needing a new DagRun, and create them
- Examine a batch of DagRuns for schedulable TaskInstances or complete DagRuns
- Select schedulable TaskInstances, and whilst respecting Pool limits and other concurrency limits, enqueue them for execution
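A heavily simplified sketch of that loop is shown below; the helper functions are hypothetical placeholders that only name the three steps above, not Airflow internals.

```python
# Hypothetical sketch of one pass of the scheduling loop; the stub helpers
# below are placeholders for the steps described above, not Airflow code.

def dags_needing_new_dagrun():
    """Stub: DAGs whose next DagRun is due."""
    return []

def lock_and_fetch_dagrun_batch():
    """Stub: a (row-locked) batch of DagRuns to examine."""
    return []

def enqueue_schedulable_task_instances():
    """Stub: send ready TaskInstances to the executor, within pool limits."""

def scheduler_loop_once():
    # 1. Check for any DAGs needing a new DagRun, and create them.
    for dag in dags_needing_new_dagrun():
        print(f"creating DagRun for {dag}")
    # 2. Examine a batch of DagRuns for schedulable TaskInstances or
    #    completed DagRuns.
    for dag_run in lock_and_fetch_dagrun_batch():
        print(f"examining {dag_run}")
    # 3. Enqueue schedulable TaskInstances while respecting pool and other
    #    concurrency limits (the critical section described below).
    enqueue_schedulable_task_instances()

scheduler_loop_once()
```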
This scheduling approach does, however, place some requirements on the database.
2.2 Database Requirements
To maintain performance and throughput, there is one part of the scheduling loop that does a number of calculations in memory (because having to round-trip to the DB for each TaskInstance would be too slow), so we need to ensure that only a single scheduler is in this critical section at once; otherwise limits would not be correctly respected. To achieve this we use database row-level locks (using `SELECT ... FOR UPDATE`).
This critical section is where TaskInstances move from the scheduled state and are enqueued to the executor, whilst ensuring the various concurrency and pool limits are respected. The critical section is obtained by asking for a row-level write lock on every row of the Pool table (roughly equivalent to `SELECT * FROM slot_pool FOR UPDATE NOWAIT`, but the exact query is slightly different).
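As a rough illustration (not the scheduler's actual code), the same kind of lock can be taken with Airflow's own SQLAlchemy session and `Pool` model; this assumes a metadata database that supports `NOWAIT`, such as PostgreSQL or MySQL 8.

```python
from airflow.models.pool import Pool
from airflow.utils.session import create_session

# Rough illustration only: take a row-level write lock on every slot_pool row,
# roughly SELECT * FROM slot_pool FOR UPDATE NOWAIT. A second session that
# tries the same before this transaction ends fails immediately instead of
# blocking, which is how only one scheduler enters the critical section.
with create_session() as session:
    pools = session.query(Pool).with_for_update(nowait=True).all()
    print([(p.pool, p.slots) for p in pools])
```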
The following databases are fully supported and provide an “optimal” experience:
- PostgreSQL 9.6+
- MySQL 8+
:::warning
🚧 Warning:
MariaDB does not implement the `SKIP LOCKED` or `NOWAIT` SQL clauses (see MDEV-13115). Without these features, running multiple schedulers is not supported and deadlock errors have been reported.
MySQL 5.x also does not support `SKIP LOCKED` or `NOWAIT`, and additionally is more prone to deciding queries are deadlocked, so running with more than a single scheduler on MySQL 5.x is not supported or recommended.
:::
:::tips
🔖 Note: Microsoft SQL Server has not been tested with HA.
:::
2.3 Scheduler Tuneables
The following config settings can be used to control aspects of the Scheduler HA loop.
- `max_dagruns_to_create_per_loop`: This changes the number of DAGs that are locked by each scheduler when creating DAG runs. One possible reason for setting this lower is that if you have huge DAGs and are running multiple schedulers, you won't want one scheduler to do all the work.
- `max_dagruns_per_loop_to_schedule`: How many DagRuns a scheduler should examine (and lock) when scheduling and queuing tasks. Increasing this limit will allow more throughput for smaller DAGs but will likely slow down throughput for larger (>500 tasks, for example) DAGs. Setting this too high when using multiple schedulers could also lead to one scheduler taking all the DAG runs, leaving no work for the others.
- `use_row_level_locking`: Whether the scheduler should issue `SELECT ... FOR UPDATE` in relevant queries. If this is set to `False`, you should not run more than a single scheduler at once.
- `pool_metrics_interval`: How often (in seconds) pool usage stats should be sent to statsd (if `statsd_on` is enabled). Computing these stats is a relatively expensive query, so this should be set to match your statsd roll-up period.
- `clean_tis_without_dagrun_interval`: How often each scheduler should run a check to "clean up" TaskInstance rows that are found to no longer have a matching DagRun row. In normal operation the scheduler won't create this situation; it can only arise when rows are deleted via the UI or directly in the DB. You can set this lower if this check is not important to you; tasks will be left in whatever state they are in until the cleanup happens, at which point they will be set to failed.
- `orphaned_tasks_check_interval`: How often (in seconds) the scheduler should check for orphaned tasks or dead SchedulerJobs. This setting controls how quickly a dead scheduler is noticed so that the tasks it was "supervising" get picked up by another scheduler. (The tasks will stay running, so there is no harm in not detecting this for a while.) When a SchedulerJob is detected as "dead" (as determined by `scheduler_health_check_threshold`), any running or queued tasks that were launched by the dead process will be "adopted" and monitored by this scheduler instead.
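To check what values a given deployment is actually running with, the settings above can be read back from the `[scheduler]` section with Airflow's config API. The snippet below only reads values and assumes an Airflow version that defines these options.

```python
from airflow.configuration import conf

# Print the scheduler HA tuneables discussed above, as resolved from
# airflow.cfg (or their AIRFLOW__SCHEDULER__* environment overrides).
for option in (
    "max_dagruns_to_create_per_loop",
    "max_dagruns_per_loop_to_schedule",
    "use_row_level_locking",
    "pool_metrics_interval",
    "clean_tis_without_dagrun_interval",
    "orphaned_tasks_check_interval",
):
    print(option, "=", conf.get("scheduler", option, fallback="<not set>"))
```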