首先我们来看一个基础入门的Hello World程序。

前置准备

在本教程中,我们使用poetry来管理我们项目的环境依赖。 如果你未安装poetry,你可以通过下面的教程进行安装。poetry安装

  1. pip install poetry

创建项目

在开始之前,你首先需要创建一个项目,你可以起任意你喜欢的名称作为项目名,在本教程中,我们统一同<font style="color:rgb(28, 30, 33);background-color:rgb(246, 247, 248);">awel-tutorial</font>作为项目名称。

接下来我们来进行实际的操作。 首先你需要创建一个工作目录,你可以打开终端创建一个工作目录。

  1. mkdir -p ~/projects
  2. cd ~/projects

然后运行下面的命令创建工作目录,并进入到工作目录下。

  1. poetry new awel-tutorial
  2. cd awel-tutorial

你将看到如下的目录树结构:

  1. awel-tutorial
  2. ├── README.md
  3. ├── awel_tutorial
  4. └── __init__.py
  5. ├── pyproject.toml
  6. └── tests
  7. └── __init__.py

添加DB-GPT依赖

  1. poetry add "dbgpt>=0.5.1rc0"

Hello World程序

接下来创建一个简单的DAG程序,在终端输出为”Hello World”。

首先在awel_tutorial目录下创建一个名称为<font style="color:rgb(28, 30, 33);background-color:rgb(246, 247, 248);">first_hello_world.py</font>的文件, 然后在文件中编写如下的代码:

  1. from dbgpt.core.awel import DAG, MapOperator
  2. with DAG("awel_hello_world") as dag:
  3. task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
  4. task._blocking_call(call_data="world")

查看项目的目录树结构:

  1. awel-tutorial
  2. ├── README.md
  3. ├── awel_tutorial
  4. ├── __init__.py
  5. └── first_hello_world.py
  6. ├── poetry.lock
  7. ├── pyproject.toml
  8. └── tests
  9. └── __init__.py

通过命令执行代码,查看终端输出。

  1. poetry run python awel_tutorial/first_hello_world.py
  2. Hello, world!

异步Hello World程序

AWEL中所有的任务调度都是异步的。 此示例演示如何使用asyncio运行任务。创建一个新的文件在awel_tutorial目录下,文件名称为 first_hello_world_asyncio.py 并输入如下的代码:

  1. import asyncio
  2. from dbgpt.core.awel import DAG, MapOperator
  3. with DAG("awel_hello_world") as dag:
  4. task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
  5. asyncio.run(task.call(call_data="world"))

运行如下命令进行执行:

  1. poetry run python awel_tutorial/first_hello_world_asyncio.py
  2. Hello, world!

有两个任务的Hello World程序

当我们调用节点的时候,我们可以进行数据传递。 本案例中展示如何通过 InputOperator 算子进行数据的传递。 创建一个文件名为 <font style="color:rgb(28, 30, 33);background-color:rgb(246, 247, 248);">first_hello_world_two_tasks.py</font> 的文件,并添加如下的代码:

  1. import asyncio
  2. from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource
  3. with DAG("awel_hello_world") as dag:
  4. input_task = InputOperator(
  5. input_source=SimpleCallDataInputSource()
  6. )
  7. task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
  8. input_task >> task
  9. asyncio.run(task.call(call_data="world"))

运行代码进行测试

  1. poetry run python awel_tutorial/first_hello_world_two_tasks.py
  2. Hello, world!

在本案例中,我们有两个任务。 第一个任务是 InputOperator 算子,它从 SimpleCallDataInputSource

算子中获取数据。 第二个任务是 MapOperator ,从第一个任务中获取数据并将输出结果”Hello world!”打印到终端。同时我们使用 >> 操作符来连接两个任务。 此操作符被用来定义两个任务之间的父 -> 子关系,同时也表达了任务之间的依赖。 当然也可以通过 set_downstream 方法来定义两个任务之间的依赖关系。

  1. input_task.set_downstream(task)

上面的单任务DAG是两任务DAG的一种特例, 其中未使用 InputOperator

  1. with DAG("awel_hello_world") as dag:
  2. task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
  3. asyncio.run(task.call(call_data="world"))

DAG可视化

首先需要安装<font style="color:rgb(28, 30, 33);">graphviz</font>包来进行DAG的可视化。

  1. poetry add graphviz

更新first_hello_world_two_tasks.py 文件,添加如下的代码

  1. dag.visualize_dag()

完整的代码如下:

  1. import asyncio
  2. from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource
  3. with DAG("awel_hello_world") as dag:
  4. input_task = InputOperator(
  5. input_source=SimpleCallDataInputSource()
  6. )
  7. task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
  8. input_task >> task
  9. dag.visualize_dag()
  10. asyncio.run(task.call(call_data="world"))

重新运行脚本

  1. poetry run python awel_tutorial/first_hello_world_two_tasks.py

你将会观察到如下输出:

  1. InputOperator(node_id=a307d921-3bd0-423d-80f0-30aa25aaa9fe)
  2. -> MapOperator(node_id=bdb335f8-179d-4e08-b1ec-3b58a52d1e84)
  3. Hello, world!

DAG 输出图如下:

Hello World - 图1