1.Airflow介绍

Airflow 是 Airbnb 开源的一个用 Python 编写的调度工具。于 2014 年启动,2015 年春季开源,2016 年加入 Apache 软件基金会的孵化计划。
Airflow将一个工作流指定为一组任务的有向无环图(DAG),指派到一组计算节点上,根据相互之间的依赖关系,有序执行。
Airflow的优势:
灵活易用。由Python编写
功能强大。支持多种不同类型的作业,可自定义不同类型的作业。
简介优雅。作业定义简单
易扩展。提供各种基类供扩展,多种执行器可选择。

1.1体系架构

image.png
Webserver 守护进程。接受 HTTP 请求,通过 Python Flask Web 应用程序与 airflow 进行交互。
Webserver 提供功能的功能包括:中止、恢复、触发任务;监控正在运行的任务,断点续跑任务;查询任务的状态,日志等详细信息。
Scheduler 守护进程。周期性地轮询任务的调度计划,以确定是否触发任务执行。
Worker 守护进程。Worker负责启动机器上的executor来执行任务。使用celeryExecutor后可以在多个机器上部署worker服务。

1.2重要概念

DAG(Directed Acylic Graph)
在Airflow中,一个DAG定义了一个完整的作业。同一个DAG中的所有Task拥有相同的调度时间
参数:

  • dag_id:唯一识别DAG
  • default_args:默认参数,如果当前DAG实例的作业没有配置相应参数,则采用DAG实例的default_args中的相应参数
  • schedule_interval:配置DAG的执行周期,可采用crontab语法

