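In day-to-day work we often run into chained tasks, such as the one below:
run_case_sql1
➜ run_case_sql2
➜ ...
➜ run_case_sql9
Every task does essentially the same thing: it runs one block of SQL from a file. With only 9 tasks that is manageable, but with more than 100, writing each task by hand makes the code redundant and very error-prone, and it is not Pythonic at all.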
1. SQL file
We have a case.sql file that holds the SQL we need to run. Its contents are as follows (the code for sql2 through sql8 is omitted):
--[sql1]
DROP TABLE IF EXISTS test.tb1
;
CREATE TABLE IF NOT EXISTS test.tb1
STORED AS PARQUET AS
SELECT * FROM test.tb
;
COMPUTE STATS test.tb1
;
--[end]
--[sql9]
DROP TABLE IF EXISTS test.tb9
;
CREATE TABLE IF NOT EXISTS test.tb9
STORED AS PARQUET AS
SELECT * FROM test.tb8
;
COMPUTE STATS test.tb9
;
--[end]
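The post does not show how ImpalaHiveSQLRunner locates a block by name, so the following is only a minimal sketch of one way the `--[name]` ... `--[end]` markers could be parsed. The helper `parse_sql_blocks` and the regular expression are hypothetical and are not taken from the author's runner.

```python
import re

# Hypothetical pattern: a "--[name]" line, the block body, then a "--[end]" line.
# The lookahead keeps a stray "--[end]" from being read as a block name.
BLOCK_RE = re.compile(
    r"^--\[(?!end\])(?P<name>\w+)\]\s*\n(?P<body>.*?)^--\[end\]",
    re.MULTILINE | re.DOTALL,
)


def parse_sql_blocks(text):
    """Map each block name to the list of statements inside it."""
    blocks = {}
    for match in BLOCK_RE.finditer(text):
        # Naive split on ";": good enough for this layout, but it would
        # break on semicolons inside string literals.
        statements = [s.strip() for s in match.group("body").split(";")]
        blocks[match.group("name")] = [s for s in statements if s]
    return blocks


sample = """--[sql1]
DROP TABLE IF EXISTS test.tb1
;
COMPUTE STATS test.tb1
;
--[end]
--[sql9]
DROP TABLE IF EXISTS test.tb9
;
--[end]
"""
blocks = parse_sql_blocks(sample)
```

With this layout, a name such as "sql" + str(i) is enough to pick out the statements for one task, which is what makes the loop-generated tasks later in the post possible.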
2. Reusable functions
These are the reusable functions. Each one runs the named SQL block from its corresponding file:
import os

# airflow_get_date, today_key, sql_path, databases and ImpalaHiveSQLRunner
# are assumed to be defined elsewhere in the project (not shown in this post).


def run_case_sql(i=0):
    """Run block sql<i> from case.sql."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, "case.sql")
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
    try:
        runner.run_sql_block(sql_name="sql" + str(i))
    finally:
        # Close the connection even if the SQL block fails.
        runner.close()


def run_phone_sql(i=0):
    """Run block sql<i> from phone.sql."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, "phone.sql")
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
    try:
        runner.run_sql_block(sql_name="sql" + str(i))
    finally:
        runner.close()
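The two functions differ only in the file name, so the same idea of removing repetition can be applied here too. Below is a minimal sketch of a small factory. The names `make_sql_task` and `run_block` are hypothetical; `run_block` stands in for the "create runner, run block, close" steps so that the example runs without a database.

```python
import os
from typing import Callable, List, Tuple


def make_sql_task(sql_dir: str, filename: str,
                  run_block: Callable[[str, str], None]) -> Callable[..., None]:
    """Build a task callable bound to one SQL file.

    run_block(sql_file, sql_name) is a hypothetical hook; in the post's
    setup it would create the runner, call run_sql_block and close it.
    """
    sql_file = os.path.join(sql_dir, filename)

    def task(i: int = 0) -> None:
        run_block(sql_file, "sql" + str(i))

    return task


# Demo with a stub that records calls instead of touching a database.
calls: List[Tuple[str, str]] = []


def record(sql_file: str, sql_name: str) -> None:
    calls.append((sql_file, sql_name))


run_case_sql = make_sql_task("sql", "case.sql", record)
run_phone_sql = make_sql_task("sql", "phone.sql", record)
run_case_sql(i=1)
run_phone_sql(i=2)
```

The returned callables keep the same `i` keyword argument, so they could be passed to `python_callable` with `op_kwargs={"i": ...}` exactly as the hand-written functions are.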
3. DAG file
Next, we write the DAG file:
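from datetime import timedelta

from airflow import DAG
# Airflow 2.x import path; on Airflow 1.10 use airflow.operators.python_operator.
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "yumingmin",
    "depends_on_past": False,
    "start_date": days_ago(1),
    "email": ["yu_mingm623@163.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 60,
    "retry_delay": timedelta(minutes=5),
}

# project_name is assumed to be defined elsewhere (not shown in this post).
dag = DAG(
    project_name,
    default_args=default_args,
    description=project_name,
    schedule_interval="30 8 * * *",
    # catchup is a DAG-level argument; it has no effect inside default_args.
    catchup=False)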
taskflows = []
# Create run_case_sql1 ... run_case_sql9 and chain each task after the previous one.
for j in range(0, 9):
    taskflows.append(PythonOperator(
        task_id="run_case_sql" + str(j + 1),
        python_callable=run_case_sql,
        op_kwargs={"i": j + 1},
        dag=dag)
    )
    if j != 0:
        taskflows[-2] >> taskflows[-1]
# Append run_phone_sql1 and run_phone_sql2; the first one follows run_case_sql9.
for j in range(0, 2):
    taskflows.append(PythonOperator(
        task_id="run_phone_sql" + str(j + 1),
        python_callable=run_phone_sql,
        op_kwargs={"i": j + 1},
        dag=dag)
    )
    taskflows[-2] >> taskflows[-1]
This cuts the amount of code dramatically. Perfect 🤟
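To sanity-check the wiring without an Airflow installation, the same loop logic can be mirrored with plain strings. This is only a sketch: `build_chain` is a hypothetical helper that uses task ids in place of PythonOperator objects and records each `upstream >> downstream` pair as a tuple.

```python
def build_chain(groups):
    """Return (task_ids, edges) for groups like [("run_case_sql", 9), ...]."""
    task_ids = []
    edges = []
    for prefix, count in groups:
        for j in range(count):
            task_ids.append(prefix + str(j + 1))
            # Same idea as taskflows[-2] >> taskflows[-1]: link to the previous task.
            if len(task_ids) > 1:
                edges.append((task_ids[-2], task_ids[-1]))
    return task_ids, edges


task_ids, edges = build_chain([("run_case_sql", 9), ("run_phone_sql", 2)])
for upstream, downstream in edges:
    print(upstream, "->", downstream)
```

Because each new task is linked to whatever was appended last, the first run_phone_sql task ends up downstream of run_case_sql9, giving one linear chain of 11 tasks.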