时区

贡献者:@morefreeze

默认情况下启用对时区的支持。 Airflow 在内部处理和数据库中以 UTC 格式存储日期时间信息。 这样您就可以在调度中执行带有时区的 DAG。 目前,Airflow 不支持在 Web UI 中显示最终用户时区,它始终以 UTC 显示。 此外,Operator中使用的模板也不会被转换。 时区信息取决于 DAG 作者如何使用它。

如果您的用户居住在多个时区,并且您希望根据每个用户的挂钟(wall clock)显示日期时间信息,这将非常方便。

即使您只在一个时区运行 Airflow,在数据库中以 UTC 格式存储数据仍然是一种很好的做法(在 Airflow 关心时区问题之前也是如此,这也是推荐的甚至是必需的设置)。 主要原因是夏令时(DST)。 许多国家都有 DST 系统,其中时间在春季向前移动,在秋季向后移动。 如果您在当地工作,那么当发生转换时,您可能每年会遇到两次错误。 (pendulum 和 pytz 文档更详细地讨论了这些问题。)这可能并不影响简单的 DAG,但如果您涉及金融服务中,这可能就变成灾难。

时区在airflow.cfg中设置。 默认情况下,它设置为 utc,您可以将其更改为使用系统设置或任意 IANA 时区,例如Europe/Amsterdam (阿姆斯特丹)。 安装 Airflow 时会安装 Pendulum,它比 pytz 更精确,也会让您更容易处理时区问题。

请注意,Web UI 目前仅显示 UTC 时间

概念

了解 datetime 时区敏感对象

tzinfo 是 datetime.datetime 对象中表示时区信息的属性,它是 datetime.tzinfo 类的对象。 设置 tzinfo 会使 datetime 对象变为时区敏感,否则就是不敏感。

您可以使用timezone.is_aware()timezone.is_naive()来确定 datetime 是否时区敏感。

因为 Airflow 使用时区敏感 datetime 对象,所以您创建 datetime 对象时也应该考虑时区问题。

  1. from airflow.utils import timezone
  2. now = timezone.utcnow()
  3. a_date = timezone.datetime(2017,1,1)

理解原生 datetime 对象

尽管 Airflow 现在是时区敏感的,但为了向后兼容,它仍然允许在 DAG 定义中使用原生 datetime 为start_datesend_dates赋值。 如果遇到使用原生 datetime 的start_dateend_date则使用默认时区。 换句话说,如果您设置了默认时区为Europe/Amsterdam,那么对于start_date = datetime(2017,1,1),则认定它是 2017 年 1 月 1 日的 Amesterdam 时间。

  1. default_args = dict(
  2. start_date = datetime(2016, 1, 1),
  3. owner = 'Airflow'
  4. )
  5. dag = DAG ('my_dag', default_args=default_args)
  6. op = DummyOperator(task_id='dummy', dag=dag)
  7. print(op.owner) # Airflow

不幸的是,在 DST 转换期间,某些时间不存在或有二义性。 在这种情况下,pendulum 会抛出异常。 这就是为什么在启用时区支持时应始终创建时区敏感的 datetime 对象。

实际上,这个问题几乎不会发生。 在 model 或 DAG 中,Airflow 总是返回给您的是时区敏感的 datetime 对象,通常新的 datetime 对象是通过现有对象和 timedelta 计算而来的。 唯一经常创建并且可能出问题的是当前时间,用timezone.utcnow()会避免这些麻烦。

默认时区

默认时区是在airflow.cfg中的[core]下的default_timezone定义的。 如果您刚刚安装了 Airflow,它推荐您设置为 utc。 您还可以将其设置为系统或 IANA 时区(例如Europe/Amsterdam)。 DAG 也会在 worker 上进行计算,因此确保所有 worker 节点上的此设置相同。

  1. [core]
  2. default_timezone = utc

时区敏感 DAG

只需设置start_date为时区敏感的 datetime,就创建了时区敏感的 DAG。 建议使用 pendulum,但也可以使用 pytz(手动安装)。

  1. import pendulum
  2. local_tz = pendulum.timezone("Europe/Amsterdam")
  3. default_args = dict (
  4. start_date = datetime(2016, 1, 1, tzinfo=local_tz),
  5. owner = 'Airflow'
  6. )
  7. dag = DAG('my_tz_dag', default_args=default_args)
  8. op = DummyOperator(task_id='dummy', dag=dag)
  9. print(dag.timezone) # <Timezone [Europe/Amsterdam]>

模板

Airflow 在模板中返回时区敏感的 datetime,但不会将它们转换为本地时间,因此它们仍然是 UTC 时区。 由 DAG 来处理转换。

  1. import pendulum
  2. local_tz = pendulum.timezone("Europe/Amsterdam")
  3. local_tz.convert(execution_date)

Cron 安排

如果您设置了 crontab,Airflow 会假定您始终希望在同一时间运行。 然后它将忽略 DST。 比如,您有一个调度要在每天格林威治标准时间 +1 的 08:00 运行,它将始终在这个时间运行,无论 DST 是否发生

时间增量调度

对于具有时间增量的调度,Airflow 假定您始终希望以指定的间隔运行。 比如,您指定timedelta(hours=2)运行,它将始终每两小时运行一次。 在这种情况下,将考虑 DST