Task
Task为DAG中具体的作业任务,依赖于DAG,必须存在于某个DAG中。Task在DAG中可以配置依赖关系
参数:

  • dag:当前作业属于相应DAG
  • task_id:任务标识符
  • owner:任务的拥有者
  • start_date:任务的开始时间

    2.Airflow安装部署

    2.1所需软件

    CentOS 7.X
    Python 3.5或以上版本(推荐)
    MySQL 5.7.x
    Apache-Airflow 1.10.11

    2.2Python环境准备

    1. # 卸载 mariadb mysql分支和mysql有冲突需要卸载
    2. rpm -qa | grep mariadb
    3. mariadb-libs-5.5.65-1.el7.x86_64
    4. mariadb-5.5.65-1.el7.x86_64
    5. mariadb-devel-5.5.65-1.el7.x86_64
    6. yum remove mariadb
    7. yum remove mariadb-libs
    8. # 安装依赖
    9. rpm -ivh mysql57-community-release-el7-11.noarch.rpm
    10. yum install readline readline-devel -y
    11. yum install gcc -y
    12. yum install zlib* -y
    13. yum install openssl openssl-devel -y
    14. yum install sqlite-devel -y
    15. yum install python-devel mysql-devel -y
    16. # 提前到python官网下载好包
    17. tar -zxvf Python-3.6.6.tgz
    18. # 安装 python3 运行环境
    19. cd Python-3.6.6/
    20. # configure文件是一个可执行的脚本文件。如果配置了--prefix,安装后的所有资源文件
    21. 都会放在目录中
    22. ./configure --prefix=/usr/local/python3.6
    23. make && make install
    24. /usr/local/python3.6/bin/pip3 install virtualenv
    25. # 启动 python3 环境
    26. cd /usr/local/python3.6/bin/
    27. ./virtualenv env
    28. . env/bin/activate
    29. # 检查 python 版本
    30. python -V

    2.3安装Airflow

    1. # 设置目录(配置文件)
    2. # 添加到配置文件/etc/profile。未设置是缺省值为 ~/airflow
    3. export AIRFLOW_HOME=/opt/lagou/servers/airflow
    4. # 使用豆瓣源非常快。-i: 指定库的安装源(可选选项)该命令安装版本为1.10.11
    5. pip install apache-airflow==1.10.11 -i https://pypi.douban.com/simple
    apache-airflow==1.10.11,需要指定安装的版本,重要!!!

    2.4创建数据库并授权

    1. -- 创建数据库
    2. create database airflowlinux122;
    3. -- 创建用户airflow,设置所有ip均可以访问
    4. create user 'airflow2'@'%' identified by '12345678';
    5. create user 'airflow2'@'localhost' identified by '12345678';
    6. -- 用户授权,为新建的airflow用户授予Airflow库的所有权限
    7. grant all on airflowlinux122.* to 'airflow2'@'%';
    8. SET GLOBAL explicit_defaults_for_timestamp = 1;
    9. flush privileges;

    2.5修改Airflow DB配置

    # python3 环境中执行
    pip install mysqlclient==1.4.6
    airflow initdb
    
    mysqlclient==1.4.6,需要指定安装的版本,重要!!!
    有可能在安装完Airflow找不到 $AIRFLOW_HOME/airflow.cfg 文件,执行完airflow initdb才会在对应的位置找到该文件。
    修改 $AIRFLOW_HOME/airflow.cfg:
    # 约 29 行
    sql_alchemy_conn = mysql://airflow2:12345678@linux123:3306/airflowlinux122
    # 重新执行
    airflow initdb
    
    可能出现的错误:Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
    解决方法:
    SET GLOBAL explicit_defaults_for_timestamp = 1;
    FLUSH PRIVILEGES;
    

    2.6安装密码模块

    安装password组件:
    pip install apache-airflow[password]
    修改 airflow.cfg 配置文件(第一行修改,第二行增加):
    # 约 281 行
    [webserver]
    # 约 353行
    authenticate = True
    auth_backend = airflow.contrib.auth.backends.password_auth
    
    添加密码文件
    python命令,执行一遍;添加用户登录,设置口令
    import airflow
    from airflow import models,settings
    from airflow.contrib.auth.backends.password_auth import PasswordUser
    user = PasswordUser(models.User())
    user.username ='airflow'
    user.email ='airflow@lagou.com'
    user.password ='airflow123'
    session = settings.Session()
    session.add(user)
    session.commit()
    session.close()
    exit()
    

    2.7启动服务

    # 备注:要先进入python3的运行环境
    cd /usr/local/python3.6/bin/
    ./virtualenv env
    . env/bin/activate
    # 退出虚拟环境命令
    deactivate
    # 启动scheduler调度器:
    airflow scheduler -D
    # 服务页面启动:
    airflow webserver -D
    
    airflow命令所在位置:/usr/local/python3.6/bin/env/bin/airflow
    安装完成,可以使用浏览器登录 linux122:8080;输入用户名、口令:airflow / airflow123
    image.png

    2.8修改时区

    airflow默认是使用UTC时间,需要修改时区
    1、在修改 $AIRFLOW_HOME/airflow.cfg 文件
    # 约 65 行
    default_timezone = Asia/Shanghai
    
    2、修改 timezone.py
    # 进入Airflow包的安装位置
    cd /usr/local/python3.6/bin/env/lib/python3.6/site-packages/
    # 修改airflow/utils/timezone.py
    cd airflow/utils
    vi timezone.py
    
    ```python 第27行注释,增加29-37行,代码中空格不能删除,python语法规定

    utc = pendulum.timezone(‘UTC’)

from airflow import configuration as conf try: tz = conf.get(“core”, “default_timezone”) if tz == “system”: utc = pendulum.local_timezone() else: utc = pendulum.timezone(tz) except Exception: pass

修改utcnow()函数注释70行增加71行 def utcnow(): “”” Get the current date and time in UTC

:return: “””

pendulum utcnow() is not used as that sets a TimezoneInfo object

instead of a Timezone. This is not pickable and also creates issues

when using replace()

d = dt.datetime.utcnow()

d = dt.datetime.now() d = d.replace(tzinfo=utc)

return d

3、修改 airflow/utils/sqlalchemy.py
```bash
# 进入Airflow包的安装位置
cd /usr/local/python3.6/bin/env/lib/python3.6/site-packages/airflow/utils
# 修改 sqlalchemy.py
vi sqlalchemy.py

在38行之后增加39-47行的内容,代码中空格不能删除,python语法规定

38 utc = pendulum.timezone('UTC')
from airflow import configuration
try:
     tz = configuration.conf("core","default_timezone")
     if tz == "system":
            utc = pendulum.local_timezone()
     else:
            utc = pendulum.timezone(tz)
except Exception:
     pass

4.修改airflow/www/templates/admin/master.html

# 进入Airflow包的安装位置
cd /usr/local/python3.6/bin/env/lib/python3.6/site-packages/
# 修改 airflow/www/templates/admin/master.html
cd airflow/www/templates/admin
vi master.html


# 将第40行修改为以下内容:
40 var UTCseconds = x.getTime();
# 将第43行修改为以下内容:
43 "timeFormat":"H:i:s",

重启airflow webserver

