We previously gave a quick summary of how to write a DAG script for chained (sequential) tasks; see the earlier post on chained tasks for details. Below we rewrite that script so the tasks run in parallel. The biggest benefit of splitting the work into parallel tasks is the large amount of time it saves. Here is the parallel version:
import os
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# airflow_get_date, today_key, sql_path, databases, ImpalaHiveSQLRunner,
# project_name and init come from the chained-task script referenced above.

def run_case_sql(i=0):
    """Run the i-th SQL block of case.sql."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, "case.sql")
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
    runner.run_sql_block(sql_name="sql" + str(i))
    runner.close()

def run_phone_sql(i=0):
    """Run the i-th SQL block of phone.sql."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, "phone.sql")
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
    runner.run_sql_block(sql_name="sql" + str(i))
    runner.close()
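Since run_case_sql and run_phone_sql differ only in the SQL file name, the pair could also be collapsed into one parameterized callable, with the file name passed through op_kwargs. A minimal sketch of that refactor (run_sql is a hypothetical helper, not part of the original script; it assumes the same project helpers as above):

def run_sql(sql_filename, i=0):
    """Run the i-th SQL block of the given file (hypothetical helper)."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, sql_filename)
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases,
                                 filename=sql_file, context=context)
    runner.run_sql_block(sql_name="sql" + str(i))
    runner.close()

# Usage inside the task loops below:
#   op_kwargs={"sql_filename": "case.sql", "i": j + 1}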
default_args = {
    "owner": "yumingmin",
    "depends_on_past": False,
    "start_date": days_ago(1),
    "email": ["yu_mingm623@163.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 60,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    project_name,
    default_args=default_args,
    description=project_name,
    schedule_interval="30 8 * * *",
    catchup=False)  # catchup is a DAG-level argument; it is ignored inside default_args
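Once the tasks fan out, they are all eligible to run at the same time, so you may want to cap how many actually run concurrently. A minimal sketch, assuming Airflow 1.x where the DAG-level cap is called concurrency (renamed max_active_tasks in Airflow 2.2+); the value 4 is an arbitrary example:

dag = DAG(
    project_name,
    default_args=default_args,
    description=project_name,
    schedule_interval="30 8 * * *",
    catchup=False,
    concurrency=4)  # at most 4 task instances of this DAG run at once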
init_op = PythonOperator(
    task_id="init",
    python_callable=init,
    dag=dag)
for j in range(0, 9):
    parallel_op = PythonOperator(
        task_id="run_case_sql" + str(j + 1),
        python_callable=run_case_sql,
        op_kwargs={"i": j + 1},
        dag=dag)
    init_op >> parallel_op
for j in range(0, 2):
    parallel_op = PythonOperator(
        task_id="run_phone_sql" + str(j + 1),
        python_callable=run_phone_sql,
        op_kwargs={"i": j + 1},
        dag=dag)
    init_op >> parallel_op
💡 Note: for tasks to run in parallel, there must be a common branch point for them to fan out from. Here every parallel task is wired downstream of init, which is what turns the flat list of operators into parallel branches.
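Airflow can also express the fan-out (and a fan-in, if later steps must wait for every branch) with list syntax instead of a loop over >>. A minimal sketch as an alternative to the loops above; finish_op and its no-op callable are hypothetical additions, not part of the original script:

case_ops = [
    PythonOperator(
        task_id="run_case_sql" + str(j + 1),
        python_callable=run_case_sql,
        op_kwargs={"i": j + 1},
        dag=dag)
    for j in range(0, 9)
]

finish_op = PythonOperator(
    task_id="finish",
    python_callable=lambda: None,  # hypothetical no-op join task
    dag=dag)

init_op >> case_ops >> finish_op  # fan out from init, fan in at finish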