We previously gave a brief summary of how to write a DAG script for serial (chained) tasks; see the earlier 串联任务 (serial tasks) post for details. Below we rewrite that script so the tasks run in parallel. The biggest win from splitting the work into parallel tasks is the time saved, since independent SQL blocks no longer wait on each other. Here is the parallel version of the code:

    import os
    from datetime import timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator  # airflow.operators.python_operator on Airflow 1.10
    from airflow.utils.dates import days_ago

    # Project-specific helpers (airflow_get_date, ImpalaHiveSQLRunner, init,
    # today_key, sql_path, databases, project_name) are assumed to come from
    # the project's own modules.

    def run_case_sql(i=0):
        """Run the i-th SQL block of case.sql."""
        batch_date = airflow_get_date(today_key, 0)
        context = {"BATCH_DATE": batch_date}
        sql_file = os.path.join(sql_path, "case.sql")
        runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
        runner.run_sql_block(sql_name="sql" + str(i))
        runner.close()

    def run_phone_sql(i=0):
        """Run the i-th SQL block of phone.sql."""
        batch_date = airflow_get_date(today_key, 0)
        context = {"BATCH_DATE": batch_date}
        sql_file = os.path.join(sql_path, "phone.sql")
        runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
        runner.run_sql_block(sql_name="sql" + str(i))
        runner.close()

    default_args = {
        "owner": "yumingmin",
        "depends_on_past": False,
        "start_date": days_ago(1),
        "email": ["yu_mingm623@163.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 60,
        "retry_delay": timedelta(minutes=5),
    }

    dag = DAG(
        project_name,
        default_args=default_args,
        description=project_name,
        schedule_interval="30 8 * * *",
        catchup=False)  # catchup is a DAG argument, not a default_args key

    # Fan-out point: every parallel task hangs off this single upstream task.
    init_op = PythonOperator(
        task_id="init",
        python_callable=init,
        dag=dag)

    # case.sql holds 9 independent blocks (sql1 ... sql9); each becomes its own task.
    for j in range(0, 9):
        parallel_op = PythonOperator(
            task_id="run_case_sql" + str(j + 1),
            python_callable=run_case_sql,
            op_kwargs={"i": j + 1},
            dag=dag)
        init_op >> parallel_op

    # phone.sql holds 2 independent blocks (sql1, sql2).
    for j in range(0, 2):
        parallel_op = PythonOperator(
            task_id="run_phone_sql" + str(j + 1),
            python_callable=run_phone_sql,
            op_kwargs={"i": j + 1},
            dag=dag)
        init_op >> parallel_op
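Once init succeeds, all 11 downstream tasks (run_case_sql1 through run_case_sql9, plus run_phone_sql1 and run_phone_sql2) become eligible to run at once; how many actually execute concurrently depends on your executor and on Airflow settings such as parallelism and dag_concurrency.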

    💡 Note: for tasks to run in parallel, there must be a fan-out point, a shared upstream task from which the parallel branches diverge (here, init).
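To make that fan-out point explicit, note that Airflow also accepts a list on the right-hand side of `>>`, wiring one upstream task to several downstream tasks in a single statement. Below is a minimal sketch of the pattern, assuming a hypothetical `noop` callable standing in for the real SQL runners:

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago

    def noop():
        """Placeholder callable; the real DAG would run an SQL block here."""
        pass

    with DAG("fanout_demo", start_date=days_ago(1), schedule_interval=None) as dag:
        init_op = PythonOperator(task_id="init", python_callable=noop)
        branches = [
            PythonOperator(task_id="branch" + str(i), python_callable=noop)
            for i in range(1, 4)
        ]
        # One upstream task fanning out to three parallel branches:
        init_op >> branches

This is equivalent to the per-iteration `init_op >> parallel_op` used in the loops above; which form to use is purely a matter of readability.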