# 关闭 airflow webserver 对应的服务
#-v 'grep'过滤出grep的进程,awk '{print $2}'取出找出的第二个参数即进程号
ps -ef | grep 'airflow-webserver' | grep -v 'grep' | awk '{print $2}' | xargs -i kill -9 {}
# 关闭 airflow scheduler 对应的服务
ps -ef | grep 'airflow' | grep 'scheduler' |awk '{print $2}'| xargs -i kill -9 {}
# 删除对应的pid文件
cd $AIRFLOW_HOME
rm -rf *.pid
# 重启服务(在python3.6虚拟环境中执行)
airflow scheduler -D
airflow webserver -D

至此已将web端时间改为本地时间

2.9Airflow的web界面相关属性

Trigger Dag:人为执行触发
Tree View:当dag执行的时候,可以点入,查看每个task的执行状态(基于树状视图)。状态:success、running、failed、skipped、retry、queued、no status
Graph View:基于图视图(有向无环图),查看每个task的执行状态
Tasks Duration:每个task的执行时间统计,可以选择最近多少次执行
Task Tries:每个task的重试次数
Gantt View:基于甘特图的视图,每个task的执行状态
Code View:查看任务执行代码
Logs:查看执行日志,比如失败原因
Refresh:刷新dag任务
Delete Dag:删除该dag任务

2.10、禁用自带的DAG任务

停止服务

# 关闭 airflow webserver 对应的服务
ps -ef | grep 'airflow-webserver' | grep -v 'grep' | awk '{print $2}' | xargs -i kill -9 {}
# 关闭 airflow scheduler 对应的服务
ps -ef | grep 'airflow' | grep 'scheduler' | awk '{print $2}' | xargs -i kill -9 {}
# 删除对应的pid文件
cd $AIRFLOW_HOME
rm -rf *.pid

修改文件 $AIRFLOW_HOME/airflow.cfg:

# 修改文件第 136 行
136 # load_examples = True
137 load_examples = False
# 重新设置db
airflow resetdb -y

重新设置账户、口令:

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'airflow'
user.email = 'airflow@lagou.com'
user.password = 'airflow123'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

重启服务

# 重启服务
airflow scheduler -D
airflow webserver -D

3.任务集成部署

3.1Airflow核心概念

DAGs:有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序;
Operators:Airflow内置了很多operators

  • BashOperator 执行一个bash 命令
  • PythonOperator 调用任意的 Python 函数
  • EmailOperator 用于发送邮件
  • HTTPOperator 用于发送HTTP请求
  • SqlOperator 用于执行SQL命令
  • 自定义Operator

Tasks:Task 是 Operator的一个实例;
Task Instance:由于Task会被重复调度,每次task的运行就是不同的 Taskinstance。Task instance 有自己的状态,包括success 、running 、failed 、skipped 、up_for_reschedule 、up_for_retry 、queued 、no_status 等;
Task Relationships:DAGs中的不同Tasks之间可以有依赖关系;
执行器(Executor)。Airflow支持的执行器就有四种:本项目中只使用前两种

  • SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试
  • LocalExecutor:多进程本地执行任务
  • CeleryExecutor:分布式调度,生产常用。Celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,如RabbitMQ
  • DaskExecutor :动态任务调度,主要用于数据分析。
  • 执行器的修改。修改 $AIRFLOW_HOME/airflow.cfg 第 70行: executor =LocalExecutor 。修改后要重启服务

    3.2入门案例

    放置在 $AIRFLOW_HOME/dags 目录下helloworld.py ```python from datetime import datetime, timedelta from airflow import DAG from airflow.utils import dates from airflow.utils.helpers import chain from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator def default_options(): default_args = {
      'owner':'airflow', # 拥有者名称
      'start_date': dates.days_ago(1), # 第一次开始执行的时间
      'retries': 1, # 失败重试次数
      'retry_delay': timedelta(seconds=5) # 失败重试间隔
    
    } return default_args

定义DAG

def task1(dag): t = “pwd”

# operator支持多种类型,这里使用 BashOperator
task = BashOperator(
     task_id='MyTask1', # task_id
     bash_command=t, # 指定要执行的命令
     dag=dag # 指定归属的dag
)
return task

def hello_world(): current_time = str(datetime.today()) print(‘hello world at {}’.format(current_time))

def task2(dag):

# Python Operator
task = PythonOperator(
     task_id='MyTask2',
     python_callable=hello_world, # 指定要执行的函数
     dag=dag)
return task

def task3(dag): t = “date” task = BashOperator( task_id=’MyTask3’, bash_command=t, dag=dag) return task

with DAG( ‘HelloWorldDag’, # dag_id default_args=default_options(), # 指定默认参数 schedule_interval=”/2 *” # 执行周期,每分钟2次 ) as d: task1 = task1(d) task2 = task2(d) task3 = task3(d) chain(task1, task2, task3) # 指定执行顺序

```bash
# 执行命令检查脚本是否有错误。如果命令行没有报错,就表示没问题
   python $AIRFLOW_HOME/dags/helloworld.py
