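ReduceStreamOperator converts streaming data into non-streaming data: it consumes every item of an upstream stream and reduces them to a single value.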
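To make the stream-to-value idea concrete, here is a minimal sketch in plain asyncio. It is not DB-GPT's implementation: `numbers` and `reduce_stream` are hypothetical helpers, and seeding the fold with the first stream item is an assumption made for illustration.

```python
import asyncio
from typing import AsyncIterator, Callable, TypeVar

T = TypeVar("T")


async def numbers(n: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for an upstream streaming task: yields 0..n-1."""
    for i in range(n):
        yield i


async def reduce_stream(stream: AsyncIterator[T], fn: Callable[[T, T], T]) -> T:
    """Fold a stream into one value, seeding with the first item (assumed semantics)."""
    iterator = stream.__aiter__()
    try:
        result = await iterator.__anext__()
    except StopAsyncIteration:
        raise ValueError("cannot reduce an empty stream") from None
    # Combine the running result with each remaining item, left to right.
    async for item in iterator:
        result = fn(result, item)
    return result


total = asyncio.run(reduce_stream(numbers(5), lambda x, y: x + y))
print(total)  # 0 + 1 + 2 + 3 + 4 = 10
```

The caller receives one ordinary value rather than an iterator, which is the sense in which the data becomes "non-streaming".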

There are two ways to use the ReduceStreamOperator.

Build a ReduceStreamOperator from a reduce function

    from dbgpt.core.awel import DAG, ReduceStreamOperator

    with DAG("awel_reduce_operator") as dag:
        task = ReduceStreamOperator(reduce_function=lambda x, y: x + y)
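A two-argument reduce function such as `lambda x, y: x + y` is applied pairwise: the running result is combined with the next item until the input is exhausted. The sketch below shows that folding with the standard library only. It assumes a left-to-right fold with no initial value, which may differ in detail from how the operator handles its stream; `add` and `items` are illustrative names.

```python
from functools import reduce
from itertools import accumulate


def add(x: int, y: int) -> int:
    """Same shape as the reduce_function above: (running result, next item)."""
    return x + y


items = [0, 1, 2, 3, 4]

# accumulate exposes every intermediate value of the left fold.
running = list(accumulate(items, add))
# reduce keeps only the last of those values.
final = reduce(add, items)

print(running)  # [0, 1, 3, 6, 10]
print(final)    # 10
```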

Implement a custom ReduceStreamOperator

    from dbgpt.core.awel import DAG, ReduceStreamOperator

    class MySumOperator(ReduceStreamOperator[int, int]):
        async def reduce(self, x: int, y: int) -> int:
            return x + y

    with DAG("awel_reduce_operator") as dag:
        task = MySumOperator()

Example

In the awel_tutorial directory, create a file named reduce_operator_sum_numbers.py with the following code:

    import asyncio
    from typing import AsyncIterator

    from dbgpt.core.awel import DAG, ReduceStreamOperator, StreamifyAbsOperator


    class NumberProducerOperator(StreamifyAbsOperator[int, int]):
        """Create a stream of numbers from 0 to `n-1`"""

        async def streamify(self, n: int) -> AsyncIterator[int]:
            for i in range(n):
                yield i


    class MySumOperator(ReduceStreamOperator[int, int]):
        async def reduce(self, x: int, y: int) -> int:
            return x + y


    with DAG("sum_numbers_dag") as dag:
        task = NumberProducerOperator()
        sum_task = MySumOperator()
        # Feed the number stream into the reducing task.
        task >> sum_task

    # call_data is passed to the first task in the DAG as `n`.
    o1 = asyncio.run(sum_task.call(call_data=5))
    if o1 == sum(range(5)):
        print(f"Success! n is 5, sum is {o1}")
    else:
        print("Failed")

    o2 = asyncio.run(sum_task.call(call_data=10))
    if o2 == sum(range(10)):
        print(f"Success! n is 10, sum is {o2}")
    else:
        print("Failed")

Run the following command to see the output:

    poetry run python awel_tutorial/reduce_operator_sum_numbers.py
    Success! n is 5, sum is 10
    Success! n is 10, sum is 45
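Both sums in the output can be checked by hand with the arithmetic-series formula, where n is the value passed as call_data and the stream yields 0 through n-1:

```latex
\sum_{i=0}^{n-1} i = \frac{n(n-1)}{2}, \qquad
n = 5:\ \frac{5 \cdot 4}{2} = 10, \qquad
n = 10:\ \frac{10 \cdot 9}{2} = 45
```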