1 目前部署方式

问题描述:目前我在部署 Airflow 任务的方式如下图所示,不同的 DAG 任务对应到不同的文件夹,例如这里的 DAG1 和 DAG2 分别对应到不同的 2 个任务,而 DAG1 和 DAG2 的配置文件都存放在 📂 itool/settings 文件夹下,分别对应到图中的 📑 config_dag1.py 和 📑 config_dag2.py 文件,最终我们还是需要在 airflow_dag1.py 和 airflow_dag2.py 文件中导入其配置。
image.png
这样的任务部署方式是完全 OK 的,而且任务也可以 RUN 起来,但是这样的方式会带来哪些问题呢?

  • 同一个项目,但是 DAG 文件和配置文件分别存放在不同的文件夹中,这很不符合工程规范
  • 由于项目配置文件是放在 📂 itool/settings 文件夹中,导致版本控制困难。如果你只是修改 DAG 文件夹下的文件还好,但是一旦需要修改项目配置文件,还需要到 📂 itool/settings 文件中去修改,最后还有可能你本地修改了配置文件,但是服务器端没有同步上去(我就犯过这样的低级错误😂)。
  • 这里我服务器的账号是 yumingmin,但是如果换一台服务器的话,即使 Airflow 环境是一致的,你的脚本也是无法正常编译的。

🐶 工作中你不光需要负责你自己的项目上线,很多时间别人的项目也是在你这边负责。最理想的状态下,我不需要懂别人的代码怎么写的,只需要一个 git pull 命令,把他的仓库完全同步下来就好(即使他的代码存在一些错误,你并不需要关心,debug 的事情可以他自己来搞),显然现在的方式并不支持这样做。

2. 解决方法(一)

看到这里,估计很多做工程的小伙伴会说,配置文件放在一个项目里不就得了,对应的 DAG 文件从项目中的配置文件来导入相关变量。Just so easy~~~!确实,如果想实现一个 git pull 就可以将修改进行同步,就必然需要将 📑 config_dag1.py 和 📑 config_dag2.py 文件都放在各自的文件下,那么你可能会将上述代码修改成以下这样:

  1. import sys
  2. sys.path.append("/home/yumingmin/itools")
  3. from config_dag1 import db_config

你会发现在终端中使用 python airflow_dag1.pypython airflow_dag2.py 都是正常编译的,无任何报错。但是一打开 http://localhost:9000/admin ,登录 Airflow 之后你会发现报错 No module named ‘config_dag1’,任务始终出现在页面上。一些有经验的小伙伴会觉得这是导入的问题,所以很快把代码修改成下面这样:

  1. import sys
  2. sys.path.append("/home/yumingmin/itools")
  3. from .config_dag1 import db_config

结果这样修改之后,连编译都无法正常通过,直接报以下的错误:

  1. ModuleNotFoundError: No module named '__main__.config_dag1'; '__main__' is not a package

😭 显然,这样并不可行。当然也有小伙伴说,还可以改成绝对路径啊,我也尝试过该方法,依旧还是不行。

3. 解决方法(二)

尝试了上述两种解决方法之后,又去百度和 Google 了很久,发现了下面一种解决方式:

  1. import sys
  2. home = os.environ.get("HOME")
  3. sys.path.append(os.path.join(home, "itools"))
  4. sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
  5. from config_dag1 import db_config

这样改动是将整个项目的绝对路径加载到系统环境中,以使得 Python 可以寻找到相应的配置文件。再次编译一下,可以直接编译成功,而且在 Airflow 任务界面上也没有报错😊 。 那既然这样的方式是 OK 的话,我们就可以将 📑 config_dag1.py 修改为 📑 config.py,这样可以让不同项目下文件呈现一致。

那么现在是我们有 DAG1 和 DAG2 两个工程,对应的工程目录如下:

  1. dags
  2. ├── DAG1
  3. ├── airflow_main.py
  4. └── config.py
  5. └── DAG2
  6. ├── airflow_main.py
  7. └── config.py

其中两个 📑 config.py 配置文件内容不相同,两个 📑 airflow_main.py 文件也不一样,具体如下:

  1. # DAG1/config.py
  2. a = 1
  3. b = 2
  4. c = 3
  1. # DAG2/config.py
  2. a = 1
  3. b = 2
  4. d = 3
  1. # DAG1/airflow_main.py
  2. import sys
  3. home = os.environ.get("HOME")
  4. sys.path.append(os.path.join(home, "itools"))
  5. sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
  6. from config import a, b, c
  1. # DAG2/airflow_main.py
  2. import sys
  3. home = os.environ.get("HOME")
  4. sys.path.append(os.path.join(home, "itools"))
  5. sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
  6. from config import a, b, d

在部署 DAG1 之后,你将 DAG2 以同样的方式部署上去之后,会又发现一堆问题:

  • 对于 DAG2,📑 config.py 文件明明确实在,但是导入相关变量时却提示没有
  • DAG2 任务不能够显示在 Airflow 的任务列表里
  • Airflow 是将所有工程环境存储在 sys.path 中,也就是说,直接将两个工程文件夹放到 sys.path 中后,两个同名的文件 Airflow 只能索引到一个,且是先部署的那一个。

    emm…又是一个悲伤的故事,折腾了好久,居然还是不行,尝试过重启 Airflow,也还是不行 😭。

从这里我们可以看出 Airflow 的工作原理,采样这种方式任务时,应该注意以下问题:

  • 📑 airflow_main.py 不可以重名,可以将上述 📑 airflow_main.py 修改为 📑 airflow_dag1.py 和 📑 airflow_dag2.py
  • 📑 config.py 也不可以重名,可以将上述 📑 config.py 修改为 📑 config_dag1.py 和 📑 config_dag2.py

修改完成后,应该就是将两个任务部署上去了,但是这样的方式其实也给我们带来一些问题,你需要和每个人说明,项目下的 📑 airflow_main.py 和 📑 config.py 文件名要唯一,且不可以与之前的重名,但是这种人为的错误在某些时刻也是不可避免的。显然,这种方式也存在很严重的 bug。

4. 解决方法(三)⭐

我们调整下工程部署的目录结构,将 DAG1 和 DAG2 放到 📂 projects 下的 dags 文件夹中,同时也可以将 itools 自定义库放到 📂 projects 中。同时在 /home/yumingmin/bigdata/miniconda3/lib/python3.6/site-packages 下新建一个 📑 mylib.pth
添加以下内容:

  1. /home/yumingmin/bigdata/projects

🔔 这里需要将原有 📑 airflow.cfg 文件重新进行修改:

  1. [core]
  2. # The folder where your airflow pipelines live, most likely a
  3. # subfolder in a code repository. This path must be absolute.
  4. dags_folder = /home/yumingmin/bigdata/projects/dags
  5. # The folder where airflow should store its log files
  6. # This path must be absolute
  7. base_log_folder = /home/yumingmin/bigdata/projects/logs

接着,我们重新改写两个 📑 airflow_main.py 文件:

  1. # DAG1/airflow_main.py
  2. import sys
  3. from itools import check_table_update
  4. from DAG1.config import a, b, c
  1. # DAG2/airflow_main.py
  2. import sys
  3. from itools import check_table_update
  4. from DAG2.config import a, b, d

这样就 OK 了。

🔔 值得注意的是,完成这样的部署方式,需要运行 airflow resetdb 来清除掉之前的任务。