# 查看生效的 dags
   airflow list_dags -sd $AIRFLOW_HOME/dags
# 查看指定dag中的task
  airflow list_tasks HelloWorldDag
# 测试dag中的task
  airflow test HelloWorldDag MyTask2 20200801

3.3核心交易调度任务集成

核心交易分析

# 加载ODS数据(DataX迁移数据)
/data/lagoudw/script/trade/ods_load_trade.sh
# 加载DIM层数据
/data/lagoudw/script/trade/dim_load_product_cat.sh
/data/lagoudw/script/trade/dim_load_shop_org.sh
/data/lagoudw/script/trade/dim_load_payment.sh
/data/lagoudw/script/trade/dim_load_product_info.sh
# 加载DWD层数据
/data/lagoudw/script/trade/dwd_load_trade_orders.sh
# 加载DWS层数据
/data/lagoudw/script/trade/dws_load_trade_orders.sh
# 加载ADS层数据
/data/lagoudw/script/trade/ads_load_trade_order_analysis.sh

$AIRFLOW_HOME/dags/coretrade.py
depends_on_past设置为True时,上一次调度成功了,才可以触发,这里使用False

from datetime import timedelta
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# 定义dag的缺省参数
default_args = {
      'owner': 'airflow',
      'depends_on_past': False,
      'start_date': '2020-06-20',
      'email': ['airflow@example.com'],
      'email_on_failure': False,
      'email_on_retry': False,
      'retries': 1,
      'retry_delay': timedelta(minutes=5),
}
# 定义DAG
coretradedag = DAG(
      'coretrade',
      default_args=default_args,
      description='core trade analyze',
      schedule_interval='30 0 * * *',
)
today=datetime.date.today()
oneday=timedelta(days=1)
yesterday=(today-oneday).strftime("%Y-%m-%d")
odstask = BashOperator(
       task_id='ods_load_data',
       depends_on_past=False,
       bash_command='sh /data/lagoudw/script/trade/ods_load_trade.sh ' + yesterday,
       dag=coretradedag
)
dimtask1 = BashOperator(
       task_id='dimtask_product_cat',
       depends_on_past=False,
       bash_command='sh /data/lagoudw/script/trade/dim_load_product_cat.sh '+yesterday,
       dag=coretradedag
)
dimtask2 = BashOperator(
       task_id='dimtask_shop_org',
       depends_on_past=False,
       bash_command='sh /data/lagoudw/script/trade/dim_load_shop_org.sh ' + yesterday,
       dag=coretradedag
)
dimtask3 = BashOperator(
       task_id='dimtask_payment',
       depends_on_past=False,
       bash_command='sh /data/lagoudw/script/trade/dim_load_payment.sh ' + yesterday,
       dag=coretradedag
)
dimtask4 = BashOperator(
      task_id='dimtask_product_info',
      depends_on_past=False,
      bash_command='sh /data/lagoudw/script/trade/dim_load_product_info.sh ' + yesterday,
      dag=coretradedag
)
dwdtask = BashOperator(
      task_id='dwd_load_data',
      depends_on_past=False,
      bash_command='sh /data/lagoudw/script/trade/dwd_load_trade_orders.sh '+ yesterday,
      dag=coretradedag
)
dwstask = BashOperator(
      task_id='dws_load_data',
      depends_on_past=False,
      bash_command='sh /data/lagoudw/script/trade/dws_load_trade_orders.sh ' +yesterday,
      dag=coretradedag
)
adstask = BashOperator(
     task_id='ads_load_data',
     depends_on_past=False,
     bash_command='sh /data/lagoudw/script/trade/ads_load_trade_order_analysis.sh ' + yesterday,
     dag=coretradedag
)
odstask >> dimtask1
odstask >> dimtask2
odstask >> dimtask3
odstask >> dimtask4
odstask >> dwdtask
dimtask1 >> dwstask
dimtask2 >> dwstask
dimtask3 >> dwstask
dimtask4 >> dwstask
dwdtask >> dwstask
dwstask >> adstask

airflow list_dags
airflow list_tasks coretrade —tree
在web端查看
image.png