第一个自定义算子

使用AWEL创建自定义算子非常容易。在本教程中,我们将创建一个自定义的算子,输出 “Hello World” 信息。

在实际的使用中,绝大多数情况下,只需要继承基础算子对相关的方法进行重写,即可达到目的。

awel_tutorial 目录下创建一个文件,文件名为: hello_world_custom_operator.py

  1. import asyncio
  2. from dbgpt.core.awel import DAG, MapOperator
  3. class HelloWorldOperator(MapOperator[str, None]):
  4. async def map(self, x: str) -> None:
  5. print(f"Hello, {x}!")
  6. with DAG("awel_hello_world") as dag:
  7. task = HelloWorldOperator()
  8. asyncio.run(task.call(call_data="world"))

运行命令执行上面的程序, 你将在终端看到 “Hello World” 输出。

  1. poetry run python awel_tutorial/hello_world_custom_operator.py
  2. > Hello, world!

第一个自定义流式算子

我们来创建一个流算子,可以流输出从0 到 n-1, 然后将另一个流运算符中的每个数据加倍。

在awel_tutorial 目录创建一个custom_streaming_operator.py文件。

  1. import asyncio
  2. from typing import AsyncIterator
  3. from dbgpt.core.awel import DAG, StreamifyAbsOperator, TransformStreamAbsOperator
  4. class NumberProducerOperator(StreamifyAbsOperator[int, int]):
  5. async def streamify(self, n: int) -> AsyncIterator[int]:
  6. for i in range(n):
  7. yield i
  8. class NumberDoubleOperator(TransformStreamAbsOperator[int, int]):
  9. async def transform_stream(self, it: AsyncIterator) -> AsyncIterator[int]:
  10. async for i in it:
  11. # Double the number
  12. yield i * 2
  13. with DAG("numbers_dag") as dag:
  14. task = NumberProducerOperator()
  15. double_task = NumberDoubleOperator()
  16. task >> double_task
  17. async def helper_call_fn(t, n: int):
  18. # Call the streaming operator by `call_stream` method
  19. async for i in await t.call_stream(call_data=n):
  20. print(i)
  21. asyncio.run(helper_call_fn(double_task, 10))

运行下面的命令,执行对应的代码, 查看输出:

  1. poetry run python awel_tutorial/custom_streaming_operator.py
  2. 0
  3. 2
  4. 4
  5. 6
  6. 8
  7. 10
  8. 12
  9. 14
  10. 16
  11. 18

在此案例中,我们调用 call_stream 方法来执行流式操作,注意需要用 await 来获得流式输出结果