在开始阅读AWEL工作原理之前,建议首先查看快速开始相关的文档。 通过快速开始中用AWEL编写Hello World的程序,能更好的理解AWEL的工作原理。

介绍

让我们以快速开始中的代码为例,来分析AWEL的工作原理。 我们有如下的应用程序

  1. import asyncio
  2. from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource
  3. with DAG("awel_hello_world") as dag:
  4. input_task = InputOperator(
  5. input_source=SimpleCallDataInputSource()
  6. )
  7. task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
  8. input_task >> task
  9. dag.visualize_dag()
  10. asyncio.run(task.call(call_data="world"))

如上代码中包含了几个关键的概念。1. DAG 2. Operator 3. Task

  • DAG: DAG是一个表示有向无环图的类(Directed Acyclic Graph)。 它用来定义任务以及上下游依赖的结构。
  • Operator: InputOperatorMapOperator是operator 的例子。一个operator是DAG的一个节点。 可以是一个数据源,一个转换处理器,或者数据接收器。在本案例中,InputOperator是一个数据源,MapOperator是一段转换程序。
  • Task: 一个任务是一个算子的实例,它是一个动态的概念。
  • Runner: Runner是用来执行DAG任务的。当我们调用task.call(call_data="world"),我们使用一个runner来执行这个任务。 DefaultWorkflowRunner 会在同一个进程中运行任务。 RayWorkflowRunner会运行任务在Ray集群当中(社区版本还未实现)

DAG(Directed Acyclic Graph)

什么是DAG?

Directed Acyclic Graph(DAG)是有具有一组顶点与一组有向边的图。边从一个节点到另一个节点,图中没有环。在AWEL的概念里面,顶点是的算子(operators), 边是算子之间的依赖。

算子(Operator)

什么是算子?

一个算子是DAG的一个节点。它可以是数据源,一段转换程序,或者一个模型调用。在AWEL的概念里面,一个算子是一个继承了 dbgpt.core.awel.BaseOperator 基类的拓展类。

根据输出数据类型的不同,算子可以区分为两类。1. 流式算子 非流式算子

基础算子

这里有一些基础的算子,可以用来构建更复杂的算子。

  • InputOperator: 是一个非流式算子,用来从输入数据源获取数据。
  • MapOperator: 是一个非流式算子,用来做用一个处理函数进行数据的转换与处理。
  • BranchOperator: 非流式算子,用于根据输入数据决定运行哪条路径。
  • JoinOperator: 此非流式算子用于将多个路径中的数据连接到单个路径中。
  • StreamifyAbsOperator: 这是一个流式算子,用来转换非流式算子到流式算子。
  • UnStreamifyAbsOperator: 这是一个非流式算子,用来转换流式算子到非流算子。
  • TransformStreamAbsOperator: 这是一个流式算子,用来流式数据到另一个流式数据。
  • ReduceStreamOperator: 这是一个非流式算子,将流数据Reduce为非流数据。
  • TriggerOperator: 非流算子用来触发任务。 是一种特殊的InputOperator

高级算子

  • RequestBuilderOperator: 非流算子用于根据输入数据构建模型请求。
  • LLMOperator: 非流算子用来调用大模型服务
  • StreamingLLMOperator: 流式算子用来调用大模型服务。
  • LLMBranchOperator: 非流算子,根据输出决定运行路径
  • OpenAIStreamOutputOperator: 流算子,用来将模型输出转换为兼容OpenAI SDK的流数据。
  • ChatHistoryPromptComposerOperator: 非流算子用于构建高级任务来编写聊天历史提示词。

任务

什么是任务?

任务是算子的一个实例,它是无状态的,这也意味着任务可以使用不同的输出参数重复执行。

每个任务可以从父任务接收多个输入数据。并向子任务返回单个输出数据。

Runner

什么是Runner?

Runner是用来执行DAG中任务的类。当我们通过 task.call(call_data="world")调用任务时,我们正使用runner来执行任务。它会触发该任务的所有父任务,然后执行该任务。

DefaultWorkflowRunner 在同一进程中运行您的任务。 RayWorkflowRunner 在 Ray 集群中运行您的任务(社区版本尚未实现)。此外,您还可以实现自己的运行器以在自己的环境中运行任务。