JoinOperator 算子用于将多路输入数据连接成单个输入数据。比如说,如果有两个父节点,你可以通过Join算子将其连接为一个数据分支。

JoinOperator的构建方式如下

通过组合函数来构建

  1. from dbgpt.core.awel import DAG, JoinOperator
  2. with DAG("awel_join_operator") as dag:
  3. task = JoinOperator(combine_function=lambda x, y: x + y)

例子

两数之和

在awel_tutorial目录下创建一个名称为join_operator_sum_number.py 的文件,代码如下

  1. import asyncio
  2. from dbgpt.core.awel import (
  3. DAG, JoinOperator, MapOperator, InputOperator, SimpleCallDataInputSource
  4. )
  5. with DAG("sum_numbers_dag") as dag:
  6. # Create a input task to receive data from call_data
  7. input_task = InputOperator(input_source=SimpleCallDataInputSource())
  8. task1 = MapOperator(map_function=lambda x: x["t1"])
  9. task2 = MapOperator(map_function=lambda x: x["t2"])
  10. sum_task = JoinOperator(combine_function=lambda x, y: x + y)
  11. input_task >> task1 >> sum_task
  12. input_task >> task2 >> sum_task
  13. if asyncio.run(sum_task.call(call_data={"t1": 5, "t2": 8})) == 13:
  14. print("Success!")
  15. else:
  16. print("Failed")

通过如下命令进行运行, 并观察输出:

  1. poetry run python awel_tutorial/join_operator_sum_numbers.py
  2. > Success!

DAG流程图如下

Join Operator - 图1