日常任务中,我们会遇到一些串联任务,比如下方这个 run_case_sql1run_case_sql2...run_case_sql9,其实每个 task 运行的功能都是类似的,就是运行文件中的一段 SQL。这里只有 9 个还好,但是如果有 100 多个,一个一个写 task 会让代码显得很冗余,而且非常容易出粗,一点也不 Pythonic。
image.png

下面来简化一段代码,来让程序更简单一点。

1. SQL文件

我们有一个 case.sql 文件,里面存放了我们需要跑的 SQL 代码,内容如下(中间省去 sql2 到 sql8 代码):

  1. --[sql1]
  2. DROP TABLE IF EXISTS test.tb1
  3. ;
  4. CREATE TABLE IF NOT EXISTS test.tb1
  5. STORED AS PARQUET AS
  6. SELECT * FROM test.tb
  7. ;
  8. COMPUTE STATS test.tb1
  9. ;
  10. --[end]
  11. --[sql9]
  12. DROP TABLE IF EXISTS test.tb9
  13. ;
  14. CREATE TABLE IF NOT EXISTS test.tb9
  15. STORED AS PARQUET AS
  16. SELECT * FROM test.tb8
  17. ;
  18. COMPUTE STATS test.tb9
  19. ;
  20. --[end]

2. 复用函数

这是一段复用函数,主要用于对应文件中的 SQL 代码:

  1. def run_case_sql(i=0):
  2. """运行sql函数"""
  3. batch_date = airflow_get_date(today_key, 0)
  4. context = {"BATCH_DATE": batch_date}
  5. sql_file = os.path.join(sql_path, "case.sql")
  6. runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
  7. runner.run_sql_block(sql_name="sql" + str(i))
  8. runner.close()
  9. def run_phone_sql(i=0):
  10. """运行sql函数"""
  11. batch_date = airflow_get_date(today_key, 0)
  12. context = {"BATCH_DATE": batch_date}
  13. sql_file = os.path.join(sql_path, "phone.sql")
  14. runner = ImpalaHiveSQLRunner(db_name="impala", config=databases, filename=sql_file, context=context)
  15. runner.run_sql_block(sql_name="sql" + str(i))
  16. runner.close()

3. DAG文件

下面我们编写 DAG 文件

  1. default_args = {
  2. "owner": "yumingmin",
  3. "depends_on_past": False,
  4. "start_date": days_ago(1),
  5. "email": ["yu_mingm623@163.com"],
  6. "email_on_failure": False,
  7. "email_on_retry": False,
  8. "retries": 60,
  9. "retry_delay": timedelta(minutes=5),
  10. "catchup": False
  11. }
  12. dag = DAG(
  13. project_name,
  14. default_args=default_args,
  15. description=project_name,
  16. schedule_interval='30 8 * * *')
  17. taskflows = []
  18. for j in range(0, 9):
  19. taskflows.append(PythonOperator(
  20. task_id="run_case_sql" + str(j + 1),
  21. pyttaskflowshon_callable=run_case_sql,
  22. op_kwargs={"i": j + 1},
  23. dag=dag)
  24. )
  25. if j != 0:
  26. taskflows[-2] >> taskflows[-1]
  27. for j in range(0, 2):
  28. order.append(PythonOperator(
  29. task_id="run_phone_sql" + str(j + 1),
  30. python_callable=run_case_sql,
  31. op_kwargs={"i": j + 1},
  32. dag=dag)
  33. )
  34. taskflows[-2] >> taskflows[-1]

这样我们就代码量极大地进行了缩短,Perfect 🤟