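ReduceStreamOperator converts streaming data into non-streaming data by combining the items of a stream into a single value.
There are two ways to use the ReduceStreamOperator.
Build a ReduceStreamOperator from a reduce function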
from dbgpt.core.awel import DAG, ReduceStreamOperator

with DAG("awel_reduce_operator") as dag:
    task = ReduceStreamOperator(reduce_function=lambda x, y: x + y)
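As a rough analogy for what a two-argument reduce function does, the sketch below uses Python's built-in functools.reduce on an ordinary list. It does not use dbgpt at all: the list `items` is a hypothetical stand-in for the items of a stream, and the assumption that the operator combines items pairwise in arrival order is an illustration, not a statement about its internals.

```python
from functools import reduce

# Hypothetical stand-in for the items that would arrive on a stream.
items = [0, 1, 2, 3, 4]

# Same shape as the lambda passed to reduce_function: (x, y) -> combined value.
total = reduce(lambda x, y: x + y, items)

# Any other two-argument combiner works the same way, e.g. keeping the maximum.
largest = reduce(lambda x, y: x if x >= y else y, items)

print(total, largest)
```

The first argument carries the value accumulated so far and the second is the next item, so the function must return something it can accept again as `x`.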
Implement a custom ReduceStreamOperator
from dbgpt.core.awel import DAG, ReduceStreamOperator

class MySumOperator(ReduceStreamOperator[int, int]):
    async def reduce(self, x: int, y: int) -> int:
        return x + y

with DAG("awel_reduce_operator") as dag:
    task = MySumOperator()
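To show why overriding an async reduce method is enough to define the whole aggregation, here is a minimal plain-Python sketch of the pattern. The classes `SketchReducer` and `ConcatReducer`, the `run` method, and the `tokens` generator are all hypothetical names invented for this illustration; this is not dbgpt's implementation, and it simply assumes a non-empty stream.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import AsyncIterator, Generic, TypeVar

T = TypeVar("T")


class SketchReducer(ABC, Generic[T]):
    """Hypothetical stand-in, NOT dbgpt's ReduceStreamOperator."""

    @abstractmethod
    async def reduce(self, x: T, y: T) -> T:
        """Combine the running value x with the next item y."""

    async def run(self, stream: AsyncIterator[T]) -> T:
        # Seed the accumulator with the first item (assumes a non-empty stream),
        # then await reduce() once for every remaining item.
        it = stream.__aiter__()
        acc = await it.__anext__()
        async for item in it:
            acc = await self.reduce(acc, item)
        return acc


class ConcatReducer(SketchReducer[str]):
    async def reduce(self, x: str, y: str) -> str:
        return x + y


async def tokens() -> AsyncIterator[str]:
    # Hypothetical stream of text chunks.
    for t in ["Hello", ", ", "AWEL"]:
        yield t


text = asyncio.run(ConcatReducer().run(tokens()))
print(text)
```

Because reduce is a coroutine here, a subclass can await other async work inside it, which a plain lambda cannot do.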
Example
Create a file named reduce_operator_sum_numbers.py in the awel_tutorial directory with the following code:
import asyncio
from typing import AsyncIterator

from dbgpt.core.awel import DAG, ReduceStreamOperator, StreamifyAbsOperator

class NumberProducerOperator(StreamifyAbsOperator[int, int]):
    """Create a stream of numbers from 0 to `n-1`"""

    async def streamify(self, n: int) -> AsyncIterator[int]:
        for i in range(n):
            yield i

class MySumOperator(ReduceStreamOperator[int, int]):
    async def reduce(self, x: int, y: int) -> int:
        return x + y

with DAG("sum_numbers_dag") as dag:
    task = NumberProducerOperator()
    sum_task = MySumOperator()
    task >> sum_task

o1 = asyncio.run(sum_task.call(call_data=5))
if o1 == sum(range(5)):
    print(f"Success! n is 5, sum is {o1}")
else:
    print("Failed")

o2 = asyncio.run(sum_task.call(call_data=10))
if o2 == sum(range(10)):
    print(f"Success! n is 10, sum is {o2}")
else:
    print("Failed")
Run the following command to see the output:
poetry run python awel_tutorial/reduce_operator_sum_numbers.py
Success! n is 5, sum is 10
Success! n is 10, sum is 45
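The two sums in the output can be checked independently of dbgpt. The short sketch below uses itertools.accumulate from the standard library to list the running totals that a pairwise x + y reduction would pass through for n = 5 and n = 10; treating the reduction as a left-to-right fold is an assumption made for illustration.

```python
from itertools import accumulate

for n in (5, 10):
    # Running totals of 0, 1, ..., n-1; the last entry is the final sum.
    running = list(accumulate(range(n)))
    print(n, running, running[-1])
```

For n = 5 the running totals are 0, 1, 3, 6, 10, and for n = 10 the last total is 45, matching the two "Success!" lines.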