Flows

了解如何使用 CrewAI Flows 创建和管理 AI 工作流。

概述

CrewAI Flows 是一项强大的功能,旨在简化 AI 工作流的创建与管理。Flows 允许开发者高效地组合与协调编码任务和 Crews,为构建复杂的 AI 自动化提供稳健的框架。

Flows 允许你创建结构化、事件驱动的工作流。它们提供了一种无缝方式来连接多个任务、管理状态,并控制 AI 应用中的执行流程。借助 Flows,你可以轻松设计并实现多步骤流程,充分发挥 CrewAI 能力的全部潜力。

  1. 简化工作流创建:轻松将多个 Crews 和任务串联起来,创建复杂的 AI 工作流。

  2. 状态管理:Flows 让你能够非常轻松地在工作流中的不同任务之间管理和共享状态。

  3. 事件驱动架构:基于事件驱动模型构建,使工作流更具动态性和响应性。

  4. 灵活的流程控制:可在工作流中实现条件逻辑、循环和分支。

快速开始

让我们创建一个简单的 Flow:你将在一个任务中使用 OpenAI 生成一个随机城市,然后在另一个任务中使用该城市生成一条有趣的事实。

  1. from crewai.flow.flow import Flow, listen, start
  2. from dotenv import load_dotenv
  3. from litellm import completion
  4. class ExampleFlow(Flow):
  5. model = "gpt-4o-mini"
  6. @start()
  7. def generate_city(self):
  8. print("Starting flow")
  9. # Each flow state automatically gets a unique ID
  10. print(f"Flow State ID: {self.state['id']}")
  11. response = completion(
  12. model=self.model,
  13. messages=[
  14. {
  15. "role": "user",
  16. "content": "Return the name of a random city in the world.",
  17. },
  18. ],
  19. )
  20. random_city = response["choices"][0]["message"]["content"]
  21. # Store the city in our state
  22. self.state["city"] = random_city
  23. print(f"Random City: {random_city}")
  24. return random_city
  25. @listen(generate_city)
  26. def generate_fun_fact(self, random_city):
  27. response = completion(
  28. model=self.model,
  29. messages=[
  30. {
  31. "role": "user",
  32. "content": f"Tell me a fun fact about {random_city}",
  33. },
  34. ],
  35. )
  36. fun_fact = response["choices"][0]["message"]["content"]
  37. # Store the fun fact in our state
  38. self.state["fun_fact"] = fun_fact
  39. return fun_fact
  40. flow = ExampleFlow()
  41. flow.plot()
  42. result = flow.kickoff()
  43. print(f"Generated fun fact: {result}")

在上面的示例中,我们创建了一个简单的 Flow,它先使用 OpenAI 生成一个随机城市,然后再生成一条关于该城市的有趣事实。这个 Flow 由两个任务组成:generate_citygenerate_fun_factgenerate_city 是 Flow 的起点,而 generate_fun_fact 任务会监听 generate_city 任务的输出。

每个 Flow 实例都会在其状态中自动获得一个唯一标识符(UUID),这有助于跟踪和管理 Flow 的执行。状态还可以存储附加数据(例如生成的城市和有趣事实),并在整个 Flow 执行期间持续保留。

当你运行这个 Flow 时,它将会:

  1. 为 Flow 状态生成一个唯一 ID
  2. 生成一个随机城市并将其存储在状态中
  3. 生成一条关于该城市的有趣事实并将其存储在状态中
  4. 将结果打印到控制台

状态中的唯一 ID 和存储的数据可用于跟踪 Flow 执行情况,并在任务之间保持上下文。

注意: 请确保你已经设置好 .env 文件来存储 OPENAI_API_KEY。该密钥是向 OpenAI API 发起身份验证请求所必需的。

@start()

@start() 装饰器用于标记 Flow 的入口点。你可以:

  • 声明多个无条件起点:@start()
  • 基于先前的方法或路由标签来控制起点:@start("method_or_label")
  • 提供一个可调用条件来控制起点何时触发

所有满足条件的 @start() 方法都会在 Flow 开始或恢复时执行(通常会并行执行)。

@listen()

@listen() 装饰器用于将某个方法标记为监听 Flow 中另一个任务输出的监听器。使用 @listen() 装饰的方法会在指定任务发出输出时执行。该方法可以通过参数访问它所监听任务的输出。

用法

