首先我们来看一个基础入门的Hello World程序。
前置准备
在本教程中,我们使用poetry
来管理我们项目的环境依赖。 如果你未安装poetry
,你可以通过下面的教程进行安装。poetry安装
pip install poetry
创建项目
在开始之前,你首先需要创建一个项目,你可以起任意你喜欢的名称作为项目名,在本教程中,我们统一同<font style="color:rgb(28, 30, 33);background-color:rgb(246, 247, 248);">awel-tutorial</font>
作为项目名称。
接下来我们来进行实际的操作。 首先你需要创建一个工作目录,你可以打开终端创建一个工作目录。
mkdir -p ~/projects
cd ~/projects
然后运行下面的命令创建工作目录,并进入到工作目录下。
poetry new awel-tutorial
cd awel-tutorial
你将看到如下的目录树结构:
awel-tutorial
├── README.md
├── awel_tutorial
│ └── __init__.py
├── pyproject.toml
└── tests
└── __init__.py
添加DB-GPT依赖
poetry add "dbgpt>=0.5.1rc0"
Hello World程序
接下来创建一个简单的DAG程序,在终端输出为”Hello World”。
首先在awel_tutorial
目录下创建一个名称为<font style="color:rgb(28, 30, 33);background-color:rgb(246, 247, 248);">first_hello_world.py</font>
的文件, 然后在文件中编写如下的代码:
from dbgpt.core.awel import DAG, MapOperator
with DAG("awel_hello_world") as dag:
task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
task._blocking_call(call_data="world")
查看项目的目录树结构:
awel-tutorial
├── README.md
├── awel_tutorial
│ ├── __init__.py
│ └── first_hello_world.py
├── poetry.lock
├── pyproject.toml
└── tests
└── __init__.py
通过命令执行代码,查看终端输出。
poetry run python awel_tutorial/first_hello_world.py
Hello, world!
异步Hello World程序
AWEL中所有的任务调度都是异步的。 此示例演示如何使用asyncio
运行任务。创建一个新的文件在awel_tutorial
目录下,文件名称为 first_hello_world_asyncio.py
并输入如下的代码:
import asyncio
from dbgpt.core.awel import DAG, MapOperator
with DAG("awel_hello_world") as dag:
task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
asyncio.run(task.call(call_data="world"))
运行如下命令进行执行:
poetry run python awel_tutorial/first_hello_world_asyncio.py
Hello, world!
有两个任务的Hello World程序
当我们调用节点的时候,我们可以进行数据传递。 本案例中展示如何通过 InputOperator
算子进行数据的传递。 创建一个文件名为 <font style="color:rgb(28, 30, 33);background-color:rgb(246, 247, 248);">first_hello_world_two_tasks.py</font>
的文件,并添加如下的代码:
import asyncio
from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource
with DAG("awel_hello_world") as dag:
input_task = InputOperator(
input_source=SimpleCallDataInputSource()
)
task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
input_task >> task
asyncio.run(task.call(call_data="world"))
运行代码进行测试
poetry run python awel_tutorial/first_hello_world_two_tasks.py
Hello, world!
在本案例中,我们有两个任务。 第一个任务是 InputOperator 算子,它从 SimpleCallDataInputSource
算子中获取数据。 第二个任务是 MapOperator ,从第一个任务中获取数据并将输出结果”Hello world!”打印到终端。同时我们使用 >>
操作符来连接两个任务。 此操作符被用来定义两个任务之间的父 -> 子关系,同时也表达了任务之间的依赖。 当然也可以通过 set_downstream
方法来定义两个任务之间的依赖关系。
input_task.set_downstream(task)
上面的单任务DAG是两任务DAG的一种特例, 其中未使用 InputOperator
with DAG("awel_hello_world") as dag:
task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
asyncio.run(task.call(call_data="world"))
DAG可视化
首先需要安装<font style="color:rgb(28, 30, 33);">graphviz</font>
包来进行DAG的可视化。
poetry add graphviz
更新first_hello_world_two_tasks.py
文件,添加如下的代码
dag.visualize_dag()
完整的代码如下:
import asyncio
from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource
with DAG("awel_hello_world") as dag:
input_task = InputOperator(
input_source=SimpleCallDataInputSource()
)
task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
input_task >> task
dag.visualize_dag()
asyncio.run(task.call(call_data="world"))
重新运行脚本
poetry run python awel_tutorial/first_hello_world_two_tasks.py
你将会观察到如下输出:
InputOperator(node_id=a307d921-3bd0-423d-80f0-30aa25aaa9fe)
-> MapOperator(node_id=bdb335f8-179d-4e08-b1ec-3b58a52d1e84)
Hello, world!
DAG 输出图如下: