Eino: Flow 集成

大模型应用是存在通用场景和模式的,若把这些场景进行抽象,就能提供一些可以帮助开发者快速构建大模型应用的模版。Eino 的 Flow 模块就是在做这件事。

目前 Eino 已经集成了 react agent、host multi agent 两个常用的 Agent 模式,以及 MultiQueryRetriever, ParentIndexer 等。

Flow 集成组件自身一般是由一个或多个 graph 编排而成。同时,这些 flow 也可以作为节点进入其他 graph 的编排之中,方式有三种:

  1. 如果一个 flow 实现了某个组件的 interface,可用该组件对应的 AddXXXNode 等方法加入编排,如 multiquery retriever:
  1. // instantiate the flow: multiquery.NewRetriever
  2. vk, err := newVikingDBRetriever(ctx, vikingDBHost, vikingDBRegion, vikingDBAK, vikingDBSK)
  3. if err != nil {
  4. logs.Errorf("newVikingDBRetriever failed, err=%v", err)
  5. return
  6. }
  7. llm, err := newChatModel(ctx, openAIBaseURL, openAIAPIKey, openAIModelName)
  8. if err != nil {
  9. logs.Errorf("newChatModel failed, err=%v", err)
  10. return
  11. }
  12. // rewrite query by llm
  13. mqr, err := multiquery.NewRetriever(ctx, &multiquery.Config{
  14. RewriteLLM: llm,
  15. RewriteTemplate: nil, // use default
  16. QueryVar: "", // use default
  17. LLMOutputParser: nil, // use default
  18. MaxQueriesNum: 3,
  19. OrigRetriever: vk,
  20. FusionFunc: nil, // use default fusion, just deduplicate by doc id
  21. })
  22. if err != nil {
  23. logs.Errorf("NewMultiQueryRetriever failed, err=%v", err)
  24. return
  25. }
  26. // add the flow to graph
  27. graph := compose.NewGraph[string, *schema.Message]()
  28. _ = graph.AddRetrieverNode("multi_query_retriever", mqr, compose.WithOutputKey("context"))
  29. _ = graph.AddEdge(compose._START_, "multi_query_retriever")
  30. _ = graph.AddChatTemplateNode("template", prompt.FromMessages(schema._FString_, schema.UserMessage("{context}")))
  31. // ...
  1. 如果一个 flow 内部是由单个 graph 编排而成,且 flow 的功能可完全等价于这个 graph 的运行(没有不能转化成 graph run 的定制逻辑),则可以将该 flow 的 graph 导出,通过 AddGraphNode 等方法加入编排,如 ReAct Agent 和 Host Multi-Agent:
  1. // instantiate the host multi-agent
  2. hostMA, err := NewMultiAgent(ctx, &MultiAgentConfig{
  3. Host: Host{
  4. ChatModel: mockHostLLM,
  5. },
  6. Specialists: []*Specialist{
  7. specialist1,
  8. specialist2,
  9. },
  10. })
  11. assert.Nil(t, err)
  12. // export graph and []GraphAddNodeOption from host multi-agent
  13. maGraph, opts := hostMA.ExportGraph()
  14. // add to another graph
  15. fullGraph, err := compose.NewChain[map[string]any, *schema.Message]().
  16. AppendChatTemplate(prompt.FromMessages(schema._FString_, schema.UserMessage("what's the capital city of {country_name}"))).
  17. AppendGraph(maGraph, append(opts, compose.WithNodeKey("host_ma_node"))...).
  18. Compile(ctx)
  19. assert.Nil(t, err)
  20. // invoke the other graph
  21. // convert the flow's own option to compose.Option if needed
  22. // assign options to flow's nodes if needed
  23. out, err := fullGraph.Invoke(ctx, map[string]any{"country_name": "China"},
  24. compose.WithCallbacks(ConvertCallbackHandlers(mockCallback)).
  25. DesignateNodeWithPath(compose.NewNodePath("host_ma_node", hostMA.HostNodeKey())))
  1. 所有 flow 应当都可以封装成 Lambda,通过 AddLambdaNode 等方法加入编排。目前所有的 flow 都可以通过 1 或 2 加入编排,所以不需要降级到使用 Lambda。如果要用,使用姿势是:
  1. // instantiate the flow
  2. a, err := NewAgent(ctx, &AgentConfig{
  3. Model: cm,
  4. ToolsConfig: compose.ToolsNodeConfig{
  5. Tools: []tool.BaseTool{fakeTool, &fakeStreamToolGreetForTest{}},
  6. },
  7. MaxStep: 40,
  8. })
  9. assert.Nil(t, err)
  10. chain := compose.NewChain[[]*schema.Message, string]()
  11. // convert the flow to Lambda
  12. agentLambda, err := compose.AnyLambda(a.Generate, a.Stream, nil, nil)
  13. assert.Nil(t, err)
  14. // add lambda to another graph
  15. chain.
  16. AppendLambda(agentLambda).
  17. AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *schema.Message) (string, error) {
  18. t.Log("got agent response: ", input.Content)
  19. return input.Content, nil
  20. }))
  21. r, err := chain.Compile(ctx)
  22. assert.Nil(t, err)
  23. // invoke the graph
  24. res, err := r.Invoke(ctx, []*schema.Message{{Role: schema._User_, Content: "hello"}},
  25. compose.WithCallbacks(callbackForTest))

三个方法的对比如下:

方式 适用场景 优势
作为组件 需实现组件的 interface 简单直接,语义清晰
作为 Graph 由单个 graph 编排而成,功能不超出这个 graph 的范围 graph 内节点对外层 graph 暴露,可统一分配运行时 option,相比 Lambda 少一层转化,可通过 GraphCompileCallback 获取上下级 graph 关系
作为 Lambda 所有 普适