@listen() 装饰器可通过多种方式使用:

  1. 按方法名监听:你可以传入要监听的方法名字符串。当该方法完成时,监听方法会被触发。

    1. @listen("generate_city")
    2. def generate_fun_fact(self, random_city):
    3. # Implementation
  2. 直接监听方法:你可以直接传入方法本身。当该方法完成时,监听方法会被触发。

    1. @listen(generate_city)
    2. def generate_fun_fact(self, random_city):
    3. # Implementation

Flow 输出

访问和处理 Flow 的输出,对于将 AI 工作流集成进更大的应用程序或系统至关重要。CrewAI Flows 提供了直接的机制来获取最终输出、访问中间结果并管理整体状态。

获取最终输出

当你运行一个 Flow 时,最终输出由最后一个完成的方法决定。kickoff() 方法会返回这个最终方法的输出。

以下是访问最终输出的方式:

  1. from crewai.flow.flow import Flow, listen, start
  2. class OutputExampleFlow(Flow):
  3. @start()
  4. def first_method(self):
  5. return "Output from first_method"
  6. @listen(first_method)
  7. def second_method(self, first_output):
  8. return f"Second method received: {first_output}"
  9. flow = OutputExampleFlow()
  10. flow.plot("my_flow_plot")
  11. final_output = flow.kickoff()
  12. print("---- Final Output ----")
  13. print(final_output)
  1. ---- Final Output ----
  2. Second method received: Output from first_method

在这个示例中,second_method 是最后完成的方法,因此它的输出将成为 Flow 的最终输出。 kickoff() 方法会返回最终输出,然后将其打印到控制台。plot() 方法会生成 HTML 文件,帮助你理解这个 Flow。

访问和更新状态

除了获取最终输出外,你还可以在 Flow 中访问和更新状态。状态可用于在 Flow 的不同方法之间存储和共享数据。Flow 运行结束后,你可以访问状态,以获取执行过程中新增或更新的任何信息。

下面是一个关于如何更新并访问状态的示例:

  1. from crewai.flow.flow import Flow, listen, start
  2. from pydantic import BaseModel
  3. class ExampleState(BaseModel):
  4. counter: int = 0
  5. message: str = ""
  6. class StateExampleFlow(Flow[ExampleState]):
  7. @start()
  8. def first_method(self):
  9. self.state.message = "Hello from first_method"
  10. self.state.counter += 1
  11. @listen(first_method)
  12. def second_method(self):
  13. self.state.message += " - updated by second_method"
  14. self.state.counter += 1
  15. return self.state.message
  16. flow = StateExampleFlow()
  17. flow.plot("my_flow_plot")
  18. final_output = flow.kickoff()
  19. print(f"Final Output: {final_output}")
  20. print("Final State:")
  21. print(flow.state)
  1. Final Output: Hello from first_method - updated by second_method
  2. Final State:
  3. counter=2 message='Hello from first_method - updated by second_method'

在这个示例中,状态会被 first_methodsecond_method 共同更新。 Flow 运行结束后,你可以访问最终状态,以查看这些方法所做的更新。

通过确保返回最终方法的输出并提供对状态的访问,CrewAI Flows 使你能够轻松地将 AI 工作流的结果集成到更大的应用程序或系统中, 同时还能在整个 Flow 执行过程中维护和访问状态。

Flow 状态管理

有效管理状态对于构建可靠且易维护的 AI 工作流至关重要。CrewAI Flows 同时为非结构化和结构化状态管理提供了稳健机制, 使开发者能够根据应用需求选择最适合的方法。

非结构化状态管理

在非结构化状态管理中,所有状态都存储在 Flow 类的 state 属性中。 这种方式提供了灵活性,使开发者无需定义严格模式即可动态添加或修改状态属性。 即使在非结构化状态下,CrewAI Flows 也会自动为每个状态实例生成并维护唯一标识符(UUID)。

  1. from crewai.flow.flow import Flow, listen, start
  2. class UnstructuredExampleFlow(Flow):
  3. @start()
  4. def first_method(self):
  5. # The state automatically includes an 'id' field
  6. print(f"State ID: {self.state['id']}")
  7. self.state['counter'] = 0
  8. self.state['message'] = "Hello from structured flow"
  9. @listen(first_method)
  10. def second_method(self):
  11. self.state['counter'] += 1
  12. self.state['message'] += " - updated"
  13. @listen(second_method)
  14. def third_method(self):
  15. self.state['counter'] += 1
  16. self.state['message'] += " - updated again"
  17. print(f"State after third_method: {self.state}")
  18. flow = UnstructuredExampleFlow()
  19. flow.plot("my_flow_plot")
  20. flow.kickoff()

