第一个自定义算子
使用AWEL创建自定义算子非常容易。在本教程中,我们将创建一个自定义的算子,输出 “Hello World” 信息。
在实际的使用中,绝大多数情况下,只需要继承基础算子对相关的方法进行重写,即可达到目的。
在 awel_tutorial
目录下创建一个文件,文件名为: hello_world_custom_operator.py
import asyncio
from dbgpt.core.awel import DAG, MapOperator
class HelloWorldOperator(MapOperator[str, None]):
async def map(self, x: str) -> None:
print(f"Hello, {x}!")
with DAG("awel_hello_world") as dag:
task = HelloWorldOperator()
asyncio.run(task.call(call_data="world"))
运行命令执行上面的程序, 你将在终端看到 “Hello World” 输出。
poetry run python awel_tutorial/hello_world_custom_operator.py
> Hello, world!
第一个自定义流式算子
我们来创建一个流算子,可以流输出从0 到 n-1
, 然后将另一个流运算符中的每个数据加倍。
在awel_tutorial 目录创建一个custom_streaming_operator.py
文件。
import asyncio
from typing import AsyncIterator
from dbgpt.core.awel import DAG, StreamifyAbsOperator, TransformStreamAbsOperator
class NumberProducerOperator(StreamifyAbsOperator[int, int]):
async def streamify(self, n: int) -> AsyncIterator[int]:
for i in range(n):
yield i
class NumberDoubleOperator(TransformStreamAbsOperator[int, int]):
async def transform_stream(self, it: AsyncIterator) -> AsyncIterator[int]:
async for i in it:
# Double the number
yield i * 2
with DAG("numbers_dag") as dag:
task = NumberProducerOperator()
double_task = NumberDoubleOperator()
task >> double_task
async def helper_call_fn(t, n: int):
# Call the streaming operator by `call_stream` method
async for i in await t.call_stream(call_data=n):
print(i)
asyncio.run(helper_call_fn(double_task, 10))
运行下面的命令,执行对应的代码, 查看输出:
poetry run python awel_tutorial/custom_streaming_operator.py
0
2
4
6
8
10
12
14
16
18
在此案例中,我们调用 call_stream
方法来执行流式操作,注意需要用 await
来获得流式输出结果