- 三大组件与运行机制
webserver: web界面,查看任务进行情况和一些简单任务操作,参数配置等
scheduler: 进行任务调度,具体状态存储于mysql,通过celery框架发送指令给worker
worker:接受指令,发动airlfow run命令,运行本地代码
- 手动调度(manual web上点或api调用)与周期调度(scheduled、backfill)
{{ds}}区别、api调用可传参
- 基本元素:
dag:dag_id、description、schedule_interval、
start_date、end_date、catchup、
user_defined_macros、user_defined_filters、
max_active_runs、concurrency、default_args
operator:task_id、dag
own、email、email_on_retry、email_on_failure、
retries、retry_delay、
depends_on_past、start_date、end_date、
priority_weight、weight_rule、queue、pool、
task_concurrency
sensor:poke_interval、mode
task:
- web操作
clear
trigger
新任务处理
- 本地测试
- airflow常用命令
测试某个任务:airflow test dag_name task_name 2015-06-01
回跑某个dag(历史没跑过):airflow backfill dag_name -x -s ‘2018-06-01 10:00:00’ -e ‘2015- 08-01 10:00:00’
回跑某个dag(历史跑过,数据有误):airflow clear dag_name -s 2018-06-01 -e 2015-08-01
回跑某个dag的某个task及其后续(历史跑过,数据有误):airflow clear dag_name -t task_name -d -s 2020-01-04 -e 2020-01-08
注:时间不包括-e里的结束时间
- 数仓任务写法
结构、依赖关系改为表的依赖(sources targets)、配置写法
- 同步mysql表任务
- hive数据清理任务
- python脚本任务
- shell脚本任务
- BashOperator、KylinBuildCubeOperator、SqoopOperator
- 生成excel推小Q群(wx.sql)
- cdag
- jinja2 udf
- hive udf
- env
- 分支策略
- 后期优化措施
- 简化同步业务库流程
- sqoop export任务模版化
- hiveserver2地址正服改为去zk读取
- airflow dag解析优化
- 天以上周期依赖天的策略
- hive udf上hdfs
- 简单任务比如sql定时任务web界面配置
- 分支策略
DAG
默认参数
{'dag_id': '文件夹名称','catchup': False,'schedule_interval': '15 0 * * *','default_args': {'owner': 'data_warehouse','depends_on_past': False,'start_date': datetime(2019, 12, 1, 0, 0),'email': ['bigdata@youcheyihou.com'],'email_on_failure': True,'email_on_retry': True,'retries': 1,'retry_delay': timedelta(hours=1)}}
dag默认按天任务配置,若需要自定以配置,需要在文件中创建 dag.yml 进行配置
description: 埋点数据清理任务
schedule_interval: 4 * * * *
default_args:
retry_delay: {minutes: 10}