注意: id 字段会在整个 Flow 执行过程中自动生成并保留。你无需手动管理或设置它,即使在用新数据更新状态时,它也会被保留。

关键点:

  • 灵活性: 你可以在没有预定义约束的情况下,动态向 self.state 添加属性。
  • 简洁性: 非常适合状态结构简单或变化较大的直接型工作流。

结构化状态管理

结构化状态管理利用预定义模式来确保整个工作流中的一致性和类型安全。 通过使用像 Pydantic 的 BaseModel 这样的模型,开发者可以明确定义状态的精确结构,从而在开发环境中获得更好的校验和自动补全能力。

CrewAI Flows 中的每个状态都会自动获得一个唯一标识符(UUID),以帮助跟踪和管理状态实例。这个 ID 由 Flow 系统自动生成和管理。

  1. from crewai.flow.flow import Flow, listen, start
  2. from pydantic import BaseModel
  3. class ExampleState(BaseModel):
  4. # Note: 'id' field is automatically added to all states
  5. counter: int = 0
  6. message: str = ""
  7. class StructuredExampleFlow(Flow[ExampleState]):
  8. @start()
  9. def first_method(self):
  10. # Access the auto-generated ID if needed
  11. print(f"State ID: {self.state.id}")
  12. self.state.message = "Hello from structured flow"
  13. @listen(first_method)
  14. def second_method(self):
  15. self.state.counter += 1
  16. self.state.message += " - updated"
  17. @listen(second_method)
  18. def third_method(self):
  19. self.state.counter += 1
  20. self.state.message += " - updated again"
  21. print(f"State after third_method: {self.state}")
  22. flow = StructuredExampleFlow()
  23. flow.kickoff()

关键点:

  • 明确的模式: ExampleState 清晰定义了状态结构,增强了代码的可读性和可维护性。
  • 类型安全: 利用 Pydantic 可确保状态属性符合指定类型,从而减少运行时错误。
  • 自动补全: IDE 可基于定义好的状态模型提供更好的自动补全和错误检查。

如何在非结构化与结构化状态管理之间做选择

  • 以下情况适合使用非结构化状态管理:

    • 工作流状态较简单,或具有高度动态性。
    • 与严格的状态定义相比,更优先考虑灵活性。
    • 需要快速原型开发,而不想承担定义模式的额外开销。
  • 以下情况适合使用结构化状态管理:

    • 工作流需要定义明确且一致的状态结构。
    • 类型安全和校验对于应用的可靠性很重要。
    • 你希望利用 IDE 的自动补全与类型检查功能来提升开发体验。

通过同时提供非结构化与结构化状态管理选项,CrewAI Flows 让开发者能够构建既灵活又稳健的 AI 工作流,以满足广泛的应用需求。

Flow 持久化

@persist 装饰器可在 CrewAI Flows 中启用自动状态持久化,使你能够在重启后或不同工作流执行之间保留 Flow 状态。该装饰器既可应用于类级别,也可应用于方法级别,从而在状态持久化管理方式上提供灵活性。

类级别持久化

当应用于类级别时,@persist 装饰器会自动持久化所有 Flow 方法的状态:

  1. @persist # Using SQLiteFlowPersistence by default
  2. class MyFlow(Flow[MyState]):
  3. @start()
  4. def initialize_flow(self):
  5. # This method will automatically have its state persisted
  6. self.state.counter = 1
  7. print("Initialized flow. State ID:", self.state.id)
  8. @listen(initialize_flow)
  9. def next_step(self):
  10. # The state (including self.state.id) is automatically reloaded
  11. self.state.counter += 1
  12. print("Flow state is persisted. Counter:", self.state.counter)

方法级别持久化

若需要更细粒度的控制,你可以将 @persist 应用于特定方法:

  1. class AnotherFlow(Flow[dict]):
  2. @persist # Persists only this method's state
  3. @start()
  4. def begin(self):
  5. if "runs" not in self.state:
  6. self.state["runs"] = 0
  7. self.state["runs"] += 1
  8. print("Method-level persisted runs:", self.state["runs"])

工作原理

  1. 唯一状态标识

    • 每个 Flow 状态都会自动获得一个唯一 UUID
    • 该 ID 会在状态更新和方法调用之间被保留
    • 同时支持结构化状态(Pydantic BaseModel)和非结构化状态(dictionary)
  2. 默认 SQLite 后端

    • SQLiteFlowPersistence 是默认存储后端
    • 状态会自动保存到本地 SQLite 数据库
    • 健壮的错误处理确保数据库操作失败时提供清晰提示
  3. 错误处理

    • 为数据库操作提供全面的错误信息
    • 在保存和加载时自动进行状态校验
    • 当持久化操作出现问题时提供清晰反馈

