在开始阅读AWEL工作原理之前,建议首先查看快速开始相关的文档。 通过快速开始中用AWEL编写Hello World的程序,能更好的理解AWEL的工作原理。
介绍
让我们以快速开始中的代码为例,来分析AWEL的工作原理。 我们有如下的应用程序
import asyncio
from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource
with DAG("awel_hello_world") as dag:
input_task = InputOperator(
input_source=SimpleCallDataInputSource()
)
task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
input_task >> task
dag.visualize_dag()
asyncio.run(task.call(call_data="world"))
如上代码中包含了几个关键的概念。1. DAG
2. Operator
3. Task
- DAG: DAG是一个表示有向无环图的类(Directed Acyclic Graph)。 它用来定义任务以及上下游依赖的结构。
- Operator:
InputOperator
和MapOperator
是operator 的例子。一个operator是DAG的一个节点。 可以是一个数据源,一个转换处理器,或者数据接收器。在本案例中,InputOperator
是一个数据源,MapOperator
是一段转换程序。 - Task: 一个任务是一个算子的实例,它是一个动态的概念。
- Runner: Runner是用来执行DAG任务的。当我们调用
task.call(call_data="world")
,我们使用一个runner来执行这个任务。DefaultWorkflowRunner
会在同一个进程中运行任务。RayWorkflowRunner
会运行任务在Ray集群当中(社区版本还未实现)
DAG(Directed Acyclic Graph)
什么是DAG?
Directed Acyclic Graph(DAG)是有具有一组顶点与一组有向边的图。边从一个节点到另一个节点,图中没有环。在AWEL的概念里面,顶点是的算子(operators), 边是算子之间的依赖。
算子(Operator)
什么是算子?
一个算子是DAG的一个节点。它可以是数据源,一段转换程序,或者一个模型调用。在AWEL的概念里面,一个算子是一个继承了 dbgpt.core.awel.BaseOperator
基类的拓展类。
根据输出数据类型的不同,算子可以区分为两类。1. 流式算子 和非流式算子
基础算子
这里有一些基础的算子,可以用来构建更复杂的算子。
InputOperator
: 是一个非流式算子,用来从输入数据源获取数据。MapOperator
: 是一个非流式算子,用来做用一个处理函数进行数据的转换与处理。BranchOperator
: 非流式算子,用于根据输入数据决定运行哪条路径。JoinOperator
: 此非流式算子用于将多个路径中的数据连接到单个路径中。StreamifyAbsOperator
: 这是一个流式算子,用来转换非流式算子到流式算子。UnStreamifyAbsOperator
: 这是一个非流式算子,用来转换流式算子到非流算子。TransformStreamAbsOperator
: 这是一个流式算子,用来流式数据到另一个流式数据。ReduceStreamOperator
: 这是一个非流式算子,将流数据Reduce为非流数据。TriggerOperator
: 非流算子用来触发任务。 是一种特殊的InputOperator
高级算子
RequestBuilderOperator
: 非流算子用于根据输入数据构建模型请求。LLMOperator
: 非流算子用来调用大模型服务StreamingLLMOperator
: 流式算子用来调用大模型服务。LLMBranchOperator
: 非流算子,根据输出决定运行路径OpenAIStreamOutputOperator
: 流算子,用来将模型输出转换为兼容OpenAI SDK的流数据。ChatHistoryPromptComposerOperator
: 非流算子用于构建高级任务来编写聊天历史提示词。
任务
什么是任务?
任务是算子的一个实例,它是无状态的,这也意味着任务可以使用不同的输出参数重复执行。
每个任务可以从父任务接收多个输入数据。并向子任务返回单个输出数据。
Runner
什么是Runner?
Runner是用来执行DAG中任务的类。当我们通过 task.call(call_data="world")
调用任务时,我们正使用runner来执行任务。它会触发该任务的所有父任务,然后执行该任务。
DefaultWorkflowRunner
在同一进程中运行您的任务。 RayWorkflowRunner
在 Ray 集群中运行您的任务(社区版本尚未实现)。此外,您还可以实现自己的运行器以在自己的环境中运行任务。