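Prerequisites

This guide assumes familiarity with chat models, the LangChain Expression Language (LCEL), and output parsers, all of which appear in the examples below.

Streaming is critical to making applications based on LLMs feel responsive to end users.

Important LangChain primitives such as chat models, output parsers, prompts, retrievers, and agents implement the LangChain Runnable interface.

This interface provides two general approaches to streaming content:

  1. sync stream and async astream: a default implementation of streaming that streams the final output from the chain.
  2. async astream_events and async astream_log: these provide a way to stream both intermediate steps and the final output from the chain.

Let's take a look at both approaches and try to understand how to use them.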

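Using Stream

All Runnable objects implement a sync method called stream and an async variant called astream.

These methods are designed to stream the final output in chunks, yielding each chunk as soon as it is available.

Streaming is only possible if every step in the program knows how to process an input stream; that is, to process one input chunk at a time and yield a corresponding output chunk.

The complexity of this processing varies, from straightforward tasks such as emitting tokens produced by an LLM to more challenging ones such as streaming parts of a JSON result before the entire JSON is complete.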
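The requirement that every step handle an input stream can be illustrated with plain Python generators. This is a minimal sketch that uses only the standard library; fake_model, upper_case, and join_all are hypothetical stand-ins, not LangChain components.

```python
from typing import Iterable, Iterator


def fake_model(tokens: Iterable[str]) -> Iterator[str]:
    """Stand-in for a model: yields one token at a time."""
    for token in tokens:
        yield token


def upper_case(chunks: Iterable[str]) -> Iterator[str]:
    """A streaming step: turns each input chunk into one output chunk."""
    for chunk in chunks:
        yield chunk.upper()


def join_all(chunks: Iterable[str]) -> Iterator[str]:
    """A non-streaming step: consumes every chunk before emitting anything."""
    yield "".join(chunks)


tokens = ["The", " sky", " is", " blue"]
streamed = list(upper_case(fake_model(tokens)))  # one output chunk per token
blocked = list(join_all(fake_model(tokens)))     # a single chunk at the very end
print(streamed)
print(blocked)
```

The first pipeline produces four chunks because every step passes chunks along as they arrive; the second produces one chunk because join_all has to see the whole stream first.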

The best place to start exploring streaming is with the single most important component in LLM apps: the LLMs themselves!

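LLMs and Chat Models

Large language models and their chat variants are the primary bottleneck in LLM-based apps.

Large language models can take several seconds to generate a complete response to a query. This is far slower than the ~200-300 ms threshold at which an application feels responsive to an end user.

The key strategy to make the application feel more responsive is to show intermediate progress; that is, to stream the output from the model token by token.

We will show examples of streaming using a chat model. Choose one from the options below: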

OpenAI

    pip install -qU langchain-openai

    import getpass
    import os

    os.environ["OPENAI_API_KEY"] = getpass.getpass()

    from langchain_openai import ChatOpenAI

    model = ChatOpenAI(model="gpt-3.5-turbo-0125")

Anthropic

    pip install -qU langchain-anthropic

    import getpass
    import os

    os.environ["ANTHROPIC_API_KEY"] = getpass.getpass()

    from langchain_anthropic import ChatAnthropic

    model = ChatAnthropic(model="claude-3-sonnet-20240229")

Azure

    pip install -qU langchain-openai

    import getpass
    import os

    os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass()

    from langchain_openai import AzureChatOpenAI

    model = AzureChatOpenAI(
        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
        openai_api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    )

Google

    pip install -qU langchain-google-vertexai

    import getpass
    import os

    os.environ["GOOGLE_API_KEY"] = getpass.getpass()

    from langchain_google_vertexai import ChatVertexAI

    model = ChatVertexAI(model="gemini-pro")

Cohere

    pip install -qU langchain-cohere

    import getpass
    import os

    os.environ["COHERE_API_KEY"] = getpass.getpass()

    from langchain_cohere import ChatCohere

    model = ChatCohere(model="command-r")

FireworksAI

    pip install -qU langchain-fireworks

    import getpass
    import os

    os.environ["FIREWORKS_API_KEY"] = getpass.getpass()

    from langchain_fireworks import ChatFireworks

    model = ChatFireworks(model="accounts/fireworks/models/mixtral-8x7b-instruct")

MistralAI

    pip install -qU langchain-mistralai

    import getpass
    import os

    os.environ["MISTRAL_API_KEY"] = getpass.getpass()

    from langchain_mistralai import ChatMistralAI

    model = ChatMistralAI(model="mistral-large-latest")

TogetherAI

    pip install -qU langchain-openai

    import getpass
    import os

    os.environ["TOGETHER_API_KEY"] = getpass.getpass()

    from langchain_openai import ChatOpenAI

    model = ChatOpenAI(
        base_url="https://api.together.xyz/v1",
        api_key=os.environ["TOGETHER_API_KEY"],
        model="mistralai/Mixtral-8x7B-Instruct-v0.1",
    )

Let's start with the sync stream API:

    chunks = []
    for chunk in model.stream("what color is the sky?"):
        chunks.append(chunk)
        print(chunk.content, end="|", flush=True)

    The| sky| appears| blue| during| the| day|.

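If you're working in an async environment, consider using the async astream API instead:

    # Top-level `async for` runs as-is in a notebook; in a script,
    # wrap it in an async function and run that with asyncio.run().
    chunks = []
    async for chunk in model.astream("what color is the sky?"):
        chunks.append(chunk)
        print(chunk.content, end="|", flush=True)

    The| sky| appears| blue| during| the| day|.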

Let's inspect one of the chunks:

    chunks[0]

    AIMessageChunk(content='The', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')

We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.

Message chunks are additive by design: you can simply add them up to get the state of the response so far!

    chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]

    AIMessageChunk(content='The sky appears blue during', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')
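The additive design can be imitated in a few lines of plain Python. The sketch below is illustrative only: TextChunk is a hypothetical stand-in, not the AIMessageChunk class, and it assumes that adding two chunks concatenates their content and keeps the left-hand id.

```python
import operator
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class TextChunk:
    """Hypothetical stand-in for a message chunk; not a LangChain class."""

    content: str
    run_id: str

    def __add__(self, other: "TextChunk") -> "TextChunk":
        # Concatenate the text and keep the id of the left-hand chunk.
        return TextChunk(self.content + other.content, self.run_id)


pieces = [TextChunk(text, "run-1") for text in ["The", " sky", " appears", " blue"]]
so_far = reduce(operator.add, pieces)  # same as pieces[0] + pieces[1] + ...
print(so_far.content)
```

Because addition returns another chunk, the running total can be updated after every new chunk and shown to the user at any point.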

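Virtually all LLM applications involve more steps than just a call to a language model.

Let's build a simple chain using LangChain Expression Language (LCEL) that combines a prompt, a model, and a parser, and verify that streaming works.

We will use StrOutputParser to parse the output from the model. This is a simple parser that extracts the content field from an AIMessageChunk, giving us the token returned by the model.

Tip

LCEL is a declarative way to specify a "program" by chaining together different LangChain primitives. Chains created using LCEL benefit from an automatic implementation of stream and astream, allowing streaming of the final output. In fact, chains created with LCEL implement the entire standard Runnable interface.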

    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate

    prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
    parser = StrOutputParser()
    chain = prompt | model | parser

    async for chunk in chain.astream({"topic": "parrot"}):
        print(chunk, end="|", flush=True)

API Reference: StrOutputParser | ChatPromptTemplate

    Here|'s| a| joke| about| a| par|rot|:|
    A man| goes| to| a| pet| shop| to| buy| a| par|rot|.| The| shop| owner| shows| him| two| stunning| pa|rr|ots| with| beautiful| pl|um|age|.|
    "|There|'s| a| talking| par|rot| an|d a| non|-|talking| par|rot|,"| the| owner| says|.| "|The| talking| par|rot| costs| $|100|,| an|d the| non|-|talking| par|rot| is| $|20|."|
    The| man| says|,| "|I|'ll| take| the| non|-|talking| par|rot| at| $|20|."|
    He| pays| an|d leaves| with| the| par|rot|.| As| he|'s| walking| down| the| street|,| the| par|rot| looks| up| at| him| an|d says|,| "|You| know|,| you| really| are| a| stupi|d man|!"|
    The| man| is| stun|ne|d an|d looks| at| the| par|rot| in| dis|bel|ief|.| The| par|rot| continues|,| "|Yes|,| you| got| r|ippe|d off| big| time|!| I| can| talk| just| as| well| as| that| other| par|rot|,| an|d you| only| pai|d $|20| |for| me|!"|

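Note that we're getting streaming output even though we're using parser at the end of the chain above. The parser operates on each streaming chunk individually. Many of the LCEL primitives also support this kind of transform-style passthrough streaming, which can be very convenient when constructing apps.

Custom functions can be designed to return generators, which are able to operate on streams.

Certain runnables, like prompt templates and chat models, cannot process individual chunks and instead aggregate all previous steps. Such runnables can interrupt the streaming process.

Note

The LangChain Expression Language lets you separate the construction of a chain from the mode in which it is used (e.g., sync/async, batch/streaming). If this is not relevant to what you're building, you can also rely on a standard imperative programming approach: call invoke, batch, or stream on each component individually, assign the results to variables, and use them downstream as you see fit.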

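Working with Input Streams

What if you wanted to stream JSON from the output as it was being generated?

If you were to rely on json.loads to parse the partial JSON, the parsing would fail, because partial JSON is not valid JSON.

You'd likely be at a complete loss as to what to do and claim that it wasn't possible to stream JSON.

Well, it turns out there is a way to do it: the parser needs to operate on the input stream and attempt to "auto-complete" the partial JSON into a valid state.

Let's see such a parser in action to understand what this means.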

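    import json


    def json_stream_parser(input_stream):
        """Buffer chunks and yield each JSON document once it parses."""
        buffer = ""
        for chunk in input_stream:
            buffer += chunk
            try:
                data = json.loads(buffer)
            except json.JSONDecodeError:
                # The buffer is still incomplete; wait for more chunks.
                continue
            yield data
            buffer = ""


    # Example usage
    input_stream = ['{"key1": "val', 'ue1"}', '{"key2":', ' "value2"}']
    for parsed_json in json_stream_parser(input_stream):
        print(parsed_json)

This example is a first step toward streaming JSON. json_stream_parser buffers the input stream and tries to parse the buffer after every chunk. While the JSON is still incomplete, parsing fails and the function keeps accumulating chunks; as soon as the buffer holds a complete document, it yields the parsed value and resets the buffer.

Note that this buffering version only yields complete JSON documents. To emit partial results before the JSON is finished, a parser also has to auto-complete the buffer into a valid state, which is what JsonOutputParser does in the next example.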
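The "auto-complete" idea itself can be sketched with the standard library alone. The helper below, complete_partial_json, is a hypothetical illustration and not the algorithm JsonOutputParser uses: it closes any open string, array, or object and then tries json.loads, returning None when the result is still invalid (for example, a dangling key).

```python
import json
from typing import Any, List, Optional


def complete_partial_json(text: str) -> Optional[Any]:
    """Close open strings, arrays, and objects, then try to parse the result."""
    closers: List[str] = []  # closing characters still owed, innermost last
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            closers.append("}")
        elif ch == "[":
            closers.append("]")
        elif ch in "}]" and closers:
            closers.pop()
    candidate = text + ('"' if in_string else "") + "".join(reversed(closers))
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None  # not completable yet; wait for more input


chunks = ['{"countries": [', '{"name": "Fr', 'ance"', ', "population": 67', "}]}"]
buffer = ""
snapshots: List[Any] = []
for chunk in chunks:
    buffer += chunk
    parsed = complete_partial_json(buffer)
    if parsed is not None:  # a bare JSON null is ignored by this sketch
        snapshots.append(parsed)
        print(parsed)
```

Each snapshot is a valid Python object even though the underlying text is incomplete, which is why a partial value such as 'Fr' can appear before 'France'.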

    from langchain_core.output_parsers import JsonOutputParser

    chain = (
        model | JsonOutputParser()
    )  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
    async for text in chain.astream(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`"
    ):
        print(text, flush=True)

API Reference: JsonOutputParser

    {}
    {'countries': []}
    {'countries': [{}]}
    {'countries': [{'name': ''}]}
    {'countries': [{'name': 'France'}]}
    {'countries': [{'name': 'France', 'population': 67}]}
    {'countries': [{'name': 'France', 'population': 67413}]}
    {'countries': [{'name': 'France', 'population': 67413000}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': ''}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan'}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584}]}
    {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584000}]}

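Now, let's break streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON.

Danger

Any step in the chain that operates on finalized inputs rather than on input streams can break streaming functionality via stream or astream.

Tip

Later, we will discuss the astream_events API, which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs.

Here is an example of how streaming gets broken: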

    import json


    def json_stream_parser(input_stream):
        """Buffer chunks and yield each JSON document once it parses."""
        buffer = ""
        for chunk in input_stream:
            buffer += chunk
            try:
                data = json.loads(buffer)
            except json.JSONDecodeError:
                continue
            yield data
            buffer = ""


    def extract_countries(json_data):
        """Operate on one complete JSON document, not on a stream."""
        if "countries" in json_data:
            return json_data["countries"]
        return []


    # Example usage
    input_stream = ['{"countries": ["USA",', '"Canada", "Mexico"]}', '{"cities": ["New York",', ' "Toronto"]}']

    # Parse the JSON stream
    for parsed_json in json_stream_parser(input_stream):
        print(parsed_json)

    # Extract the countries from each finalized JSON document
    for parsed_json in json_stream_parser(input_stream):
        countries = extract_countries(parsed_json)
        print(countries)

In this example, json_stream_parser still consumes the JSON data chunk by chunk. extract_countries, however, needs a complete JSON document before it can extract the country names. This breaks streaming, because the extraction step has to wait until the entire JSON is complete.

To stream intermediate results from each step, we need a more advanced API such as astream_events. Here is an example of how to do that:

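    import asyncio


    async def collect_events(chain, chain_input):
        """Gather every event the chain emits, including intermediate steps."""
        events = []
        # version="v2" selects the V2 event API covered later in this guide
        async for event in chain.astream_events(chain_input, version="v2"):
            events.append(event)
        return events


    # Example usage (chain is one of the chains built above)
    input_data = "some input data for chain"


    async def main():
        events = await collect_events(chain, input_data)
        for event in events:
            print(event)


    # In a notebook, run `await main()` instead of asyncio.run(main())
    asyncio.run(main())

This way, results can be streamed from intermediate steps even if the chain contains steps that only operate on finalized inputs.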

    from langchain_core.output_parsers import (
        JsonOutputParser,
    )


    # A function that operates on finalized inputs
    # rather than on an input_stream
    def _extract_country_names(inputs):
        """A function that does not operate on input streams and breaks streaming."""
        if not isinstance(inputs, dict):
            return ""

        if "countries" not in inputs:
            return ""

        countries = inputs["countries"]

        if not isinstance(countries, list):
            return ""

        country_names = [
            country.get("name") for country in countries if isinstance(country, dict)
        ]
        return country_names


    chain = model | JsonOutputParser() | _extract_country_names

    async for text in chain.astream(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`"
    ):
        print(text, end="|", flush=True)

API Reference: JsonOutputParser

    ['France', 'Spain', 'Japan']|

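Generator Functions

Let's fix the streaming using a generator function that can operate on the input stream.

Tip

A generator function (a function that uses yield) allows writing code that operates on input streams.

Here is an example of how to do this: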

    import json


    # Generator function that parses a JSON stream
    def json_stream_parser(input_stream):
        buffer = ""
        for chunk in input_stream:
            buffer += chunk
            try:
                data = json.loads(buffer)
            except json.JSONDecodeError:
                continue
            yield data
            buffer = ""


    # Generator function that extracts country names
    def extract_countries_from_stream(input_stream):
        for json_data in json_stream_parser(input_stream):
            if "countries" in json_data:
                yield json_data["countries"]
            else:
                yield []


    # Example usage
    input_stream = ['{"countries": ["France",', '"Spain", "Japan"]}', '{"cities": ["Paris",', ' "Madrid"]}']
    for countries in extract_countries_from_stream(input_stream):
        print(countries)

In this example, the json_stream_parser generator parses the JSON chunks from the input stream and yields the parsed data. The extract_countries_from_stream generator receives each parsed document and extracts the country names from it.

This way, the extraction step runs on each JSON document as soon as it is parsed instead of waiting for the whole input stream to finish, so streaming is preserved.

    from langchain_core.output_parsers import JsonOutputParser


    async def _extract_country_names_streaming(input_stream):
        """A function that operates on input streams."""
        country_names_so_far = set()

        async for input in input_stream:
            if not isinstance(input, dict):
                continue

            if "countries" not in input:
                continue

            countries = input["countries"]

            if not isinstance(countries, list):
                continue

            for country in countries:
                name = country.get("name")
                if not name:
                    continue
                if name not in country_names_so_far:
                    yield name
                    country_names_so_far.add(name)


    chain = model | JsonOutputParser() | _extract_country_names_streaming

    async for text in chain.astream(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
    ):
        print(text, end="|", flush=True)

API Reference: JsonOutputParser

    France|Spain|Japan|

Note

Because the code above relies on JSON auto-completion, you may see partial names of countries (e.g., Sp and Spain), which is not what one would want for an extraction result!

We're focusing on streaming concepts, not necessarily the results of the chains.
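One possible way to avoid emitting partial names such as Sp is to treat a name as final only once a later key or a later list entry has appeared. The sketch below works on plain dict snapshots like the ones the JSON parser prints; stable_country_names is a hypothetical helper, and it assumes that population always follows name within an entry.

```python
from typing import Any, Dict, Iterable, Iterator, List


def stable_country_names(snapshots: Iterable[Dict[str, Any]]) -> Iterator[str]:
    """Yield each country name once, only after it can no longer grow."""
    emitted = 0  # number of list entries whose name was already yielded
    last: List[Any] = []
    for snapshot in snapshots:
        countries = snapshot.get("countries")
        if not isinstance(countries, list):
            continue
        last = countries
        for index in range(emitted, len(countries)):
            entry = countries[index]
            if not isinstance(entry, dict) or not entry.get("name"):
                break
            is_last = index == len(countries) - 1
            # The newest entry may still be growing until `population` shows up.
            if is_last and "population" not in entry:
                break
            yield entry["name"]
            emitted = index + 1
    # End of stream: whatever remains is final.
    for entry in last[emitted:]:
        if isinstance(entry, dict) and entry.get("name"):
            yield entry["name"]


snapshots = [
    {},
    {"countries": [{"name": "Sp"}]},
    {"countries": [{"name": "Spain"}]},
    {"countries": [{"name": "Spain", "population": 47}]},
    {"countries": [{"name": "Spain", "population": 47}, {"name": "Jap"}]},
    {"countries": [{"name": "Spain", "population": 47}, {"name": "Japan"}]},
]
names = list(stable_country_names(snapshots))
print(names)
```

The trade-off is latency: each name is held back until the stream proves it is complete, so output arrives slightly later than with the naive version.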

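Non-streaming components

Some built-in components, such as retrievers, do not offer any streaming. What happens if we try to stream them? 🤨

In that case streaming is interrupted at that step, because the component needs its complete input before it can do any work. Let's look at an example: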

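    # Example of a non-streaming component
    class NonStreamingRetriever:
        def retrieve(self, query):
            # Simulate a retrieval that returns all results at once
            return ["Document 1", "Document 2", "Document 3"]


    # A non-streaming component inside a streaming pipeline
    def process_with_retriever(input_stream, retriever):
        for chunk in input_stream:
            # The non-streaming retriever interrupts streaming here
            results = retriever.retrieve(chunk)
            for result in results:
                yield result


    # Example usage
    input_stream = ["query1", "query2", "query3"]
    retriever = NonStreamingRetriever()
    for result in process_with_retriever(input_stream, retriever):
        print(result)

In this example, NonStreamingRetriever needs the complete query before it can run a retrieval, and it returns all of its results at once. It interrupts streaming because it cannot process its input chunk by chunk.

To work around this, isolate non-streaming components from streaming ones and call them only once their whole input is available. This can be done with an intermediate step in the chain.

If we do need the results of intermediate steps from non-streaming components, however, we can use the astream_events API, as follows: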

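    import asyncio


    async def collect_events(chain, chain_input):
        """Gather every event the chain emits, including intermediate steps."""
        events = []
        # version="v2" selects the V2 event API covered later in this guide
        async for event in chain.astream_events(chain_input, version="v2"):
            events.append(event)
        return events


    # Example usage (chain is one of the chains built above)
    input_data = "some input data for chain"


    async def main():
        events = await collect_events(chain, input_data)
        for event in events:
            print(event)


    # In a notebook, run `await main()` instead of asyncio.run(main())
    asyncio.run(main())

This way, astream_events can stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs.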

    from langchain_community.vectorstores import FAISS
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_core.runnables import RunnablePassthrough
    from langchain_openai import OpenAIEmbeddings

    template = """Answer the question based only on the following context:
    {context}

    Question: {question}
    """
    prompt = ChatPromptTemplate.from_template(template)

    vectorstore = FAISS.from_texts(
        ["harrison worked at kensho", "harrison likes spicy food"],
        embedding=OpenAIEmbeddings(),
    )
    retriever = vectorstore.as_retriever()

    chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
    chunks

API Reference: FAISS | StrOutputParser | ChatPromptTemplate | RunnablePassthrough | OpenAIEmbeddings

    [[Document(page_content='harrison worked at kensho'), Document(page_content='harrison likes spicy food')]]

Stream just yielded the final result from that component.

This is OK 🥹! Not all components have to implement streaming: in some cases streaming is unnecessary, difficult, or simply doesn't make sense.

Tip

An LCEL chain constructed using non-streaming components will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain.
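The ordering described in the tip can be sketched with asyncio and no LangChain at all. Here retrieve, fake_model, and retrieval_chain are hypothetical stand-ins: the retriever returns everything at once, and tokens only start flowing after it has finished.

```python
import asyncio
from typing import AsyncIterator, List

log: List[str] = []


async def retrieve(query: str) -> List[str]:
    """Non-streaming step: returns all documents at once."""
    await asyncio.sleep(0)
    log.append("retriever:done")
    return ["harrison worked at kensho"]


async def fake_model(prompt: str) -> AsyncIterator[str]:
    """Streaming step: yields the answer word by word."""
    for word in prompt.split():
        await asyncio.sleep(0)
        yield word


async def retrieval_chain(question: str) -> AsyncIterator[str]:
    docs = await retrieve(question)          # nothing is emitted until this returns
    async for token in fake_model(docs[0]):  # streaming starts from here on
        log.append(f"token:{token}")
        yield token


async def main() -> List[str]:
    return [token async for token in retrieval_chain("where did harrison work?")]


tokens = asyncio.run(main())
print(log)
```

The log shows the retriever finishing first and the tokens arriving one by one afterwards, which mirrors what the chain below does with a real retriever and model.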

    retrieval_chain = (
        {
            "context": retriever.with_config(run_name="Docs"),
            "question": RunnablePassthrough(),
        }
        | prompt
        | model
        | StrOutputParser()
    )

    for chunk in retrieval_chain.stream(
        "Where did harrison work? " "Write 3 made up sentences about this place."
    ):
        print(chunk, end="|", flush=True)

    Base|d on| the| given| context|,| Harrison| worke|d at| K|ens|ho|.|
    Here| are| |3| |made| up| sentences| about| this| place|:|
    1|.| K|ens|ho| was| a| cutting|-|edge| technology| company| known| for| its| innovative| solutions| in| artificial| intelligence| an|d data| analytics|.|
    2|.| The| modern| office| space| at| K|ens|ho| feature|d open| floor| plans|,| collaborative| work|sp|aces|,| an|d a| vib|rant| atmosphere| that| fos|tere|d creativity| an|d team|work|.|
    3|.| With| its| prime| location| in| the| heart| of| the| city|,| K|ens|ho| attracte|d top| talent| from| aroun|d the| worl|d,| creating| a| diverse| an|d dynamic| work| environment|.|

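Using Stream Events

Event streaming is a beta API. This API may change a bit based on feedback.

Note

This guide demonstrates the V2 API and requires langchain-core >= 0.2. For the V1 API, which is compatible with older versions of LangChain, see the documentation for those versions.

    import langchain_core

    print(langchain_core.__version__)

For the astream_events API to work properly:

  • Use async throughout the code to the extent possible (e.g., async tools)
  • Propagate callbacks if defining custom functions or runnables
  • Whenever using runnables without LCEL, make sure to call .astream() on LLMs rather than .ainvoke, to force the LLM to stream tokens
  • Let us know if anything doesn't work as expected! :)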

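Event Reference

Below is a reference table that shows some events that might be emitted by the various Runnable objects.

Note

When streaming is implemented properly, the inputs to a runnable are not known until the input stream has been entirely consumed. This means that inputs will usually be included only for end events rather than for start events.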

| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | | |
| on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | [model name] | | {'input': 'hello'} | |
| on_llm_stream | [model name] | 'Hello' | | |
| on_llm_end | [model name] | | | 'Hello human!' |
| on_chain_start | format_docs | | | |
| on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
| on_chain_end | format_docs | | [Document(…)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
| on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(…), ..] |
| on_prompt_start | [template_name] | | {"question": "hello"} | |
| on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, …]) |
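Why inputs tend to show up only in end events can be seen in a small generator sketch. The wrapper run_with_events below is hypothetical and only borrows the event names from the table; it is not how LangChain emits events.

```python
from typing import Any, Dict, Iterable, Iterator, List


def run_with_events(name: str, input_stream: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Wrap a streaming step and report start, stream, and end events."""
    # The input is still arriving, so the start event cannot include it.
    yield {"event": "on_chain_start", "name": name, "data": {}}
    seen_input: List[str] = []
    seen_output: List[str] = []
    for chunk in input_stream:
        seen_input.append(chunk)
        out = chunk.upper()
        seen_output.append(out)
        yield {"event": "on_chain_stream", "name": name, "data": {"chunk": out}}
    # Only now has the whole input stream been consumed.
    yield {
        "event": "on_chain_end",
        "name": name,
        "data": {"input": "".join(seen_input), "output": "".join(seen_output)},
    }


events = list(run_with_events("shout", iter(["hello", " world"])))
for event in events:
    print(event)
```

The start event carries empty data, each stream event carries one chunk, and the end event is the first place where the complete input and output are both available.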

Chat Model

Let's start off by looking at the events produced by a chat model.

    events = []
    async for event in model.astream_events("hello", version="v2"):
        events.append(event)

    /home/eugene/src/langchain/libs/core/langchain_core/_api/beta_decorator.py:87: LangChainBetaWarning: This API is in beta and may change in the future.
      warn_beta(

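Note

Hey, what's that funny version="v2" parameter in the API?! 😾

This is a beta API, and we're almost certainly going to make some changes to it (in fact, we already have!).

This version parameter lets us minimize such breaking changes to your code.

In short, we are annoying you now, so we don't have to annoy you later.

v2 is only available for langchain-core>=0.2.0.

Let's take a look at a few of the start events and a few of the end events.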

    events[:3]

    [{'event': 'on_chat_model_start',
      'data': {'input': 'hello'},
      'name': 'ChatAnthropic',
      'tags': [],
      'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
      'metadata': {}},
     {'event': 'on_chat_model_stream',
      'data': {'chunk': AIMessageChunk(content='Hello', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
      'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
      'name': 'ChatAnthropic',
      'tags': [],
      'metadata': {}},
     {'event': 'on_chat_model_stream',
      'data': {'chunk': AIMessageChunk(content='!', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
      'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
      'name': 'ChatAnthropic',
      'tags': [],
      'metadata': {}}]

    events[-2:]

    [{'event': 'on_chat_model_stream',
      'data': {'chunk': AIMessageChunk(content='?', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
      'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
      'name': 'ChatAnthropic',
      'tags': [],
      'metadata': {}},
     {'event': 'on_chat_model_end',
      'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
      'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
      'name': 'ChatAnthropic',
      'tags': [],
      'metadata': {}}]

Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.

    chain = (
        model | JsonOutputParser()
    )  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

    events = [
        event
        async for event in chain.astream_events(
            "output a list of the countries france, spain and japan and their populations in JSON format. "
            'Use a dict with an outer key of "countries" which contains a list of countries. '
            "Each country should have the key `name` and `population`",
            version="v2",
        )
    ]

If you examine the first few events, you'll notice that there are 3 different start events rather than 2 start events.

The three start events correspond to:

  1. The chain (model + parser)
  2. The model
  3. The parser

    events[:3]

    [{'event': 'on_chain_start',
      'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
      'name': 'RunnableSequence',
      'tags': [],
      'run_id': '4765006b-16e2-4b1d-a523-edd9fd64cb92',
      'metadata': {}},
     {'event': 'on_chat_model_start',
      'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}},
      'name': 'ChatAnthropic',
      'tags': ['seq:step:1'],
      'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
      'metadata': {}},
     {'event': 'on_chat_model_stream',
      'data': {'chunk': AIMessageChunk(content='{', id='run-0320c234-7b52-4a14-ae4e-5f100949e589')},
      'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
      'name': 'ChatAnthropic',
      'tags': ['seq:step:1'],
      'metadata': {}}]

What do you think you'd see if you looked at the last 3 events? What about the middle?

Let's use this API to output the stream events from the model and the parser. We're ignoring start events, end events, and events from the chain.

    num_events = 0

    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            print(
                f"Chat model chunk: {repr(event['data']['chunk'].content)}",
                flush=True,
            )
        if kind == "on_parser_stream":
            print(f"Parser chunk: {event['data']['chunk']}", flush=True)
        num_events += 1
        if num_events > 30:
            # Truncate the output
            print("...")
            break

    Chat model chunk: '{'
    Parser chunk: {}
    Chat model chunk: '\n '
    Chat model chunk: '"'
    Chat model chunk: 'countries'
    Chat model chunk: '":'
    Chat model chunk: ' ['
    Parser chunk: {'countries': []}
    Chat model chunk: '\n '
    Chat model chunk: '{'
    Parser chunk: {'countries': [{}]}
    Chat model chunk: '\n '
    Chat model chunk: '"'
    Chat model chunk: 'name'
    Chat model chunk: '":'
    Chat model chunk: ' "'
    Parser chunk: {'countries': [{'name': ''}]}
    Chat model chunk: 'France'
    Parser chunk: {'countries': [{'name': 'France'}]}
    Chat model chunk: '",'
    Chat model chunk: '\n '
    Chat model chunk: '"'
    Chat model chunk: 'population'
    ...

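Because both the model and the parser support streaming, we see streaming events from both components in real time! Kind of cool, isn't it? 🦜

Filtering Events

Because this API produces so many events, it is useful to be able to filter on events.

You can filter by a component's name, its tags, or its type.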
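The three filter criteria can be pictured as a plain function over event dictionaries. The sketch below is not LangChain's implementation: filter_events and event_type are hypothetical helpers, the type is derived from the event name, and combining several criteria with OR is an assumption of this sketch.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Sequence

Event = Dict[str, Any]


def event_type(event: Event) -> str:
    """Derive the component type, e.g. 'on_chat_model_stream' -> 'chat_model'."""
    return "_".join(event["event"].split("_")[1:-1])


def filter_events(
    events: Iterable[Event],
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
) -> Iterator[Event]:
    """Yield events that match at least one of the given criteria."""
    no_filter = include_names is None and include_types is None and include_tags is None
    for event in events:
        keep = no_filter
        if include_names is not None:
            keep = keep or event["name"] in include_names
        if include_types is not None:
            keep = keep or event_type(event) in include_types
        if include_tags is not None:
            keep = keep or any(tag in include_tags for tag in event.get("tags", []))
        if keep:
            yield event


sample = [
    {"event": "on_chain_start", "name": "RunnableSequence", "tags": ["my_chain"]},
    {"event": "on_chat_model_stream", "name": "model", "tags": ["seq:step:1", "my_chain"]},
    {"event": "on_parser_stream", "name": "my_parser", "tags": ["seq:step:2", "my_chain"]},
]
by_name = list(filter_events(sample, include_names=["my_parser"]))
by_type = list(filter_events(sample, include_types=["chat_model"]))
by_tag = list(filter_events(sample, include_tags=["my_chain"]))
print(len(by_name), len(by_type), len(by_tag))
```

Filtering by name or type isolates one component, while filtering by a tag set on the whole chain matches every event, since each child carries that tag too.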

By Name

    chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
        {"run_name": "my_parser"}
    )

    max_events = 0
    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
        version="v2",
        include_names=["my_parser"],
    ):
        print(event)
        max_events += 1
        if max_events > 10:
            # Truncate output
            print("...")
            break

    {'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {'countries': []}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': ''}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France'}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
    ...

By Type

    chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
        {"run_name": "my_parser"}
    )

    max_events = 0
    async for event in chain.astream_events(
        'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
        version="v2",
        include_types=["chat_model"],
    ):
        print(event)
        max_events += 1
        if max_events > 10:
            # Truncate output
            print("...")
            break

    {'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='"', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='countries', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='":', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' [', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='"', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
    ...

By Tags

Note

Tags are inherited by the child components of a given runnable.

If you're using tags to filter, make sure that this is what you want.
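Tag inheritance can be pictured as a walk over a small component tree. The Step class and effective_tags function below are hypothetical and exist only to illustrate the rule; they assume a child's own tags come first, followed by the tags inherited from its parent.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Sequence


@dataclass
class Step:
    """Hypothetical component with its own tags and child steps."""

    name: str
    tags: List[str] = field(default_factory=list)
    children: List["Step"] = field(default_factory=list)


def effective_tags(step: Step, inherited: Sequence[str] = ()) -> Dict[str, List[str]]:
    """Map each step name to its own tags plus those inherited from its parents."""
    own = list(step.tags) + [tag for tag in inherited if tag not in step.tags]
    result = {step.name: own}
    for child in step.children:
        result.update(effective_tags(child, own))
    return result


root = Step(
    "RunnableSequence",
    ["my_chain"],
    [Step("ChatAnthropic", ["seq:step:1"]), Step("JsonOutputParser", ["seq:step:2"])],
)
tags = effective_tags(root)
print(tags)
```

Because both children end up carrying my_chain, a filter on that tag returns events from the chain, the model, and the parser, not just from the chain itself.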

    chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

    max_events = 0
    async for event in chain.astream_events(
        'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
        version="v2",
        include_tags=["my_chain"],
    ):
        print(event)
        max_events += 1
        if max_events > 10:
            # Truncate output
            print("...")
            break

    {'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': 'fd68dd64-7a4d-4bdb-a0c2-ee592db0d024', 'metadata': {}}
    {'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}
    {'event': 'on_parser_start', 'data': {}, 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'run_id': 'afde30b9-beac-4b36-b4c7-dbbe423ddcdb', 'metadata': {}}
    {'event': 'on_parser_stream', 'data': {'chunk': {}}, 'run_id': 'afde30b9-beac-4b36-b4c7-dbbe423ddcdb', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}}
    {'event': 'on_chain_stream', 'data': {'chunk': {}}, 'run_id': 'fd68dd64-7a4d-4bdb-a0c2-ee592db0d024', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n ', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='"', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='countries', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='":', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}
    {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' [', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}}
    ...
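To make the inheritance caveat concrete, here is a toy model in plain Python. It is only a sketch of the behavior visible in the output above (child events carry the parent's `my_chain` tag alongside their own), not LangChain's implementation; `Node`, `collect_events`, and `filter_by_tags` are hypothetical names invented for this illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Sequence


@dataclass
class Node:
    """Hypothetical stand-in for a runnable with its own tags and children."""

    name: str
    tags: List[str] = field(default_factory=list)
    children: List["Node"] = field(default_factory=list)


def collect_events(node: Node, inherited: Sequence[str] = ()) -> List[Dict]:
    """Emit one start event per node; children inherit the parent's tags."""
    tags = list(node.tags) + [t for t in inherited if t not in node.tags]
    events = [{"event": "start", "name": node.name, "tags": tags}]
    for child in node.children:
        events.extend(collect_events(child, tags))
    return events


def filter_by_tags(events: List[Dict], include_tags: Sequence[str]) -> List[Dict]:
    """Keep events that carry at least one of the requested tags."""
    wanted = set(include_tags)
    return [e for e in events if wanted & set(e["tags"])]


chain = Node(
    "RunnableSequence",
    tags=["my_chain"],
    children=[
        Node("ChatAnthropic", tags=["seq:step:1"]),
        Node("JsonOutputParser", tags=["seq:step:2"]),
    ],
)

events = collect_events(chain)
matched = filter_by_tags(events, ["my_chain"])
print([e["name"] for e in matched])
# ['RunnableSequence', 'ChatAnthropic', 'JsonOutputParser']
```

Because the tag set on the parent reaches every child, filtering on `my_chain` matches all three components in this toy model. To select a single step, a tag that only that step carries (here `seq:step:2`) would be needed.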

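Non-streaming components

Remember how some components don't stream well because they don't operate on input streams?

While such components can break streaming of the final output when using astream, astream_events will still yield streaming events from the intermediate steps that do support streaming!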

    # Function that does not support streaming.
    # It operates on the finalized inputs rather than
    # operating on the input stream.
    def _extract_country_names(inputs):
        """A function that does not operate on input streams and breaks streaming."""
        if not isinstance(inputs, dict):
            return ""

        if "countries" not in inputs:
            return ""

        countries = inputs["countries"]

        if not isinstance(countries, list):
            return ""

        country_names = [
            country.get("name") for country in countries if isinstance(country, dict)
        ]
        return country_names


    chain = (
        model | JsonOutputParser() | _extract_country_names
    )  # This parser only works with OpenAI right now

As expected, the astream API doesn't stream correctly, because _extract_country_names doesn't operate on streams: the whole result arrives as a single chunk.

    async for chunk in chain.astream(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
    ):
        print(chunk, flush=True)

    ['France', 'Spain', 'Japan']
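The single-chunk result above can be reproduced with a small standard-library sketch. It is a simplified model, not LangChain code: `fake_model`, `blocking_step`, `streaming_step`, and `collect` are hypothetical names, and the token list is made up. It uses `asyncio.run` so that it also works as a plain script rather than only in a notebook.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_model() -> AsyncIterator[str]:
    """Hypothetical stand-in for a model that streams tokens."""
    for token in ["France", "Spain", "Japan"]:
        await asyncio.sleep(0)  # hand control back to the event loop between tokens
        yield token


async def blocking_step(stream: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    """Needs the finalized input: drains the stream, then emits one chunk."""
    items = [token async for token in stream]
    yield items


async def streaming_step(stream: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    """Operates on the input stream: emits one chunk per input chunk."""
    async for token in stream:
        yield [token]


async def collect(stream: AsyncIterator[List[str]]) -> List[List[str]]:
    """Gather every chunk a step produces, in order."""
    return [chunk async for chunk in stream]


blocked = asyncio.run(collect(blocking_step(fake_model())))
streamed = asyncio.run(collect(streaming_step(fake_model())))
print(blocked)   # [['France', 'Spain', 'Japan']]
print(streamed)  # [['France'], ['Spain'], ['Japan']]
```

The upstream tokens are still produced one at a time in both cases; only the last step decides whether the caller sees them incrementally or as one final chunk.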

Now, let's confirm that with astream_events we still see streaming output from the model and the parser.

    num_events = 0

    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            print(
                f"Chat model chunk: {repr(event['data']['chunk'].content)}",
                flush=True,
            )
        if kind == "on_parser_stream":
            print(f"Parser chunk: {event['data']['chunk']}", flush=True)
        num_events += 1
        if num_events > 30:
            # Truncate the output
            print("...")
            break

    Chat model chunk: '{'
    Parser chunk: {}
    Chat model chunk: '\n '
    Chat model chunk: '"'
    Chat model chunk: 'countries'
    Chat model chunk: '":'
    Chat model chunk: ' ['
    Parser chunk: {'countries': []}
    Chat model chunk: '\n '
    Chat model chunk: '{'
    Parser chunk: {'countries': [{}]}
    Chat model chunk: '\n '
    Chat model chunk: '"'
    Chat model chunk: 'name'
    Chat model chunk: '":'
    Chat model chunk: ' "'
    Parser chunk: {'countries': [{'name': ''}]}
    Chat model chunk: 'France'
    Parser chunk: {'countries': [{'name': 'France'}]}
    Chat model chunk: '",'
    Chat model chunk: '\n '
    Chat model chunk: '"'
    Chat model chunk: 'population'
    Chat model chunk: '":'
    Chat model chunk: ' '
    Chat model chunk: '67'
    Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
    ...
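Judging from the output above, the two kinds of chunks behave differently: chat model chunks look like small increments of text, while each parser chunk looks like the whole object parsed so far. The sketch below assumes that reading and works only on values copied by hand from the printed output; it makes no LangChain calls.

```python
# Chat model chunks copied from the output above: each one is a text increment.
chat_model_chunks = [
    "{", "\n ", '"', "countries", '":', " [", "\n ", "{", "\n ", '"', "name",
    '":', ' "', "France", '",', "\n ", '"', "population", '":', " ", "67",
]

# Parser chunks copied from the output above: each one is a full snapshot.
parser_snapshots = [
    {},
    {"countries": []},
    {"countries": [{}]},
    {"countries": [{"name": ""}]},
    {"countries": [{"name": "France"}]},
    {"countries": [{"name": "France", "population": 67}]},
]

# Increments are reassembled by concatenation...
text = "".join(chat_model_chunks)
# ...whereas for snapshots the most recent one already holds everything so far.
latest = parser_snapshots[-1]

print(repr(text))
print(latest)
```

Under this assumption, a UI that shows raw model text would append each chat model chunk, while one that renders the parsed JSON would simply replace its state with the newest parser chunk.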

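Propagating callbacks

Note

If you invoke runnables inside your tools, you need to propagate callbacks to the runnable; otherwise, no stream events will be generated.

Note

When using RunnableLambdas or the @chain decorator, callbacks are propagated automatically behind the scenes.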

    from langchain_core.runnables import RunnableLambda
    from langchain_core.tools import tool


    def reverse_word(word: str):
        return word[::-1]


    reverse_word = RunnableLambda(reverse_word)


    @tool
    def bad_tool(word: str):
        """Custom tool that doesn't propagate callbacks."""
        return reverse_word.invoke(word)


    async for event in bad_tool.astream_events("hello", version="v2"):
        print(event)

API Reference: RunnableLambda | tool

    {'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'metadata': {}}
    {'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'metadata': {}}
    {'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
    {'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'name': 'bad_tool', 'tags': [], 'metadata': {}}

If you invoke runnables inside RunnableLambdas or @chain functions, callbacks are passed along automatically on your behalf.

    from langchain_core.runnables import RunnableLambda


    async def reverse_and_double(word: str):
        return await reverse_word.ainvoke(word) * 2


    reverse_and_double = RunnableLambda(reverse_and_double)

    await reverse_and_double.ainvoke("1234")

    async for event in reverse_and_double.astream_events("1234", version="v2"):
        print(event)

API Reference: RunnableLambda

    {'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'metadata': {}}
    {'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'metadata': {}}
    {'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
    {'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
    {'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

And with the @chain decorator:

    from langchain_core.runnables import chain


    @chain
    async def reverse_and_double(word: str):
        return await reverse_word.ainvoke(word) * 2


    await reverse_and_double.ainvoke("1234")

    async for event in reverse_and_double.astream_events("1234", version="v2"):
        print(event)

API Reference: chain

    {'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'metadata': {}}
    {'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'metadata': {}}
    {'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
    {'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
    {'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

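Next steps

You've now learned some ways to stream both final outputs and internal steps with LangChain.

To learn more, check out the other how-to guides in this section, or the conceptual guide on LangChain Expression Language.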