DAG编排规划
首先我们看一个基础AWEL编排入门的例子, 例子中的核心功能是一个Http请求的输入输出的处理。 所以整个编排的节点只有两步。
- Http请求
- 处理Http响应结果
在DB-GPT当中,对于一些基础依赖的算子已经做了封装,可以直接引用。
from dbgpt._private.pydantic import BaseModel, Field
from dbgpt.core.awel import DAG, HttpTrigger, MapOperator
自定义算子
定义一个Http请求体, 接收两个参数。name
、age
class TriggerReqBody(BaseModel):
name: str = Field(..., description="User name")
age: int = Field(18, description="User age")
定义一个Reqeust处理算子 RequestHandleOperator
, 此算子集成基础MapOperator
算子, RequestHandleOperator
算子做的事情也比较简单,就是解析请求体,然后解析出name
与age
字段,然后拼接成一句话。如:
Hello, zhangsan, your age is 18
class RequestHandleOperator(MapOperator[TriggerReqBody, str]):
def __init__(self, **kwargs):
super().__init__(**kwargs)
async def map(self, input_value: TriggerReqBody) -> str:
print(f"Receive input value: {input_value}")
return f"Hello, {input_value.name}, your age is {input_value.age}"
DAG编排
写好上述算子之后,即可拼接为一个DAG编排。此DAG总共有两个节点,第一个节点是HttpTrigger, 主要是处理Http请求(此算子是DB-GPT内置算子),第二个节点就是我们刚刚定义的处理请求体的RequestHandleOperator
算子。 下面DAG代码即可讲两个节点进行拼接。
with DAG("simple_dag_example") as dag:
trigger = HttpTrigger("/examples/hello", request_body=TriggerReqBody)
map_node = RequestHandleOperator()
trigger >> map_node
访问验证
在访问验证之前,首先需要启动项目,python dbgpt/app/dbgpt_server.py
% curl -X GET http://127.0.0.1:5000/api/v1/awel/trigger/examples/hello\?name\=zhangsan
"Hello, zhangsan, your age is 18"
当然我们为了用户可以更方便测试,提供了一个测试环境。此测试环境可以在不启动dbgpt_server的前提下,进行测试。 在simple_dag_example
下面添加如下代码,然后直接运行simple_dag_example.py
脚本,即可在不启动项目的前提下,直接运行测试脚本。
if __name__ == "__main__":
if dag.leaf_nodes[0].dev_mode:
# Development mode, you can run the dag locally for debugging.
from dbgpt.core.awel import setup_dev_environment
setup_dev_environment([dag], port=5555)
else:
# Production mode, DB-GPT will automatically load and execute the current file after startup.
pass
% curl -X GET http://127.0.0.1:5555/api/v1/awel/trigger/examples/hello\?name\=zhangsan
"Hello, zhangsan, your age is 18"