JoinOperator
算子用于将多路输入数据连接成单个输入数据。比如说,如果有两个父节点,你可以通过Join算子将其连接为一个数据分支。
JoinOperator
的构建方式如下
通过组合函数来构建
from dbgpt.core.awel import DAG, JoinOperator
with DAG("awel_join_operator") as dag:
task = JoinOperator(combine_function=lambda x, y: x + y)
例子
两数之和
在awel_tutorial目录下创建一个名称为join_operator_sum_number.py
的文件,代码如下
import asyncio
from dbgpt.core.awel import (
DAG, JoinOperator, MapOperator, InputOperator, SimpleCallDataInputSource
)
with DAG("sum_numbers_dag") as dag:
# Create a input task to receive data from call_data
input_task = InputOperator(input_source=SimpleCallDataInputSource())
task1 = MapOperator(map_function=lambda x: x["t1"])
task2 = MapOperator(map_function=lambda x: x["t2"])
sum_task = JoinOperator(combine_function=lambda x, y: x + y)
input_task >> task1 >> sum_task
input_task >> task2 >> sum_task
if asyncio.run(sum_task.call(call_data={"t1": 5, "t2": 8})) == 13:
print("Success!")
else:
print("Failed")
通过如下命令进行运行, 并观察输出:
poetry run python awel_tutorial/join_operator_sum_numbers.py
> Success!
DAG流程图如下