We previously gave a quick summary of how to write a DAG script for chained (serial) tasks; see the earlier post 串联任务 (Chained Tasks). Below we rewrite that script so the tasks run in parallel. The biggest win from splitting the work into parallel tasks is wall-clock time: independent SQL blocks no longer wait on each other. Here is the parallel version:
import os
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # airflow.operators.python in Airflow 2.x
from airflow.utils.dates import days_ago

# Project-level helpers and settings (airflow_get_date, today_key, sql_path,
# databases, ImpalaHiveSQLRunner, init, project_name) are defined in the
# project's own modules and assumed to be importable here.


def run_case_sql(i=0):
    """Run the i-th SQL block from case.sql."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, "case.sql")
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases,
                                 filename=sql_file, context=context)
    runner.run_sql_block(sql_name="sql" + str(i))
    runner.close()


def run_phone_sql(i=0):
    """Run the i-th SQL block from phone.sql."""
    batch_date = airflow_get_date(today_key, 0)
    context = {"BATCH_DATE": batch_date}
    sql_file = os.path.join(sql_path, "phone.sql")
    runner = ImpalaHiveSQLRunner(db_name="impala", config=databases,
                                 filename=sql_file, context=context)
    runner.run_sql_block(sql_name="sql" + str(i))
    runner.close()


default_args = {
    "owner": "yumingmin",
    "depends_on_past": False,
    "start_date": days_ago(1),
    "email": ["yu_mingm623@163.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 60,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    project_name,
    default_args=default_args,
    description=project_name,
    schedule_interval="30 8 * * *",
    catchup=False,  # catchup is a DAG-level argument, not an operator default
)

init_op = PythonOperator(task_id="init", python_callable=init, dag=dag)

# Fan out: every parallel task depends only on init_op, so the nine case
# tasks and the two phone tasks can all be scheduled concurrently.
for j in range(0, 9):
    parallel_op = PythonOperator(
        task_id="run_case_sql" + str(j + 1),
        python_callable=run_case_sql,
        op_kwargs={"i": j + 1},
        dag=dag,
    )
    init_op >> parallel_op

for j in range(0, 2):
    parallel_op = PythonOperator(
        task_id="run_phone_sql" + str(j + 1),
        python_callable=run_phone_sql,
        op_kwargs={"i": j + 1},
        dag=dag,
    )
    init_op >> parallel_op
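The dependency arrows are the key part: each loop iteration wires init_op >> parallel_op, so all eleven SQL tasks share init_op as their only upstream dependency and the scheduler is free to run them side by side. As a side note, Airflow's >> operator also accepts a list of tasks, so the same fan-out can be declared without a loop. A minimal sketch reusing the dag, init_op, and run_case_sql objects defined above:

case_ops = [
    PythonOperator(
        task_id="run_case_sql" + str(j + 1),
        python_callable=run_case_sql,
        op_kwargs={"i": j + 1},
        dag=dag,
    )
    for j in range(9)
]
init_op >> case_ops  # one statement declares all nine parallel edges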
💡 Note: for tasks to run in parallel, there must be a branch point for them to fan out from.
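In other words, the parallel operators need a common upstream task; without one the scheduler has no branch point and nothing can run side by side. A minimal self-contained sketch of the fan-out/fan-in shape (the dag_id and task_ids here are made up for illustration):

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator  # EmptyOperator in Airflow 2.4+
from airflow.utils.dates import days_ago

with DAG("fanout_demo", start_date=days_ago(1), schedule_interval=None) as demo_dag:
    start = DummyOperator(task_id="start")  # the branch point
    done = DummyOperator(task_id="done")    # joins the branches again
    branches = [DummyOperator(task_id="branch_" + str(k)) for k in range(3)]
    start >> branches >> done  # fan out from start, fan back in at done

Also keep in mind that a branch point only makes parallelism possible; whether the tasks actually run concurrently depends on the executor (the default SequentialExecutor runs one task at a time, so LocalExecutor or CeleryExecutor is needed) and on concurrency settings such as parallelism in airflow.cfg.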