重要注意事项

  • 状态类型: 同时支持结构化状态(Pydantic BaseModel)和非结构化状态(dictionary)
  • 自动 ID: 若不存在 id 字段,则会自动添加
  • 状态恢复: 执行失败或重启的 Flow 可自动重新加载之前的状态
  • 自定义实现: 你可以提供自己的 FlowPersistence 实现,以满足特定存储需求

技术优势

  1. 通过底层访问实现精确控制

    • 可直接访问持久化操作,以支持高级用例
    • 通过方法级持久化装饰器实现细粒度控制
    • 内置状态检查与调试能力
    • 完整掌控状态变化与持久化操作
  2. 增强可靠性

    • 系统故障或重启后可自动恢复状态
    • 基于事务的状态更新可保证数据完整性
    • 全面的错误处理与清晰的错误提示
    • 在状态保存和加载操作期间进行稳健校验
  3. 可扩展架构

    • 可通过 FlowPersistence 接口自定义持久化后端
    • 支持超越 SQLite 的专用存储方案
    • 兼容结构化(Pydantic)和非结构化(dict)状态
    • 可与现有 CrewAI Flow 模式无缝集成

该持久化系统的架构强调技术精确性和可定制性,使开发者既能受益于内置可靠性功能,又能对状态管理保持完全控制。

Flow 控制

条件逻辑:or

Flows 中的 or_ 函数允许你监听多个方法,并在任意一个指定方法发出输出时触发监听方法。

  1. from crewai.flow.flow import Flow, listen, or_, start
  2. class OrExampleFlow(Flow):
  3. @start()
  4. def start_method(self):
  5. return "Hello from the start method"
  6. @listen(start_method)
  7. def second_method(self):
  8. return "Hello from the second method"
  9. @listen(or_(start_method, second_method))
  10. def logger(self, result):
  11. print(f"Logger: {result}")
  12. flow = OrExampleFlow()
  13. flow.plot("my_flow_plot")
  14. flow.kickoff()
  1. Logger: Hello from the start method
  2. Logger: Hello from the second method

当你运行这个 Flow 时,logger 方法会由 start_methodsecond_method 的输出触发。 or_ 函数用于监听多个方法,并在任意指定方法发出输出时触发监听方法。

条件逻辑:and

Flows 中的 and_ 函数允许你监听多个方法,并仅在所有指定方法都发出输出时触发监听方法。

  1. from crewai.flow.flow import Flow, and_, listen, start
  2. class AndExampleFlow(Flow):
  3. @start()
  4. def start_method(self):
  5. self.state["greeting"] = "Hello from the start method"
  6. @listen(start_method)
  7. def second_method(self):
  8. self.state["joke"] = "What do computers eat? Microchips."
  9. @listen(and_(start_method, second_method))
  10. def logger(self):
  11. print("---- Logger ----")
  12. print(self.state)
  13. flow = AndExampleFlow()
  14. flow.plot()
  15. flow.kickoff()
  1. ---- Logger ----
  2. {'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}

当你运行这个 Flow 时,logger 方法只会在 start_methodsecond_method 都发出输出时被触发。 and_ 函数用于监听多个方法,并仅在所有指定方法都发出输出时触发监听方法。

Router

Flows 中的 @router() 装饰器允许你根据某个方法的输出定义条件路由逻辑。 你可以依据该方法的输出指定不同路由,从而动态控制执行流程。

  1. import random
  2. from crewai.flow.flow import Flow, listen, router, start
  3. from pydantic import BaseModel
  4. class ExampleState(BaseModel):
  5. success_flag: bool = False
  6. class RouterFlow(Flow[ExampleState]):
  7. @start()
  8. def start_method(self):
  9. print("Starting the structured flow")
  10. random_boolean = random.choice([True, False])
  11. self.state.success_flag = random_boolean
  12. @router(start_method)
  13. def second_method(self):
  14. if self.state.success_flag:
  15. return "success"
  16. else:
  17. return "failed"
  18. @listen("success")
  19. def third_method(self):
  20. print("Third method running")
  21. @listen("failed")
  22. def fourth_method(self):
  23. print("Fourth method running")
  24. flow = RouterFlow()
  25. flow.plot("my_flow_plot")
  26. flow.kickoff()
  1. Starting the structured flow
  2. Third method running
  3. Fourth method running

在上面的示例中,start_method 会生成一个随机布尔值并将其设置到状态中。 second_method 使用 @router() 装饰器,根据这个布尔值定义条件路由逻辑。 如果该布尔值为 True,方法返回 "success";如果为 False,则返回 "failed"third_methodfourth_method 会监听 second_method 的输出,并根据返回值执行。

运行这个 Flow 时,输出会根据 start_method 生成的随机布尔值而变化。

Human in the Loop(人工反馈)

注意

@human_feedback 装饰器要求 CrewAI 版本 1.8.0 或更高版本

@human_feedback 装饰器通过暂停 Flow 执行来收集人工反馈,从而实现 human-in-the-loop 工作流。这对于需要人工判断的审批关卡、质量审核和决策节点非常有用。

  1. from crewai.flow.flow import Flow, start, listen
  2. from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
  3. class ReviewFlow(Flow):
  4. @start()
  5. @human_feedback(
  6. message="Do you approve this content?",
  7. emit=["approved", "rejected", "needs_revision"],
  8. llm="gpt-4o-mini",
  9. default_outcome="needs_revision",
  10. )
  11. def generate_content(self):
  12. return "Content to be reviewed..."
  13. @listen("approved")
  14. def on_approval(self, result: HumanFeedbackResult):
  15. print(f"Approved! Feedback: {result.feedback}")
  16. @listen("rejected")
  17. def on_rejection(self, result: HumanFeedbackResult):
  18. print(f"Rejected. Reason: {result.feedback}")

当指定 emit 时,人的自由文本反馈会由 LLM 进行解释,并归类为指定结果之一,随后触发对应的 @listen 装饰器。

你也可以在不进行路由的情况下使用 @human_feedback,仅用于收集反馈:

  1. @start()
  2. @human_feedback(message="Any comments on this output?")
  3. def my_method(self):
  4. return "Output for review"
  5. @listen(my_method)
  6. def next_step(self, result: HumanFeedbackResult):
  7. # Access feedback via result.feedback
  8. # Access original output via result.output
  9. pass

你可以通过 self.last_human_feedback(最近一次反馈)或 self.human_feedback_history(所有反馈列表)访问 Flow 中收集到的全部反馈。

有关 Flows 中人工反馈的完整指南,包括使用自定义提供方(Slack、webhooks 等)进行 异步 / 非阻塞反馈,请参阅 Human Feedback in Flows

在 Flows 中添加 Agents

Agents 可以无缝集成到你的 Flows 中,当你需要更简单、更专注的任务执行时,它们是完整 Crews 之外的一种轻量替代方案。下面是一个在 Flow 中使用 Agent 进行市场研究的示例:

  1. import asyncio
  2. from typing import Any, Dict, List
  3. from crewai_tools import SerperDevTool
  4. from pydantic import BaseModel, Field
  5. from crewai.agent import Agent
  6. from crewai.flow.flow import Flow, listen, start
  7. # Define a structured output format
  8. class MarketAnalysis(BaseModel):
  9. key_trends: List[str] = Field(description="List of identified market trends")
  10. market_size: str = Field(description="Estimated market size")
  11. competitors: List[str] = Field(description="Major competitors in the space")
  12. # Define flow state
  13. class MarketResearchState(BaseModel):
  14. product: str = ""
  15. analysis: MarketAnalysis | None = None
  16. # Create a flow class
  17. class MarketResearchFlow(Flow[MarketResearchState]):
  18. @start()
  19. def initialize_research(self) -> Dict[str, Any]:
  20. print(f"Starting market research for {self.state.product}")
  21. return {"product": self.state.product}
  22. @listen(initialize_research)
  23. async def analyze_market(self) -> Dict[str, Any]:
  24. # Create an Agent for market research
  25. analyst = Agent(
  26. role="Market Research Analyst",
  27. goal=f"Analyze the market for {self.state.product}",
  28. backstory="You are an experienced market analyst with expertise in "
  29. "identifying market trends and opportunities.",
  30. tools=[SerperDevTool()],
  31. verbose=True,
  32. )
  33. # Define the research query
  34. query = f"""
  35. Research the market for {self.state.product}. Include:
  36. 1. Key market trends
  37. 2. Market size
  38. 3. Major competitors
  39. Format your response according to the specified structure.
  40. """
  41. # Execute the analysis with structured output format
  42. result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
  43. if result.pydantic:
  44. print("result", result.pydantic)
  45. else:
  46. print("result", result)
  47. # Return the analysis to update the state
  48. return {"analysis": result.pydantic}
  49. @listen(analyze_market)
  50. def present_results(self, analysis) -> None:
  51. print("\nMarket Analysis Results")
  52. print("=====================")
  53. if isinstance(analysis, dict):
  54. # If we got a dict with 'analysis' key, extract the actual analysis object
  55. market_analysis = analysis.get("analysis")
  56. else:
  57. market_analysis = analysis
  58. if market_analysis and isinstance(market_analysis, MarketAnalysis):
  59. print("\nKey Market Trends:")
  60. for trend in market_analysis.key_trends:
  61. print(f"- {trend}")
  62. print(f"\nMarket Size: {market_analysis.market_size}")
  63. print("\nMajor Competitors:")
  64. for competitor in market_analysis.competitors:
  65. print(f"- {competitor}")
  66. else:
  67. print("No structured analysis data available.")
  68. print("Raw analysis:", analysis)
  69. # Usage example
  70. async def run_flow():
  71. flow = MarketResearchFlow()
  72. flow.plot("MarketResearchFlowPlot")
  73. result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
  74. return result
  75. # Run the flow
  76. if __name__ == "__main__":
  77. asyncio.run(run_flow())

这个示例展示了在 Flow 中使用 Agents 的几个关键特性:

  1. 结构化输出:使用 Pydantic 模型定义预期输出格式(MarketAnalysis),可确保整个 Flow 中的数据具有类型安全和结构化特征。

  2. 状态管理:Flow 状态(MarketResearchState)在步骤之间保持上下文,并存储输入和输出。

  3. 工具集成:Agents 可以使用工具(例如 WebsiteSearchTool)来增强其能力。

在 Flows 中添加 Crews

在 CrewAI 中,创建一个包含多个 crews 的 flow 非常直接。

你可以通过运行以下命令,生成一个新的 CrewAI 项目,其中已经包含创建多 crew flow 所需的全部脚手架:

  1. crewai create flow name_of_flow

该命令会生成一个新的 CrewAI 项目,并带有所需的文件夹结构。生成的项目包含一个已可直接运行的预构建 crew,名为 poem_crew。你可以将这个 crew 作为模板,通过复制、粘贴和编辑来创建其他 crews。

文件夹结构

运行 crewai create flow name_of_flow 命令后,你会看到如下类似的文件夹结构:

目录 / 文件 说明
name_of_flow/ flow 的根目录。
├── crews/ 包含特定 crews 的目录。
│ └── poem_crew/ “poem_crew” 的目录,包含其配置和脚本。
│ ├── config/ “poem_crew” 的配置文件目录。
│ │ ├── agents.yaml 定义 “poem_crew” 中 agents 的 YAML 文件。
│ │ └── tasks.yaml 定义 “poem_crew” 中 tasks 的 YAML 文件。
│ ├── poem_crew.py “poem_crew” 功能脚本。
├── tools/ flow 中使用的附加工具目录。
│ └── custom_tool.py 自定义工具实现。
├── main.py 运行 flow 的主脚本。
├── README.md 项目说明和使用说明。
├── pyproject.toml 项目依赖和设置的配置文件。
└── .gitignore 指定版本控制中要忽略的文件和目录。

构建你的 Crews

crews 文件夹中,你可以定义多个 crews。每个 crew 都会有自己的文件夹,其中包含配置文件和 crew 定义文件。例如,poem_crew 文件夹包含:

  • config/agents.yaml:定义 crew 中的 agents。
  • config/tasks.yaml:定义 crew 中的 tasks。
  • poem_crew.py:包含 crew 定义,包括 agents、tasks 以及 crew 本身。

你可以复制、粘贴并编辑 poem_crew,以创建其他 crews。

main.py 中连接 Crews

main.py 文件是你创建 flow 并将 crews 连接在一起的地方。你可以使用 Flow 类以及 @start@listen 装饰器来定义执行流程。

下面是一个在 main.py 文件中连接 poem_crew 的示例:

  1. #!/usr/bin/env python
  2. from random import randint
  3. from pydantic import BaseModel
  4. from crewai.flow.flow import Flow, listen, start
  5. from .crews.poem_crew.poem_crew import PoemCrew
  6. class PoemState(BaseModel):
  7. sentence_count: int = 1
  8. poem: str = ""
  9. class PoemFlow(Flow[PoemState]):
  10. @start()
  11. def generate_sentence_count(self):
  12. print("Generating sentence count")
  13. self.state.sentence_count = randint(1, 5)
  14. @listen(generate_sentence_count)
  15. def generate_poem(self):
  16. print("Generating poem")
  17. result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
  18. print("Poem generated", result.raw)
  19. self.state.poem = result.raw
  20. @listen(generate_poem)
  21. def save_poem(self):
  22. print("Saving poem")
  23. with open("poem.txt", "w") as f:
  24. f.write(self.state.poem)
  25. def kickoff():
  26. poem_flow = PoemFlow()
  27. poem_flow.kickoff()
  28. def plot():
  29. poem_flow = PoemFlow()
  30. poem_flow.plot("PoemFlowPlot")
  31. if __name__ == "__main__":
  32. kickoff()
  33. plot()

在这个示例中,PoemFlow 类定义了一个 flow:先生成句子数量,再使用 PoemCrew 生成诗歌,最后将诗歌保存到文件中。通过调用 kickoff() 方法来启动该 flow。plot() 方法会生成 PoemFlowPlot

运行 Flow

(可选)在运行 flow 之前,你可以先通过以下命令安装依赖:

  1. crewai install

安装完所有依赖后,你需要通过以下命令激活虚拟环境:

  1. source .venv/bin/activate

激活虚拟环境后,你可以执行以下任一命令来运行 flow:

  1. crewai flow kickoff

或者

  1. uv run kickoff

该 flow 将会执行,你应该会在控制台中看到输出结果。

绘制 Flows

将 AI 工作流可视化可以帮助你深入理解 flows 的结构与执行路径。CrewAI 提供了强大的可视化工具,允许你为 flows 生成交互式图表,从而更轻松地理解和优化 AI 工作流。

什么是图表

CrewAI 中的图表是 AI 工作流的图形化表示。它们展示了各个任务、它们之间的连接关系,以及数据在它们之间的流动方式。这种可视化有助于理解操作顺序、识别瓶颈,并确保工作流逻辑符合你的预期。

如何生成图表

CrewAI 提供了两种便捷方式来生成 flow 图表:

方式 1:使用 plot() 方法

如果你直接使用某个 flow 实例进行开发,可以在 flow 对象上调用 plot() 方法来生成图表。该方法会创建一个包含交互式 flow 图表的 HTML 文件。

  1. # Assuming you have a flow instance
  2. flow.plot("my_flow_plot")

这会在当前目录下生成一个名为 my_flow_plot.html 的文件。你可以在浏览器中打开该文件,查看交互式图表。

方式 2:使用命令行

如果你是在一个结构化的 CrewAI 项目中工作,也可以通过命令行生成图表。这对于较大的项目特别有用,因为你可能希望可视化整个 flow 设置。

  1. crewai flow plot

该命令会生成一个包含 flow 图表的 HTML 文件,与 plot() 方法效果类似。文件会保存在你的项目目录中,你可以在浏览器中打开它来查看 flow。

理解图表

生成的图表会显示代表 flow 中任务的节点,带方向的边则表示执行流向。图表是交互式的,你可以进行缩放,并在悬停于节点上时查看附加细节。

通过将 flows 可视化,你可以更清晰地理解工作流结构,从而更容易进行调试、优化,并向他人传达你的 AI 流程设计。

总结

绘制 flows 是 CrewAI 的一项强大功能,它能增强你设计和管理复杂 AI 工作流的能力。无论你选择使用 plot() 方法还是命令行,生成图表都能为你的工作流提供可视化表示,从而在开发和展示中都带来帮助。

下一步

如果你有兴趣进一步探索更多 flow 示例,我们在示例仓库中提供了多种推荐。以下是四个具体的 flow 示例,每个都展示了独特的使用场景,可帮助你将当前问题类型匹配到特定示例:

  1. Email Auto Responder Flow:该示例展示了一个无限循环,其中后台任务持续运行以自动回复邮件。这非常适合那些需要在无人手动干预下持续重复执行的任务。查看示例

  2. Lead Score Flow:这个 flow 展示了如何加入 human-in-the-loop 反馈,并使用 router 处理不同的条件分支。它是将动态决策和人工监督融入工作流的优秀示例。查看示例

  3. Write a Book Flow:这个示例非常擅长将多个 crews 串联起来,即一个 crew 的输出会被另一个 crew 使用。具体来说,一个 crew 负责为整本书制定大纲,另一个 crew 则根据大纲生成章节,最终连接起来生成完整书籍。这个 flow 非常适合需要不同任务协同的复杂多步骤流程。查看示例

  4. Meeting Assistant Flow:这个 flow 展示了如何广播一个事件来触发多个后续动作。例如,在会议结束后,flow 可以更新 Trello 看板、发送 Slack 消息,并保存结果。它是处理单一事件触发多个结果的绝佳示例,非常适合全面的任务管理和通知系统。查看示例

通过探索这些示例,你可以深入了解如何将 CrewAI Flows 应用于各种场景,从自动化重复任务到管理具有动态决策和人工反馈的复杂多步骤流程。

另外,也欢迎查看我们下方关于如何在 CrewAI 中使用 flows 的 YouTube 视频!

运行 Flows

运行 flow 有两种方式:

使用 Flow API

你可以通过创建 flow 类的实例并调用 kickoff() 方法,以编程方式运行一个 flow:

  1. flow = ExampleFlow()
  2. result = flow.kickoff()

流式执行 Flow

为了实时了解 flow 执行情况,你可以启用流式输出,以便在生成输出时立即接收:

  1. class StreamingFlow(Flow):
  2. stream = True # Enable streaming
  3. @start()
  4. def research(self):
  5. # Your flow implementation
  6. pass
  7. # Iterate over streaming output
  8. flow = StreamingFlow()
  9. streaming = flow.kickoff()
  10. for chunk in streaming:
  11. print(chunk.content, end="", flush=True)
  12. # Access final result
  13. result = streaming.result

有关流式处理的更多信息,请参阅 Streaming Flow Execution 指南。

Flows 中的 Memory

每个 Flow 都会自动获得对 CrewAI 统一 Memory 系统的访问权限。你可以在任何 flow 方法中使用三个内置便捷方法直接存储、召回和提取记忆。

内置方法

方法 说明
self.remember(content, **kwargs) 将内容存入 memory。可选参数包括 scopecategoriesmetadataimportance
self.recall(query, **kwargs) 检索相关 memories。可选参数包括 scopecategorieslimitdepth
self.extract_memories(content) 将原始文本拆分为离散、自包含的 memory 陈述。

初始化 Flow 时会自动创建默认的 Memory() 实例。你也可以传入自定义实例:

  1. from crewai.flow.flow import Flow
  2. from crewai import Memory
  3. custom_memory = Memory(
  4. recency_weight=0.5,
  5. recency_half_life_days=7,
  6. embedder={"provider": "ollama", "config": {"model_name": "mxbai-embed-large"}},
  7. )
  8. flow = MyFlow(memory=custom_memory)

示例:Research and Analyze Flow

  1. from crewai.flow.flow import Flow, listen, start
  2. class ResearchAnalysisFlow(Flow):
  3. @start()
  4. def gather_data(self):
  5. # Simulate research findings
  6. findings = (
  7. "PostgreSQL handles 10k concurrent connections with connection pooling. "
  8. "MySQL caps at around 5k. MongoDB scales horizontally but adds complexity."
  9. )
  10. # Extract atomic facts and remember each one
  11. memories = self.extract_memories(findings)
  12. for mem in memories:
  13. self.remember(mem, scope="/research/databases")
  14. return findings
  15. @listen(gather_data)
  16. def analyze(self, raw_findings):
  17. # Recall relevant past research (from this run or previous runs)
  18. past = self.recall("database performance and scaling", limit=10, depth="shallow")
  19. context_lines = [f"- {m.record.content}" for m in past]
  20. context = "\n".join(context_lines) if context_lines else "No prior context."
  21. return {
  22. "new_findings": raw_findings,
  23. "prior_context": context,
  24. "total_memories": len(past),
  25. }
  26. flow = ResearchAnalysisFlow()
  27. result = flow.kickoff()
  28. print(result)

由于 memory 会在不同运行之间持久存在(由磁盘上的 LanceDB 支持),因此 analyze 步骤也会召回之前执行中的研究结果——从而使 flow 能够随着时间学习并积累知识。

有关 scopes、slices、组合评分、embedder 配置等更多细节,请参阅 Memory documentation

使用 CLI

从 0.103.0 版本开始,你可以使用 crewai run 命令运行 flows:

  1. crewai run

该命令会自动检测你的项目是否为 flow(依据 pyproject.toml 中的 type = "flow" 设置),并相应地运行它。这是从命令行运行 flows 的推荐方式。

为保持向后兼容,你也可以使用:

  1. crewai flow kickoff

不过,现在更推荐使用 crewai run 命令,因为它同时适用于 crews 和 flows。